LCOV - differential code coverage report
Current view:   top level - pageserver/src/tenant - mgr.rs (source / functions)
Current:        cd44433dd675caa99df17a61b18949c8387e2242.info
Current Date:   2024-01-09 02:06:09
Baseline:       66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date:  2024-01-08 15:34:46

                Coverage      Total    Hit    UBC    CBC
Lines:          81.2 %         1154    937    217    937
Functions:      55.1 %          214    118     96    118

           TLA  Line data    Source code
       1                 : //! This module acts as a switchboard to access different repositories managed by this
       2                 : //! page server.
       3                 : 
       4                 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
       5                 : use pageserver_api::key::Key;
       6                 : use pageserver_api::shard::{ShardIdentity, ShardNumber, TenantShardId};
       7                 : use rand::{distributions::Alphanumeric, Rng};
       8                 : use std::borrow::Cow;
       9                 : use std::collections::{BTreeMap, HashMap};
      10                 : use std::ops::Deref;
      11                 : use std::sync::Arc;
      12                 : use std::time::{Duration, Instant};
      13                 : use tokio::fs;
      14                 : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
      15                 : 
      16                 : use anyhow::Context;
      17                 : use once_cell::sync::Lazy;
      18                 : use tokio::task::JoinSet;
      19                 : use tokio_util::sync::CancellationToken;
      20                 : use tracing::*;
      21                 : 
      22                 : use remote_storage::GenericRemoteStorage;
      23                 : use utils::crashsafe;
      24                 : 
      25                 : use crate::config::PageServerConf;
      26                 : use crate::context::{DownloadBehavior, RequestContext};
      27                 : use crate::control_plane_client::{
      28                 :     ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
      29                 : };
      30                 : use crate::deletion_queue::DeletionQueueClient;
      31                 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
      32                 : use crate::task_mgr::{self, TaskKind};
      33                 : use crate::tenant::config::{
      34                 :     AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt,
      35                 : };
      36                 : use crate::tenant::delete::DeleteTenantFlow;
      37                 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
      38                 : use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
      39                 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
      40                 : 
      41                 : use utils::crashsafe::path_with_suffix_extension;
      42                 : use utils::fs_ext::PathExt;
      43                 : use utils::generation::Generation;
      44                 : use utils::id::{TenantId, TimelineId};
      45                 : 
      46                 : use super::delete::DeleteTenantError;
      47                 : use super::secondary::SecondaryTenant;
      48                 : use super::TenantSharedResources;
      49                 : 
      50                 : /// For a tenant that appears in TenantsMap, it may either be
       51                 : /// - `Attached`: has a full Tenant object, is eligible to service
      52                 : ///    reads and ingest WAL.
      53                 : /// - `Secondary`: is only keeping a local cache warm.
      54                 : ///
      55                 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
      56                 : /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
      57                 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
      58                 : /// having a properly acquired generation (Secondary doesn't need a generation)
      59                 : pub(crate) enum TenantSlot {
      60                 :     Attached(Arc<Tenant>),
      61                 :     Secondary(Arc<SecondaryTenant>),
      62                 :     /// In this state, other administrative operations acting on the TenantId should
      63                 :     /// block, or return a retry indicator equivalent to HTTP 503.
      64                 :     InProgress(utils::completion::Barrier),
      65                 : }
      66                 : 
      67                 : impl std::fmt::Debug for TenantSlot {
      68 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      69               0 :         match self {
      70               0 :             Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
      71               0 :             Self::Secondary(_) => write!(f, "Secondary"),
      72               0 :             Self::InProgress(_) => write!(f, "InProgress"),
      73                 :         }
      74               0 :     }
      75                 : }
      76                 : 
      77                 : impl TenantSlot {
      78                 :     /// Return the `Tenant` in this slot if attached, else None
      79 CBC        1231 :     fn get_attached(&self) -> Option<&Arc<Tenant>> {
      80            1231 :         match self {
      81            1208 :             Self::Attached(t) => Some(t),
      82              11 :             Self::Secondary(_) => None,
      83              12 :             Self::InProgress(_) => None,
      84                 :         }
      85            1231 :     }
      86                 : }
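
The `InProgress` doc comment above spells out the contract for callers: block on the barrier, or answer with a retry indicator equivalent to HTTP 503. Below is a minimal sketch of that convention using hypothetical stand-in types (not the real `TenantSlot` or HTTP layer):

```rust
/// Simplified stand-in for `TenantSlot`.
enum SlotSketch {
    Attached,
    Secondary,
    InProgress,
}

/// An administrative handler treats `InProgress` as retryable: 503 tells the caller to
/// try again once the in-flight state change has completed.
fn admin_status(slot: Option<&SlotSketch>) -> u16 {
    match slot {
        None => 404,                         // tenant unknown on this pageserver
        Some(SlotSketch::Attached) => 200,   // full `Tenant` object is available
        Some(SlotSketch::Secondary) => 200,  // only a warm local cache is kept
        Some(SlotSketch::InProgress) => 503, // state change in flight: retry later
    }
}

fn main() {
    assert_eq!(admin_status(Some(&SlotSketch::Attached)), 200);
    assert_eq!(admin_status(Some(&SlotSketch::Secondary)), 200);
    assert_eq!(admin_status(Some(&SlotSketch::InProgress)), 503);
    assert_eq!(admin_status(None), 404);
}
```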
      87                 : 
      88                 : /// The tenants known to the pageserver.
      89                 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
      90                 : pub(crate) enum TenantsMap {
      91                 :     /// [`init_tenant_mgr`] is not done yet.
      92                 :     Initializing,
      93                 :     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
      94                 :     /// New tenants can be added using [`tenant_map_acquire_slot`].
      95                 :     Open(BTreeMap<TenantShardId, TenantSlot>),
      96                 :     /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
      97                 :     /// Existing tenants are still accessible, but no new tenants can be created.
      98                 :     ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
      99                 : }
     100                 : 
     101                 : pub(crate) enum TenantsMapRemoveResult {
     102                 :     Occupied(TenantSlot),
     103                 :     Vacant,
     104                 :     InProgress(utils::completion::Barrier),
     105                 : }
     106                 : 
     107                 : /// When resolving a TenantId to a shard, we may be looking for the 0th
     108                 : /// shard, or we might be looking for whichever shard holds a particular page.
     109                 : pub(crate) enum ShardSelector {
     110                 :     /// Only return the 0th shard, if it is present.  If a non-0th shard is present,
     111                 :     /// ignore it.
     112                 :     Zero,
     113                 :     /// Pick the first shard we find for the TenantId
     114                 :     First,
     115                 :     /// Pick the shard that holds this key
     116                 :     Page(Key),
     117                 : }
     118                 : 
     119                 : impl TenantsMap {
     120                 :     /// Convenience function for typical usage, where we want to get a `Tenant` object, for
     121                 :     /// working with attached tenants.  If the TenantId is in the map but in Secondary state,
     122                 :     /// None is returned.
     123             322 :     pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
     124             322 :         match self {
     125 UBC           0 :             TenantsMap::Initializing => None,
     126 CBC         322 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
     127             322 :                 m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
     128                 :             }
     129                 :         }
     130             322 :     }
     131                 : 
     132                 :     /// A page service client sends a TenantId, and to look up the correct Tenant we must
     133                 :     /// resolve this to a fully qualified TenantShardId.
     134            8067 :     fn resolve_attached_shard(
     135            8067 :         &self,
     136            8067 :         tenant_id: &TenantId,
     137            8067 :         selector: ShardSelector,
     138            8067 :     ) -> Option<TenantShardId> {
     139            8067 :         let mut want_shard = None;
     140            8067 :         match self {
     141 UBC           0 :             TenantsMap::Initializing => None,
     142 CBC        8067 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
     143            8067 :                 for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
     144                 :                     // Ignore all slots that don't contain an attached tenant
     145            8060 :                     let tenant = match &slot.1 {
     146            7553 :                         TenantSlot::Attached(t) => t,
     147             507 :                         _ => continue,
     148                 :                     };
     149                 : 
     150             592 :                     match selector {
     151            6961 :                         ShardSelector::First => return Some(*slot.0),
     152             592 :                         ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
     153             592 :                             return Some(*slot.0)
     154                 :                         }
     155 UBC           0 :                         ShardSelector::Page(key) => {
     156               0 :                             // First slot we see for this tenant, calculate the expected shard number
     157               0 :                             // for the key: we will use this for checking if this and subsequent
     158               0 :                             // slots contain the key, rather than recalculating the hash each time.
     159               0 :                             if want_shard.is_none() {
     160               0 :                                 want_shard = Some(tenant.shard_identity.get_shard_number(&key));
     161               0 :                             }
     162                 : 
     163               0 :                             if Some(tenant.shard_identity.number) == want_shard {
     164               0 :                                 return Some(*slot.0);
     165               0 :                             }
     166                 :                         }
     167               0 :                         _ => continue,
     168                 :                     }
     169                 :                 }
     170                 : 
     171                 :                 // Fall through: we didn't find an acceptable shard
     172 CBC         514 :                 None
     173                 :             }
     174                 :         }
     175            8067 :     }
     176                 : 
     177                 :     /// Only for use from DeleteTenantFlow.  This method directly removes a TenantSlot from the map.
     178                 :     ///
     179                 :     /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
      180                 : /// slot if the enclosed tenant is shut down.
     181              57 :     pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
     182              57 :         use std::collections::btree_map::Entry;
     183              57 :         match self {
     184 UBC           0 :             TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
     185 CBC          57 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
     186              57 :                 Entry::Occupied(entry) => match entry.get() {
     187               1 :                     TenantSlot::InProgress(barrier) => {
     188               1 :                         TenantsMapRemoveResult::InProgress(barrier.clone())
     189                 :                     }
     190              56 :                     _ => TenantsMapRemoveResult::Occupied(entry.remove()),
     191                 :                 },
     192 UBC           0 :                 Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
     193                 :             },
     194                 :         }
     195 CBC          57 :     }
     196                 : 
     197              57 :     pub(crate) fn len(&self) -> usize {
     198              57 :         match self {
     199 UBC           0 :             TenantsMap::Initializing => 0,
     200 CBC          57 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
     201                 :         }
     202              57 :     }
     203                 : }
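
`resolve_attached_shard` above narrows the whole tenants map to one tenant's shards by range-scanning a `BTreeMap` keyed by `TenantShardId` (via `TenantShardId::tenant_range`). The following is a self-contained sketch of that range-scan idea, using made-up `(tenant, shard)` tuple keys rather than the real ID types:

```rust
use std::collections::BTreeMap;

// Made-up key components; the real map is keyed by `TenantShardId`.
type TenantId = u32;
type ShardNumber = u8;

/// Return the first shard recorded for `tenant`, visiting only that tenant's entries.
fn first_shard(
    map: &BTreeMap<(TenantId, ShardNumber), &'static str>,
    tenant: TenantId,
) -> Option<ShardNumber> {
    // The composite key sorts by tenant first, so this range covers exactly the
    // shards of one tenant -- the analogue of `TenantShardId::tenant_range`.
    map.range((tenant, ShardNumber::MIN)..=(tenant, ShardNumber::MAX))
        .map(|((_, shard), _)| *shard)
        .next()
}

fn main() {
    let mut map = BTreeMap::new();
    map.insert((7, 1), "tenant 7, shard 1");
    map.insert((7, 3), "tenant 7, shard 3");
    map.insert((9, 0), "tenant 9, shard 0");

    assert_eq!(first_shard(&map, 7), Some(1)); // never touches tenant 9's entries
    assert_eq!(first_shard(&map, 8), None); // fall through: no acceptable shard
}
```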
     204                 : 
      205                 : /// This is "safe" in that it won't leave behind a partially deleted directory
      206                 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before we start deleting
     207                 : /// the contents.
     208                 : ///
     209                 : /// This is pageserver-specific, as it relies on future processes after a crash to check
     210                 : /// for TEMP_FILE_SUFFIX when loading things.
     211              11 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
     212              31 :     let tmp_path = safe_rename_tenant_dir(path).await?;
     213              11 :     fs::remove_dir_all(tmp_path).await
     214              11 : }
     215                 : 
     216              76 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
     217              76 :     let parent = path
     218              76 :         .as_ref()
     219              76 :         .parent()
     220              76 :         // It is invalid to call this function with a relative path.  Tenant directories
     221              76 :         // should always have a parent.
     222              76 :         .ok_or(std::io::Error::new(
     223              76 :             std::io::ErrorKind::InvalidInput,
     224              76 :             "Path must be absolute",
     225              76 :         ))?;
     226              76 :     let rand_suffix = rand::thread_rng()
     227              76 :         .sample_iter(&Alphanumeric)
     228              76 :         .take(8)
     229              76 :         .map(char::from)
     230              76 :         .collect::<String>()
     231              76 :         + TEMP_FILE_SUFFIX;
     232              76 :     let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
     233              76 :     fs::rename(path.as_ref(), &tmp_path).await?;
     234              76 :     fs::File::open(parent).await?.sync_all().await?;
     235              76 :     Ok(tmp_path)
     236              76 : }
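
The safety argument above depends on a later startup finding and finishing any interrupted removal: `load_tenant_config` below deletes directories that still carry the temporary suffix. Here is a rough sketch of such a cleanup scan, with a made-up `TEMP_SUFFIX` constant and helper names standing in for `TEMP_FILE_SUFFIX` and `crate::is_temporary`:

```rust
use std::path::Path;

/// Stand-in suffix; the real code appends `TEMP_FILE_SUFFIX` plus a random component.
const TEMP_SUFFIX: &str = "___temp";

/// A directory is an interrupted removal if its extension carries the temp suffix,
/// i.e. the process died after the rename but before `remove_dir_all` finished.
fn is_leftover(path: &Path) -> bool {
    path.extension()
        .and_then(|ext| ext.to_str())
        .map(|ext| ext.ends_with(TEMP_SUFFIX))
        .unwrap_or(false)
}

/// Finish interrupted removals. Safe to repeat on every startup: directories are only
/// ever deleted under a temp name, never half-deleted under their original name.
fn cleanup_leftovers(tenants_dir: &Path) -> std::io::Result<()> {
    for entry in std::fs::read_dir(tenants_dir)? {
        let path = entry?.path();
        if path.is_dir() && is_leftover(&path) {
            std::fs::remove_dir_all(&path)?;
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    // Exercise the scan against a scratch directory.
    let scratch = std::env::temp_dir().join("cleanup-demo");
    std::fs::create_dir_all(scratch.join("tenant-a.rAnD1234___temp"))?;
    std::fs::create_dir_all(scratch.join("tenant-b"))?;
    cleanup_leftovers(&scratch)?;
    assert!(!scratch.join("tenant-a.rAnD1234___temp").exists());
    assert!(scratch.join("tenant-b").exists());
    Ok(())
}
```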
     237                 : 
     238                 : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
     239             558 :     Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
     240                 : 
     241                 : /// The TenantManager is responsible for storing and mutating the collection of all tenants
     242                 : /// that this pageserver process has state for.  Every Tenant and SecondaryTenant instance
     243                 : /// lives inside the TenantManager.
     244                 : ///
     245                 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
     246                 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
     247                 : /// and attached modes concurrently.
     248                 : pub struct TenantManager {
     249                 :     conf: &'static PageServerConf,
      250                 :     // TODO: currently this is a &'static pointing to TENANTS.  When we finish refactoring
     251                 :     // out of that static variable, the TenantManager can own this.
     252                 :     // See https://github.com/neondatabase/neon/issues/5796
     253                 :     tenants: &'static std::sync::RwLock<TenantsMap>,
     254                 :     resources: TenantSharedResources,
     255                 : }
     256                 : 
     257               1 : fn emergency_generations(
     258               1 :     tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
     259               1 : ) -> HashMap<TenantShardId, Generation> {
     260               1 :     tenant_confs
     261               1 :         .iter()
     262               1 :         .filter_map(|(tid, lc)| {
     263               1 :             let lc = match lc {
     264               1 :                 Ok(lc) => lc,
     265 UBC           0 :                 Err(_) => return None,
     266                 :             };
     267 CBC           1 :             let gen = match &lc.mode {
     268               1 :                 LocationMode::Attached(alc) => Some(alc.generation),
     269 UBC           0 :                 LocationMode::Secondary(_) => None,
     270                 :             };
     271                 : 
     272 CBC           1 :             gen.map(|g| (*tid, g))
     273               1 :         })
     274               1 :         .collect()
     275               1 : }
     276                 : 
     277             557 : async fn init_load_generations(
     278             557 :     conf: &'static PageServerConf,
     279             557 :     tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
     280             557 :     resources: &TenantSharedResources,
     281             557 :     cancel: &CancellationToken,
     282             557 : ) -> anyhow::Result<Option<HashMap<TenantShardId, Generation>>> {
     283             557 :     let generations = if conf.control_plane_emergency_mode {
     284               1 :         error!(
     285               1 :             "Emergency mode!  Tenants will be attached unsafely using their last known generation"
     286               1 :         );
     287               1 :         emergency_generations(tenant_confs)
     288             556 :     } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
     289             555 :         info!("Calling control plane API to re-attach tenants");
     290                 :         // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
     291            1686 :         match client.re_attach().await {
     292             555 :             Ok(tenants) => tenants,
     293                 :             Err(RetryForeverError::ShuttingDown) => {
     294 UBC           0 :                 anyhow::bail!("Shut down while waiting for control plane re-attach response")
     295                 :             }
     296                 :         }
     297                 :     } else {
     298 CBC           1 :         info!("Control plane API not configured, tenant generations are disabled");
     299               1 :         return Ok(None);
     300                 :     };
     301                 : 
     302                 :     // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
     303                 :     // deletion list entries may still be valid.  We provide that by pushing a recovery operation into
      304                 :     // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
     305                 :     // are processed, even though we don't block on recovery completing here.
     306                 :     //
     307                 :     // Must only do this if remote storage is enabled, otherwise deletion queue
     308                 :     // is not running and channel push will fail.
     309             556 :     if resources.remote_storage.is_some() {
     310             556 :         resources
     311             556 :             .deletion_queue_client
     312             556 :             .recover(generations.clone())?;
     313 UBC           0 :     }
     314                 : 
     315 CBC         556 :     Ok(Some(generations))
     316             557 : }
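
The comment in `init_load_generations` relies on the deletion queue processing operations strictly in submission order, so recovery only needs to be enqueued, not awaited. A toy illustration of that ordering argument, with a hypothetical operation type over a `tokio` mpsc channel (assumes the `macros`, `rt`, and `sync` features):

```rust
use tokio::sync::mpsc;

// Hypothetical operation type; the real queue carries deletion-queue messages.
enum QueueOp {
    Recover { generations: usize },
    Delete { tenant: &'static str },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    // Startup enqueues recovery without awaiting its completion...
    tx.send(QueueOp::Recover { generations: 3 }).unwrap();
    // ...so any deletion submitted later is necessarily processed after it.
    tx.send(QueueOp::Delete { tenant: "t1" }).unwrap();
    drop(tx);

    // A single consumer handles operations strictly in submission order.
    while let Some(op) = rx.recv().await {
        match op {
            QueueOp::Recover { generations } => println!("recovering {generations} generation(s)"),
            QueueOp::Delete { tenant } => println!("deleting tenant {tenant}"),
        }
    }
}
```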
     317                 : 
     318                 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
     319                 : /// to load a tenant config from it.
     320                 : ///
     321                 : /// If file is missing, return Ok(None)
     322             221 : fn load_tenant_config(
     323             221 :     conf: &'static PageServerConf,
     324             221 :     dentry: Utf8DirEntry,
     325             221 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
     326             221 :     let tenant_dir_path = dentry.path().to_path_buf();
     327             221 :     if crate::is_temporary(&tenant_dir_path) {
     328 UBC           0 :         info!("Found temporary tenant directory, removing: {tenant_dir_path}");
     329                 :         // No need to use safe_remove_tenant_dir_all because this is already
     330                 :         // a temporary path
     331               0 :         if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
     332               0 :             error!(
     333               0 :                 "Failed to remove temporary directory '{}': {:?}",
     334               0 :                 tenant_dir_path, e
     335               0 :             );
     336               0 :         }
     337               0 :         return Ok(None);
     338 CBC         221 :     }
     339                 : 
     340                 :     // This case happens if we crash during attachment before writing a config into the dir
     341             221 :     let is_empty = tenant_dir_path
     342             221 :         .is_empty_dir()
     343             221 :         .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
     344             221 :     if is_empty {
     345               2 :         info!("removing empty tenant directory {tenant_dir_path:?}");
     346               2 :         if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
     347 UBC           0 :             error!(
     348               0 :                 "Failed to remove empty tenant directory '{}': {e:#}",
     349               0 :                 tenant_dir_path
     350               0 :             )
     351 CBC           2 :         }
     352               2 :         return Ok(None);
     353             219 :     }
     354             219 : 
     355             219 :     let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
     356             219 :     if tenant_ignore_mark_file.exists() {
     357               1 :         info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
     358               1 :         return Ok(None);
     359             218 :     }
     360                 : 
     361             218 :     let tenant_shard_id = match tenant_dir_path
     362             218 :         .file_name()
     363             218 :         .unwrap_or_default()
     364             218 :         .parse::<TenantShardId>()
     365                 :     {
     366             218 :         Ok(id) => id,
     367                 :         Err(_) => {
     368 UBC           0 :             warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
     369               0 :             return Ok(None);
     370                 :         }
     371                 :     };
     372                 : 
     373 CBC         218 :     Ok(Some((
     374             218 :         tenant_shard_id,
     375             218 :         Tenant::load_tenant_config(conf, &tenant_shard_id),
     376             218 :     )))
     377             221 : }
     378                 : 
     379                 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
     380                 : /// and load configurations for the tenants we found.
     381                 : ///
     382                 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
     383                 : /// seconds even on reasonably fast drives.
     384             557 : async fn init_load_tenant_configs(
     385             557 :     conf: &'static PageServerConf,
     386             557 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
     387             557 :     let tenants_dir = conf.tenants_path();
     388                 : 
     389             557 :     let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
     390             557 :         let dir_entries = tenants_dir
     391             557 :             .read_dir_utf8()
     392             557 :             .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
     393                 : 
     394             557 :         Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
     395             557 :     })
     396             557 :     .await??;
     397                 : 
     398             557 :     let mut configs = HashMap::new();
     399             557 : 
     400             557 :     let mut join_set = JoinSet::new();
     401             778 :     for dentry in dentries {
     402             221 :         join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
     403             221 :     }
     404                 : 
     405             778 :     while let Some(r) = join_set.join_next().await {
     406             221 :         if let Some((tenant_id, tenant_config)) = r?? {
     407             218 :             configs.insert(tenant_id, tenant_config);
     408             218 :         }
     409                 :     }
     410                 : 
     411             557 :     Ok(configs)
     412             557 : }
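
`init_load_tenant_configs` fans the per-directory work out through `JoinSet::spawn_blocking`, so thousands of config reads do not run serially on the async runtime. Below is a minimal standalone sketch of the same fan-out/collect pattern, with a hypothetical `parse_config` standing in for `load_tenant_config` (assumes the `tokio` multi-thread runtime and `anyhow`):

```rust
use std::collections::HashMap;

use tokio::task::JoinSet;

/// Stand-in for the real per-entry work (read + parse one tenant's config file).
fn parse_config(name: String) -> anyhow::Result<(String, usize)> {
    let len = name.len();
    Ok((name, len))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let entries = vec!["tenant-a".to_string(), "tenant-b".to_string()];

    // Fan out: one blocking task per directory entry, keeping filesystem work off
    // the async worker threads.
    let mut join_set = JoinSet::new();
    for entry in entries {
        join_set.spawn_blocking(move || parse_config(entry));
    }

    // Collect: results arrive in completion order; the outer `?` surfaces join/panic
    // errors, the inner `?` surfaces per-entry parse errors.
    let mut configs = HashMap::new();
    while let Some(res) = join_set.join_next().await {
        let (name, value) = res??;
        configs.insert(name, value);
    }

    println!("loaded {} configs", configs.len());
    Ok(())
}
```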
     413                 : 
     414                 : /// Initialize repositories with locally available timelines.
     415                 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
     416                 : /// are scheduled for download and added to the tenant once download is completed.
     417             557 : #[instrument(skip_all)]
     418                 : pub async fn init_tenant_mgr(
     419                 :     conf: &'static PageServerConf,
     420                 :     resources: TenantSharedResources,
     421                 :     init_order: InitializationOrder,
     422                 :     cancel: CancellationToken,
     423                 : ) -> anyhow::Result<TenantManager> {
     424                 :     let mut tenants = BTreeMap::new();
     425                 : 
     426                 :     let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
     427                 : 
     428                 :     // Scan local filesystem for attached tenants
     429                 :     let tenant_configs = init_load_tenant_configs(conf).await?;
     430                 : 
     431                 :     // Determine which tenants are to be attached
     432                 :     let tenant_generations =
     433                 :         init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
     434                 : 
     435             557 :     tracing::info!(
     436             557 :         "Attaching {} tenants at startup, warming up {} at a time",
     437             557 :         tenant_configs.len(),
     438             557 :         conf.concurrent_tenant_warmup.initial_permits()
     439             557 :     );
     440                 :     TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
     441                 : 
     442                 :     // Construct `Tenant` objects and start them running
     443                 :     for (tenant_shard_id, location_conf) in tenant_configs {
     444                 :         let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
     445                 : 
     446                 :         let mut location_conf = match location_conf {
     447                 :             Ok(l) => l,
     448                 :             Err(e) => {
     449 UBC           0 :                 warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
     450                 : 
     451                 :                 tenants.insert(
     452                 :                     tenant_shard_id,
     453                 :                     TenantSlot::Attached(Tenant::create_broken_tenant(
     454                 :                         conf,
     455                 :                         tenant_shard_id,
     456                 :                         format!("{}", e),
     457                 :                     )),
     458                 :                 );
     459                 :                 continue;
     460                 :             }
     461                 :         };
     462                 : 
     463                 :         let generation = if let Some(generations) = &tenant_generations {
     464                 :             // We have a generation map: treat it as the authority for whether
     465                 :             // this tenant is really attached.
     466                 :             if let Some(gen) = generations.get(&tenant_shard_id) {
     467                 :                 *gen
     468                 :             } else {
     469                 :                 match &location_conf.mode {
     470                 :                     LocationMode::Secondary(secondary_config) => {
     471                 :                         // We do not require the control plane's permission for secondary mode
     472                 :                         // tenants, because they do no remote writes and hence require no
     473                 :                         // generation number
     474 CBC           5 :                         info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
     475                 :                         tenants.insert(
     476                 :                             tenant_shard_id,
     477                 :                             TenantSlot::Secondary(SecondaryTenant::new(
     478                 :                                 tenant_shard_id,
     479                 :                                 secondary_config,
     480                 :                             )),
     481                 :                         );
     482                 :                     }
     483                 :                     LocationMode::Attached(_) => {
     484                 :                         // TODO: augment re-attach API to enable the control plane to
     485                 :                         // instruct us about secondary attachments.  That way, instead of throwing
     486                 :                         // away local state, we can gracefully fall back to secondary here, if the control
     487                 :                         // plane tells us so.
     488                 :                         // (https://github.com/neondatabase/neon/issues/5377)
     489              11 :                         info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
     490                 :                         if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
     491 UBC           0 :                             error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     492               0 :                                 "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
     493               0 :                             );
     494                 :                         }
     495                 :                     }
     496                 :                 };
     497                 : 
     498                 :                 continue;
     499                 :             }
     500                 :         } else {
     501                 :             // Legacy mode: no generation information, any tenant present
     502                 :             // on local disk may activate
     503               0 :             info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
     504                 :             Generation::none()
     505                 :         };
     506                 : 
     507                 :         // Presence of a generation number implies attachment: attach the tenant
     508                 :         // if it wasn't already, and apply the generation number.
     509                 :         location_conf.attach_in_generation(generation);
     510                 :         Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
     511                 : 
     512                 :         let shard_identity = location_conf.shard;
     513                 :         match tenant_spawn(
     514                 :             conf,
     515                 :             tenant_shard_id,
     516                 :             &tenant_dir_path,
     517                 :             resources.clone(),
     518                 :             AttachedTenantConf::try_from(location_conf)?,
     519                 :             shard_identity,
     520                 :             Some(init_order.clone()),
     521                 :             &TENANTS,
     522                 :             SpawnMode::Normal,
     523                 :             &ctx,
     524                 :         ) {
     525                 :             Ok(tenant) => {
     526                 :                 tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
     527                 :             }
     528                 :             Err(e) => {
     529               0 :                 error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
     530                 :             }
     531                 :         }
     532                 :     }
     533                 : 
     534 CBC         557 :     info!("Processed {} local tenants at startup", tenants.len());
     535                 : 
     536                 :     let mut tenants_map = TENANTS.write().unwrap();
     537                 :     assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
     538                 :     METRICS.tenant_slots.set(tenants.len() as u64);
     539                 :     *tenants_map = TenantsMap::Open(tenants);
     540                 : 
     541                 :     Ok(TenantManager {
     542                 :         conf,
     543                 :         tenants: &TENANTS,
     544                 :         resources,
     545                 :     })
     546                 : }
     547                 : 
     548                 : /// Wrapper for Tenant::spawn that checks invariants before running, and inserts
     549                 : /// a broken tenant in the map if Tenant::spawn fails.
     550                 : #[allow(clippy::too_many_arguments)]
     551             736 : pub(crate) fn tenant_spawn(
     552             736 :     conf: &'static PageServerConf,
     553             736 :     tenant_shard_id: TenantShardId,
     554             736 :     tenant_path: &Utf8Path,
     555             736 :     resources: TenantSharedResources,
     556             736 :     location_conf: AttachedTenantConf,
     557             736 :     shard_identity: ShardIdentity,
     558             736 :     init_order: Option<InitializationOrder>,
     559             736 :     tenants: &'static std::sync::RwLock<TenantsMap>,
     560             736 :     mode: SpawnMode,
     561             736 :     ctx: &RequestContext,
     562             736 : ) -> anyhow::Result<Arc<Tenant>> {
     563             736 :     anyhow::ensure!(
     564             736 :         tenant_path.is_dir(),
     565 UBC           0 :         "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
     566                 :     );
     567                 :     anyhow::ensure!(
     568 CBC         736 :         !crate::is_temporary(tenant_path),
     569 UBC           0 :         "Cannot load tenant from temporary path {tenant_path:?}"
     570                 :     );
     571                 :     anyhow::ensure!(
     572 CBC         736 :         !tenant_path.is_empty_dir().with_context(|| {
     573 UBC           0 :             format!("Failed to check whether {tenant_path:?} is an empty dir")
     574 CBC         736 :         })?,
     575 UBC           0 :         "Cannot load tenant from empty directory {tenant_path:?}"
     576                 :     );
     577                 : 
     578 CBC         736 :     let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
     579             736 :     anyhow::ensure!(
     580             736 :         !conf.tenant_ignore_mark_file_path(&tenant_shard_id).exists(),
     581 UBC           0 :         "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
     582                 :     );
     583                 : 
     584 CBC         736 :     info!(
     585             736 :         tenant_id = %tenant_shard_id.tenant_id,
     586             736 :         shard_id = %tenant_shard_id.shard_slug(),
     587             736 :         generation = ?location_conf.location.generation,
     588             736 :         attach_mode = ?location_conf.location.attach_mode,
     589             736 :         "Attaching tenant"
     590             736 :     );
     591             736 :     let tenant = match Tenant::spawn(
     592             736 :         conf,
     593             736 :         tenant_shard_id,
     594             736 :         resources,
     595             736 :         location_conf,
     596             736 :         shard_identity,
     597             736 :         init_order,
     598             736 :         tenants,
     599             736 :         mode,
     600             736 :         ctx,
     601             736 :     ) {
     602             736 :         Ok(tenant) => tenant,
     603 UBC           0 :         Err(e) => {
     604               0 :             error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
     605               0 :             Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
     606                 :         }
     607                 :     };
     608                 : 
     609 CBC         736 :     Ok(tenant)
     610             736 : }
     611                 : 
     612                 : ///
     613                 : /// Shut down all tenants. This runs as part of pageserver shutdown.
     614                 : ///
     615                 : /// NB: We leave the tenants in the map, so that they remain accessible through
     616                 : /// the management API until we shut it down. If we removed the shut-down tenants
     617                 : /// from the tenants map, the management API would return 404 for these tenants,
     618                 : /// because TenantsMap::get() now returns `None`.
      619                 : /// That could easily be misinterpreted by the control plane, the consumer of the
     620                 : /// management API. For example, it could attach the tenant on a different pageserver.
     621                 : /// We would then be in split-brain once this pageserver restarts.
     622             159 : #[instrument(skip_all)]
     623                 : pub(crate) async fn shutdown_all_tenants() {
     624                 :     shutdown_all_tenants0(&TENANTS).await
     625                 : }
     626                 : 
     627             160 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
     628             160 :     use utils::completion;
     629             160 : 
     630             160 :     let mut join_set = JoinSet::new();
     631                 : 
     632                 :     // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
     633             160 :     let (total_in_progress, total_attached) = {
     634             160 :         let mut m = tenants.write().unwrap();
     635             160 :         match &mut *m {
     636                 :             TenantsMap::Initializing => {
     637 UBC           0 :                 *m = TenantsMap::ShuttingDown(BTreeMap::default());
     638               0 :                 info!("tenants map is empty");
     639               0 :                 return;
     640                 :             }
     641 CBC         160 :             TenantsMap::Open(tenants) => {
     642             160 :                 let mut shutdown_state = BTreeMap::new();
     643             160 :                 let mut total_in_progress = 0;
     644             160 :                 let mut total_attached = 0;
     645                 : 
     646             185 :                 for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
     647             185 :                     match v {
     648             179 :                         TenantSlot::Attached(t) => {
     649             179 :                             shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
     650             179 :                             join_set.spawn(
     651             179 :                                 async move {
     652             179 :                                     let freeze_and_flush = true;
     653                 : 
     654             179 :                                     let res = {
     655             179 :                                         let (_guard, shutdown_progress) = completion::channel();
     656             408 :                                         t.shutdown(shutdown_progress, freeze_and_flush).await
     657                 :                                     };
     658                 : 
     659             179 :                                     if let Err(other_progress) = res {
      660                 :                                         // join the other shutdown in progress
     661 UBC           0 :                                         other_progress.wait().await;
     662 CBC         179 :                                     }
     663                 : 
      664                 :                                     // we cannot afford per-tenant logging here, because if S3 is degraded, we are
     665                 :                                     // going to log too many lines
     666             179 :                                     debug!("tenant successfully stopped");
     667             179 :                                 }
     668             179 :                                 .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug())),
     669                 :                             );
     670                 : 
     671             179 :                             total_attached += 1;
     672                 :                         }
     673               5 :                         TenantSlot::Secondary(state) => {
     674               5 :                             // We don't need to wait for this individually per-tenant: the
     675               5 :                             // downloader task will be waited on eventually, this cancel
     676               5 :                             // is just to encourage it to drop out if it is doing work
     677               5 :                             // for this tenant right now.
     678               5 :                             state.cancel.cancel();
     679               5 : 
     680               5 :                             shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
     681               5 :                         }
     682               1 :                         TenantSlot::InProgress(notify) => {
     683               1 :                             // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
     684               1 :                             // wait for their notifications to fire in this function.
     685               1 :                             join_set.spawn(async move {
     686               1 :                                 notify.wait().await;
     687               1 :                             });
     688               1 : 
     689               1 :                             total_in_progress += 1;
     690               1 :                         }
     691                 :                     }
     692                 :                 }
     693             160 :                 *m = TenantsMap::ShuttingDown(shutdown_state);
     694             160 :                 (total_in_progress, total_attached)
     695                 :             }
     696                 :             TenantsMap::ShuttingDown(_) => {
     697 UBC           0 :                 error!("already shutting down, this function isn't supposed to be called more than once");
     698               0 :                 return;
     699                 :             }
     700                 :         }
     701                 :     };
     702                 : 
     703 CBC         160 :     let started_at = std::time::Instant::now();
     704                 : 
     705             160 :     info!(
     706             160 :         "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
     707             160 :         total_in_progress, total_attached
     708             160 :     );
     709                 : 
     710             160 :     let total = join_set.len();
     711             160 :     let mut panicked = 0;
     712             160 :     let mut buffering = true;
     713             160 :     const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
     714             160 :     let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
     715                 : 
     716             347 :     while !join_set.is_empty() {
     717             187 :         tokio::select! {
     718             180 :             Some(joined) = join_set.join_next() => {
     719                 :                 match joined {
     720                 :                     Ok(()) => {}
     721                 :                     Err(join_error) if join_error.is_cancelled() => {
     722                 :                         unreachable!("we are not cancelling any of the tasks");
     723                 :                     }
     724                 :                     Err(join_error) if join_error.is_panic() => {
     725                 :                         // cannot really do anything, as this panic is likely a bug
     726                 :                         panicked += 1;
     727                 :                     }
     728                 :                     Err(join_error) => {
     729 UBC           0 :                         warn!("unknown kind of JoinError: {join_error}");
     730                 :                     }
     731                 :                 }
     732                 :                 if !buffering {
     733                 :                     // buffer so that every 500ms since the first update (or starting) we'll log
     734                 :                     // how far away we are; this is because we will get SIGKILL'd at 10s, and we
     735                 :                     // are not able to log *then*.
     736                 :                     buffering = true;
     737                 :                     buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
     738                 :                 }
     739                 :             },
     740                 :             _ = &mut buffered, if buffering => {
     741                 :                 buffering = false;
     742 CBC           7 :                 info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
     743                 :             }
     744                 :         }
     745                 :     }
     746                 : 
     747             160 :     if panicked > 0 {
     748 UBC           0 :         warn!(
     749               0 :             panicked,
     750               0 :             total, "observed panicks while shutting down tenants"
     751               0 :         );
     752 CBC         160 :     }
     753                 : 
     754                 :     // caller will log how long we took
     755             160 : }
     756                 : 
     757             431 : pub(crate) async fn create_tenant(
     758             431 :     conf: &'static PageServerConf,
     759             431 :     tenant_conf: TenantConfOpt,
     760             431 :     tenant_shard_id: TenantShardId,
     761             431 :     generation: Generation,
     762             431 :     resources: TenantSharedResources,
     763             431 :     ctx: &RequestContext,
     764             431 : ) -> Result<Arc<Tenant>, TenantMapInsertError> {
     765             431 :     let location_conf = LocationConf::attached_single(tenant_conf, generation);
     766             431 :     info!("Creating tenant at location {location_conf:?}");
     767                 : 
     768             431 :     let slot_guard =
     769             431 :         tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
     770             861 :     let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_shard_id).await?;
     771                 : 
     772             430 :     let shard_identity = location_conf.shard;
     773             430 :     let created_tenant = tenant_spawn(
     774             430 :         conf,
     775             430 :         tenant_shard_id,
     776             430 :         &tenant_path,
     777             430 :         resources,
     778             430 :         AttachedTenantConf::try_from(location_conf)?,
     779             430 :         shard_identity,
     780             430 :         None,
     781             430 :         &TENANTS,
     782             430 :         SpawnMode::Create,
     783             430 :         ctx,
     784 UBC           0 :     )?;
     785                 :     // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
     786                 :     //      See https://github.com/neondatabase/neon/issues/4233
     787                 : 
     788 CBC         430 :     let created_tenant_id = created_tenant.tenant_id();
     789             430 :     debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id);
     790                 : 
     791             430 :     slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?;
     792                 : 
     793             430 :     Ok(created_tenant)
     794             431 : }
     795                 : 
     796 UBC           0 : #[derive(Debug, thiserror::Error)]
     797                 : pub(crate) enum SetNewTenantConfigError {
     798                 :     #[error(transparent)]
     799                 :     GetTenant(#[from] GetTenantError),
     800                 :     #[error(transparent)]
     801                 :     Persist(anyhow::Error),
     802                 : }
     803                 : 
     804 CBC          30 : pub(crate) async fn set_new_tenant_config(
     805              30 :     conf: &'static PageServerConf,
     806              30 :     new_tenant_conf: TenantConfOpt,
     807              30 :     tenant_id: TenantId,
     808              30 : ) -> Result<(), SetNewTenantConfigError> {
     809              30 :     // Legacy API: does not support sharding
     810              30 :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
     811                 : 
     812              30 :     info!("configuring tenant {tenant_id}");
     813              30 :     let tenant = get_tenant(tenant_shard_id, true)?;
     814                 : 
     815                 :     // This is a legacy API that only operates on attached tenants: the preferred
     816                 :     // API to use is the location_config/ endpoint, which lets the caller provide
     817                 :     // the full LocationConf.
     818              30 :     let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation);
     819              30 : 
     820              30 :     Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
     821              60 :         .await
     822              30 :         .map_err(SetNewTenantConfigError::Persist)?;
     823              30 :     tenant.set_new_tenant_config(new_tenant_conf);
     824              30 :     Ok(())
     825              30 : }
     826                 : 
     827                 : impl TenantManager {
     828                 :     /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
     829                 :     /// having to pass it around everywhere as a separate object.
     830            1121 :     pub(crate) fn get_conf(&self) -> &'static PageServerConf {
     831            1121 :         self.conf
     832            1121 :     }
     833                 : 
      834                 :     /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or does not fit the query.
      835                 :     /// `active_only = true` allows querying only tenants that are ready for operations, erroring on other kinds of tenants.
     836             850 :     pub(crate) fn get_attached_tenant_shard(
     837             850 :         &self,
     838             850 :         tenant_shard_id: TenantShardId,
     839             850 :         active_only: bool,
     840             850 :     ) -> Result<Arc<Tenant>, GetTenantError> {
     841             850 :         let locked = self.tenants.read().unwrap();
     842                 : 
     843             850 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
     844                 : 
     845             849 :         match peek_slot {
     846             849 :             Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
     847                 :                 TenantState::Broken {
     848 UBC           0 :                     reason,
     849               0 :                     backtrace: _,
     850               0 :                 } if active_only => Err(GetTenantError::Broken(reason)),
     851 CBC         848 :                 TenantState::Active => Ok(Arc::clone(tenant)),
     852                 :                 _ => {
     853               1 :                     if active_only {
     854 UBC           0 :                         Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
     855                 :                     } else {
     856 CBC           1 :                         Ok(Arc::clone(tenant))
     857                 :                     }
     858                 :                 }
     859                 :             },
     860                 :             Some(TenantSlot::InProgress(_)) => {
     861 UBC           0 :                 Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
     862                 :             }
     863                 :             None | Some(TenantSlot::Secondary(_)) => {
     864 CBC           1 :                 Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
     865                 :             }
     866                 :         }
     867             850 :     }
     868                 : 
     869               4 :     pub(crate) fn get_secondary_tenant_shard(
     870               4 :         &self,
     871               4 :         tenant_shard_id: TenantShardId,
     872               4 :     ) -> Option<Arc<SecondaryTenant>> {
     873               4 :         let locked = self.tenants.read().unwrap();
     874               4 : 
     875               4 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
     876               4 :             .ok()
     877               4 :             .flatten();
     878                 : 
     879               4 :         match peek_slot {
     880               4 :             Some(TenantSlot::Secondary(s)) => Some(s.clone()),
     881 UBC           0 :             _ => None,
     882                 :         }
     883 CBC           4 :     }
     884                 : 
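                         :     /// Creates or updates the location configuration (attached or secondary) of a tenant shard,
                         :     /// persisting it and replacing the slot contents where necessary.
                         :     ///
                         :     /// A minimal usage sketch (assumes the `tenant_manager` handle, a `location_conf`, and a
                         :     /// request `ctx` are in scope; the flush timeout is illustrative):
                         :     ///
                         :     /// ```ignore
                         :     /// tenant_manager
                         :     ///     .upsert_location(tenant_shard_id, location_conf, Some(Duration::from_secs(10)), &ctx)
                         :     ///     .await?;
                         :     /// ```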
     885             117 :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
     886                 :     pub(crate) async fn upsert_location(
     887                 :         &self,
     888                 :         tenant_shard_id: TenantShardId,
     889                 :         new_location_config: LocationConf,
     890                 :         flush: Option<Duration>,
     891                 :         ctx: &RequestContext,
     892                 :     ) -> Result<(), anyhow::Error> {
     893                 :         debug_assert_current_span_has_tenant_id();
     894             117 :         info!("configuring tenant location to state {new_location_config:?}");
     895                 : 
     896                 :         enum FastPathModified {
     897                 :             Attached(Arc<Tenant>),
     898                 :             Secondary(Arc<SecondaryTenant>),
     899                 :         }
     900                 : 
     901                 :         // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
     902                 :         // then we do not need to set the slot to InProgress; we can just call into the
     903                 :         // existing tenant.
     904                 :         let fast_path_taken = {
     905                 :             let locked = self.tenants.read().unwrap();
     906                 :             let peek_slot =
     907                 :                 tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
     908                 :             match (&new_location_config.mode, peek_slot) {
     909                 :                 (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
     910                 :                     if attach_conf.generation == tenant.generation {
     911                 :                         // A transition from Attached to Attached in the same generation, we may
     912                 :                         // take our fast path and just provide the updated configuration
     913                 :                         // to the tenant.
     914                 :                         tenant.set_new_location_config(AttachedTenantConf::try_from(
     915                 :                             new_location_config.clone(),
     916                 :                         )?);
     917                 : 
     918                 :                         Some(FastPathModified::Attached(tenant.clone()))
     919                 :                     } else {
     920                 :                         // Different generations, fall through to general case
     921                 :                         None
     922                 :                     }
     923                 :                 }
     924                 :                 (
     925                 :                     LocationMode::Secondary(secondary_conf),
     926                 :                     Some(TenantSlot::Secondary(secondary_tenant)),
     927                 :                 ) => {
     928                 :                     secondary_tenant.set_config(secondary_conf);
     929                 :                     Some(FastPathModified::Secondary(secondary_tenant.clone()))
     930                 :                 }
     931                 :                 _ => {
     932                 :                     // Not an Attached->Attached transition, fall through to general case
     933                 :                     None
     934                 :                 }
     935                 :             }
     936                 :         };
     937                 : 
     938                 :         // Fast-path continued: having dropped out of the self.tenants lock, do the async
     939                 :         // phase of writing config and/or waiting for flush, before returning.
     940                 :         match fast_path_taken {
     941                 :             Some(FastPathModified::Attached(tenant)) => {
     942                 :                 Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
     943                 :                     .await
     944                 :                     .map_err(SetNewTenantConfigError::Persist)?;
     945                 : 
     946                 :                 // Transition to AttachedStale means we may well hold a valid generation
     947                 :                 // still, and have been requested to go stale as part of a migration.  If
     948                 :                 // the caller set `flush`, then flush to remote storage.
     949                 :                 if let LocationMode::Attached(AttachedLocationConfig {
     950                 :                     generation: _,
     951                 :                     attach_mode: AttachmentMode::Stale,
     952                 :                 }) = &new_location_config.mode
     953                 :                 {
     954                 :                     if let Some(flush_timeout) = flush {
     955                 :                         match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
     956                 :                             Ok(Err(e)) => {
     957                 :                                 return Err(e);
     958                 :                             }
     959                 :                             Ok(Ok(_)) => return Ok(()),
     960                 :                             Err(_) => {
     961 UBC           0 :                                 tracing::warn!(
     962               0 :                                 timeout_ms = flush_timeout.as_millis(),
     963               0 :                                 "Timed out waiting for flush to remote storage, proceeding anyway."
     964               0 :                             )
     965                 :                             }
     966                 :                         }
     967                 :                     }
     968                 :                 }
     969                 : 
     970                 :                 return Ok(());
     971                 :             }
     972                 :             Some(FastPathModified::Secondary(_secondary_tenant)) => {
     973                 :                 Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
     974                 :                     .await
     975                 :                     .map_err(SetNewTenantConfigError::Persist)?;
     976                 : 
     977                 :                 return Ok(());
     978                 :             }
     979                 :             None => {
     980                 :                 // Proceed with the general case procedure, where we will shutdown & remove any existing
     981                 :                 // slot contents and replace with a fresh one
     982                 :             }
     983                 :         };
     984                 : 
     985                 :         // General case for upserts to TenantsMap, excluding the case above: we will substitute an
     986                 :         // InProgress value into the slot while we make whatever changes are required.  The state for
     987                 :         // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
     988                 :         // the state is ill-defined while we're in transition.  Transitions are async, but fast: we do
     989                 :         // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
     990                 :         let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
     991                 : 
     992                 :         match slot_guard.get_old_value() {
     993                 :             Some(TenantSlot::Attached(tenant)) => {
     994                 :                 // The case where we keep a Tenant alive was covered above in the special case
     995                 :                 // for Attached->Attached transitions in the same generation.  By this point,
     996                 :                 // if we see an attached tenant we know it will be discarded and should be
     997                 :                 // shut down.
     998                 :                 let (_guard, progress) = utils::completion::channel();
     999                 : 
    1000                 :                 match tenant.get_attach_mode() {
    1001                 :                     AttachmentMode::Single | AttachmentMode::Multi => {
    1002                 :                         // Before we leave our state as the presumed holder of the latest generation,
    1003                 :                         // flush any outstanding deletions to reduce the risk of leaking objects.
    1004                 :                         self.resources.deletion_queue_client.flush_advisory()
    1005                 :                     }
    1006                 :                     AttachmentMode::Stale => {
    1007                 :                         // If we're stale there's no point in trying to flush deletions
    1008                 :                     }
    1009                 :                 };
    1010                 : 
    1011 CBC          34 :                 info!("Shutting down attached tenant");
    1012                 :                 match tenant.shutdown(progress, false).await {
    1013                 :                     Ok(()) => {}
    1014                 :                     Err(barrier) => {
    1015 UBC           0 :                         info!("Shutdown already in progress, waiting for it to complete");
    1016                 :                         barrier.wait().await;
    1017                 :                     }
    1018                 :                 }
    1019                 :                 slot_guard.drop_old_value().expect("We just shut it down");
    1020                 :             }
    1021                 :             Some(TenantSlot::Secondary(state)) => {
    1022 CBC          16 :                 info!("Shutting down secondary tenant");
    1023                 :                 state.shutdown().await;
    1024                 :             }
    1025                 :             Some(TenantSlot::InProgress(_)) => {
    1026                 :                 // This should never happen: acquire_slot should error out
    1027                 :                 // if the contents of a slot were InProgress.
    1028                 :                 anyhow::bail!("Acquired an InProgress slot, this is a bug.")
    1029                 :             }
    1030                 :             None => {
    1031                 :                 // Slot was vacant, nothing needs shutting down.
    1032                 :             }
    1033                 :         }
    1034                 : 
    1035                 :         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    1036                 :         let timelines_path = self.conf.timelines_path(&tenant_shard_id);
    1037                 : 
    1038                 :         // Directory structure is the same for attached and secondary modes:
    1039                 :         // create it if it doesn't exist.  Timeline load/creation expects the
    1040                 :         // timelines/ subdir to already exist.
    1041                 :         //
    1042                 :         // Does not need to be fsync'd because local storage is just a cache.
    1043                 :         tokio::fs::create_dir_all(&timelines_path)
    1044                 :             .await
    1045 UBC           0 :             .with_context(|| format!("Creating {timelines_path}"))?;
    1046                 : 
    1047                 :         // Before activating either secondary or attached mode, persist the
    1048                 :         // configuration, so that on restart we will re-attach (or re-start
    1049                 :         // secondary) on the tenant.
    1050                 :         Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
    1051                 :             .await
    1052                 :             .map_err(SetNewTenantConfigError::Persist)?;
    1053                 : 
    1054                 :         let new_slot = match &new_location_config.mode {
    1055                 :             LocationMode::Secondary(secondary_config) => {
    1056                 :                 TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id, secondary_config))
    1057                 :             }
    1058                 :             LocationMode::Attached(_attach_config) => {
    1059                 :                 let shard_identity = new_location_config.shard;
    1060                 :                 let tenant = tenant_spawn(
    1061                 :                     self.conf,
    1062                 :                     tenant_shard_id,
    1063                 :                     &tenant_path,
    1064                 :                     self.resources.clone(),
    1065                 :                     AttachedTenantConf::try_from(new_location_config)?,
    1066                 :                     shard_identity,
    1067                 :                     None,
    1068                 :                     self.tenants,
    1069                 :                     SpawnMode::Normal,
    1070                 :                     ctx,
    1071                 :                 )?;
    1072                 : 
    1073                 :                 TenantSlot::Attached(tenant)
    1074                 :             }
    1075                 :         };
    1076                 : 
    1077                 :         slot_guard.upsert(new_slot)?;
    1078                 : 
    1079                 :         Ok(())
    1080                 :     }
    1081                 : 
    1082                 :     /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
    1083                 :     /// LocationConf that was last used to attach it.  Optionally, the local file cache may be
    1084                 :     /// dropped before re-attaching.
    1085                 :     ///
    1086                 :     /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
    1087                 :     /// where an issue is identified that would go away with a restart of the tenant.
    1088                 :     ///
    1089                 :     /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
    1090                 :     /// to respect the cancellation tokens used in normal shutdown().
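                         :     ///
                         :     /// A minimal usage sketch (assumes the `tenant_manager` handle, a `tenant_shard_id`, and a
                         :     /// request `ctx` are in scope; `true` drops the local file cache before re-attaching):
                         :     ///
                         :     /// ```ignore
                         :     /// tenant_manager.reset_tenant(tenant_shard_id, true, ctx).await?;
                         :     /// ```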
    1091               0 :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
    1092                 :     pub(crate) async fn reset_tenant(
    1093                 :         &self,
    1094                 :         tenant_shard_id: TenantShardId,
    1095                 :         drop_cache: bool,
    1096                 :         ctx: RequestContext,
    1097                 :     ) -> anyhow::Result<()> {
    1098                 :         let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
    1099                 :         let Some(old_slot) = slot_guard.get_old_value() else {
    1100                 :             anyhow::bail!("Tenant not found when trying to reset");
    1101                 :         };
    1102                 : 
    1103                 :         let Some(tenant) = old_slot.get_attached() else {
    1104                 :             slot_guard.revert();
    1105                 :             anyhow::bail!("Tenant is not in attached state");
    1106                 :         };
    1107                 : 
    1108                 :         let (_guard, progress) = utils::completion::channel();
    1109                 :         match tenant.shutdown(progress, false).await {
    1110                 :             Ok(()) => {
    1111                 :                 slot_guard.drop_old_value()?;
    1112                 :             }
    1113                 :             Err(_barrier) => {
    1114                 :                 slot_guard.revert();
    1115                 :                 anyhow::bail!("Cannot reset Tenant, already shutting down");
    1116                 :             }
    1117                 :         }
    1118                 : 
    1119                 :         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    1120                 :         let timelines_path = self.conf.timelines_path(&tenant_shard_id);
    1121                 :         let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
    1122                 : 
    1123                 :         if drop_cache {
    1124 CBC           1 :             tracing::info!("Dropping local file cache");
    1125                 : 
    1126                 :             match tokio::fs::read_dir(&timelines_path).await {
    1127                 :                 Err(e) => {
    1128 UBC           0 :                     tracing::warn!("Failed to list timelines while dropping cache: {}", e);
    1129                 :                 }
    1130                 :                 Ok(mut entries) => {
    1131                 :                     while let Some(entry) = entries.next_entry().await? {
    1132                 :                         tokio::fs::remove_dir_all(entry.path()).await?;
    1133                 :                     }
    1134                 :                 }
    1135                 :             }
    1136                 :         }
    1137                 : 
    1138                 :         let shard_identity = config.shard;
    1139                 :         let tenant = tenant_spawn(
    1140                 :             self.conf,
    1141                 :             tenant_shard_id,
    1142                 :             &tenant_path,
    1143                 :             self.resources.clone(),
    1144                 :             AttachedTenantConf::try_from(config)?,
    1145                 :             shard_identity,
    1146                 :             None,
    1147                 :             self.tenants,
    1148                 :             SpawnMode::Normal,
    1149                 :             &ctx,
    1150                 :         )?;
    1151                 : 
    1152                 :         slot_guard.upsert(TenantSlot::Attached(tenant))?;
    1153                 : 
    1154                 :         Ok(())
    1155                 :     }
    1156                 : 
    1157 CBC        1109 :     pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
    1158            1109 :         let locked = self.tenants.read().unwrap();
    1159            1109 :         match &*locked {
    1160 UBC           0 :             TenantsMap::Initializing => Vec::new(),
    1161 CBC        1109 :             TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
    1162            1109 :                 .values()
    1163            1109 :                 .filter_map(|slot| {
    1164             908 :                     slot.get_attached()
    1165             908 :                         .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
    1166            1109 :                 })
    1167            1109 :                 .collect(),
    1168                 :         }
    1169            1109 :     }
    1170                 :     // Do some synchronous work for all tenant slots in Secondary state.  The provided
    1171                 :     // callback should be small and fast, as it will be called inside the global
    1172                 :     // TenantsMap lock.
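                         :     //
                         :     // A minimal usage sketch (the closure body is illustrative; only secondary tenants that are
                         :     // not shutting down are visited):
                         :     //
                         :     //     tenant_manager.foreach_secondary_tenants(|shard_id, _secondary| {
                         :     //         tracing::debug!("secondary tenant shard present: {shard_id}");
                         :     //     });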
    1173            1109 :     pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
    1174            1109 :     where
    1175            1109 :         // TODO: let the callback return a hint to drop out of the loop early
    1176            1109 :         F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
    1177            1109 :     {
    1178            1109 :         let locked = self.tenants.read().unwrap();
    1179                 : 
    1180            1109 :         let map = match &*locked {
    1181 UBC           0 :             TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
    1182 CBC        1109 :             TenantsMap::Open(m) => m,
    1183                 :         };
    1184                 : 
    1185            2017 :         for (tenant_id, slot) in map {
    1186             908 :             if let TenantSlot::Secondary(state) = slot {
    1187                 :                 // Only expose secondary tenants that are not currently shutting down
    1188              11 :                 if !state.cancel.is_cancelled() {
    1189              11 :                     func(tenant_id, state)
    1190 UBC           0 :                 }
    1191 CBC         897 :             }
    1192                 :         }
    1193            1109 :     }
    1194                 : 
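                         :     /// Deletes a tenant shard, waiting up to `activation_timeout` for it to become active where
                         :     /// that is required before deletion can start.
                         :     ///
                         :     /// A minimal usage sketch (assumes the `tenant_manager` handle and a `tenant_shard_id` are in
                         :     /// scope; the timeout is illustrative):
                         :     ///
                         :     /// ```ignore
                         :     /// tenant_manager
                         :     ///     .delete_tenant(tenant_shard_id, Duration::from_secs(30))
                         :     ///     .await?;
                         :     /// ```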
    1195              91 :     pub(crate) async fn delete_tenant(
    1196              91 :         &self,
    1197              91 :         tenant_shard_id: TenantShardId,
    1198              91 :         activation_timeout: Duration,
    1199              91 :     ) -> Result<(), DeleteTenantError> {
    1200                 :         // We acquire a SlotGuard during this function to protect against concurrent
    1201                 :         // changes while the ::prepare phase of DeleteTenantFlow executes, but then
    1202                 :         // have to return the Tenant to the map while the background deletion runs.
    1203                 :         //
    1204                 :         // TODO: refactor deletion to happen outside the lifetime of a Tenant.
    1205                 :         // Currently, deletion requires a reference to the tenants map in order to
    1206                 :         // keep the Tenant in the map until deletion is complete, and then remove
    1207                 :         // it at the end.
    1208                 :         //
    1209                 :         // See https://github.com/neondatabase/neon/issues/5080
    1210                 : 
    1211              90 :         let slot_guard =
    1212              91 :             tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
    1213                 : 
    1214                 :         // unwrap is safe because we used MustExist mode when acquiring
    1215              90 :         let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
    1216              90 :             TenantSlot::Attached(tenant) => tenant.clone(),
    1217                 :             _ => {
    1218                 :                 // Express "not attached" as equivalent to "not found"
    1219 UBC           0 :                 return Err(DeleteTenantError::NotAttached);
    1220                 :             }
    1221                 :         };
    1222                 : 
    1223 CBC          90 :         match tenant.current_state() {
    1224              26 :             TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1225              26 :                 // If a tenant is broken or stopping, DeleteTenantFlow can
    1226              26 :                 // handle it: broken tenants proceed to delete, stopping tenants
    1227              26 :                 // are checked for deletion already in progress.
    1228              26 :             }
    1229                 :             _ => {
    1230              64 :                 tenant
    1231              64 :                     .wait_to_become_active(activation_timeout)
    1232               2 :                     .await
    1233              64 :                     .map_err(|e| match e {
    1234                 :                         GetActiveTenantError::WillNotBecomeActive(_) => {
    1235 UBC           0 :                             DeleteTenantError::InvalidState(tenant.current_state())
    1236                 :                         }
    1237               0 :                         GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
    1238               0 :                         GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
    1239                 :                         GetActiveTenantError::WaitForActiveTimeout {
    1240               0 :                             latest_state: _latest_state,
    1241               0 :                             wait_time: _wait_time,
    1242               0 :                         } => DeleteTenantError::InvalidState(tenant.current_state()),
    1243 CBC          64 :                     })?;
    1244                 :             }
    1245                 :         }
    1246                 : 
    1247              90 :         let result = DeleteTenantFlow::run(
    1248              90 :             self.conf,
    1249              90 :             self.resources.remote_storage.clone(),
    1250              90 :             &TENANTS,
    1251              90 :             tenant,
    1252              90 :         )
    1253             446 :         .await;
    1254                 : 
    1255                 :         // The Tenant goes back into the map in Stopping state; it will eventually be removed by DeleteTenantFlow.
    1256              90 :         slot_guard.revert();
    1257              90 :         result
    1258              91 :     }
    1259                 : }
    1260                 : 
    1261            2056 : #[derive(Debug, thiserror::Error)]
    1262                 : pub(crate) enum GetTenantError {
    1263                 :     #[error("Tenant {0} not found")]
    1264                 :     NotFound(TenantId),
    1265                 :     #[error("Tenant {0} is not active")]
    1266                 :     NotActive(TenantId),
    1267                 :     /// Broken is logically a subset of NotActive, but a distinct error is useful as
    1268                 :     /// NotActive is usually a retryable state for API purposes, whereas Broken
    1269                 :     /// is a stuck error state
    1270                 :     #[error("Tenant is broken: {0}")]
    1271                 :     Broken(String),
    1272                 : 
    1273                 :     // Initializing or shutting down: cannot authoritatively say whether we have this tenant
    1274                 :     #[error("Tenant map is not available: {0}")]
    1275                 :     MapState(#[from] TenantMapError),
    1276                 : }
    1277                 : 
    1278                 : /// Gets the tenant from the in-memory data, returning an error if it is absent or does not match the query.
    1279                 : /// `active_only = true` restricts the query to tenants that are ready for operations, returning an error for tenants in any other state.
    1280                 : ///
    1281                 : /// This method is cancel-safe.
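                         : ///
                         : /// A minimal usage sketch (assumes an unsharded tenant and a `tenant_id` already in scope;
                         : /// `false` accepts an attached tenant in any state):
                         : ///
                         : /// ```ignore
                         : /// let tenant_shard_id = TenantShardId::unsharded(tenant_id);
                         : /// let tenant = get_tenant(tenant_shard_id, false)?;
                         : /// tracing::info!("tenant state: {:?}", tenant.current_state());
                         : /// ```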
    1282            6149 : pub(crate) fn get_tenant(
    1283            6149 :     tenant_shard_id: TenantShardId,
    1284            6149 :     active_only: bool,
    1285            6149 : ) -> Result<Arc<Tenant>, GetTenantError> {
    1286            6149 :     let locked = TENANTS.read().unwrap();
    1287                 : 
    1288            6149 :     let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
    1289                 : 
    1290            6084 :     match peek_slot {
    1291            6084 :         Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
    1292                 :             TenantState::Broken {
    1293               1 :                 reason,
    1294               1 :                 backtrace: _,
    1295               1 :             } if active_only => Err(GetTenantError::Broken(reason)),
    1296            5729 :             TenantState::Active => Ok(Arc::clone(tenant)),
    1297                 :             _ => {
    1298             354 :                 if active_only {
    1299               6 :                     Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
    1300                 :                 } else {
    1301             348 :                     Ok(Arc::clone(tenant))
    1302                 :                 }
    1303                 :             }
    1304                 :         },
    1305                 :         Some(TenantSlot::InProgress(_)) => {
    1306 UBC           0 :             Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
    1307                 :         }
    1308                 :         None | Some(TenantSlot::Secondary(_)) => {
    1309 CBC          65 :             Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
    1310                 :         }
    1311                 :     }
    1312            6149 : }
    1313                 : 
    1314            2056 : #[derive(thiserror::Error, Debug)]
    1315                 : pub(crate) enum GetActiveTenantError {
    1316                 :     /// We may time out either while TenantSlot is InProgress, or while the Tenant
    1317                 :     /// is in a non-Active state
    1318                 :     #[error(
    1319                 :         "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
    1320                 :     )]
    1321                 :     WaitForActiveTimeout {
    1322                 :         latest_state: Option<TenantState>,
    1323                 :         wait_time: Duration,
    1324                 :     },
    1325                 : 
    1326                 :     /// The TenantSlot is absent, or in secondary mode
    1327                 :     #[error(transparent)]
    1328                 :     NotFound(#[from] GetTenantError),
    1329                 : 
    1330                 :     /// Cancellation token fired while we were waiting
    1331                 :     #[error("cancelled")]
    1332                 :     Cancelled,
    1333                 : 
    1334                 :     /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
    1335                 :     #[error("will not become active.  Current state: {0}")]
    1336                 :     WillNotBecomeActive(TenantState),
    1337                 : }
    1338                 : 
    1339                 : /// Get a [`Tenant`] in its active state. If the slot for `tenant_id` is currently in [`TenantSlot::InProgress`]
    1340                 : /// state, then wait for up to `timeout`.  If the [`Tenant`] is not currently in [`TenantState::Active`],
    1341                 : /// then wait for up to `timeout` (minus however long we waited for the slot).
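                         : ///
                         : /// A minimal usage sketch (assumes a `tenant_id` and a `shard_selector` are in scope; the
                         : /// timeout is illustrative):
                         : ///
                         : /// ```ignore
                         : /// let cancel = CancellationToken::new();
                         : /// let tenant = get_active_tenant_with_timeout(
                         : ///     tenant_id,
                         : ///     shard_selector,
                         : ///     Duration::from_secs(30),
                         : ///     &cancel,
                         : /// )
                         : /// .await?;
                         : /// ```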
    1342            8067 : pub(crate) async fn get_active_tenant_with_timeout(
    1343            8067 :     tenant_id: TenantId,
    1344            8067 :     shard_selector: ShardSelector,
    1345            8067 :     timeout: Duration,
    1346            8067 :     cancel: &CancellationToken,
    1347            8067 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
    1348            8067 :     enum WaitFor {
    1349            8067 :         Barrier(utils::completion::Barrier),
    1350            8067 :         Tenant(Arc<Tenant>),
    1351            8067 :     }
    1352            8067 : 
    1353            8067 :     let wait_start = Instant::now();
    1354            8067 :     let deadline = wait_start + timeout;
    1355                 : 
    1356             141 :     let (wait_for, tenant_shard_id) = {
    1357            8067 :         let locked = TENANTS.read().unwrap();
    1358                 : 
    1359                 :         // Resolve TenantId to TenantShardId
    1360            8067 :         let tenant_shard_id = locked
    1361            8067 :             .resolve_attached_shard(&tenant_id, shard_selector)
    1362            8067 :             .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
    1363            8067 :                 tenant_id,
    1364            8067 :             )))?;
    1365                 : 
    1366            7553 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
    1367            7553 :             .map_err(GetTenantError::MapState)?;
    1368            7553 :         match peek_slot {
    1369            7553 :             Some(TenantSlot::Attached(tenant)) => {
    1370            7553 :                 match tenant.current_state() {
    1371                 :                     TenantState::Active => {
    1372                 :                         // Fast path: we don't need to do any async waiting.
    1373            7412 :                         return Ok(tenant.clone());
    1374                 :                     }
    1375                 :                     _ => {
    1376             141 :                         tenant.activate_now();
    1377             141 :                         (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
    1378                 :                     }
    1379                 :                 }
    1380                 :             }
    1381                 :             Some(TenantSlot::Secondary(_)) => {
    1382 UBC           0 :                 return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
    1383               0 :                     tenant_id,
    1384               0 :                 )))
    1385                 :             }
    1386               0 :             Some(TenantSlot::InProgress(barrier)) => {
    1387               0 :                 (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
    1388                 :             }
    1389                 :             None => {
    1390               0 :                 return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
    1391               0 :                     tenant_id,
    1392               0 :                 )))
    1393                 :             }
    1394                 :         }
    1395                 :     };
    1396                 : 
    1397 CBC         141 :     let tenant = match wait_for {
    1398 UBC           0 :         WaitFor::Barrier(barrier) => {
    1399               0 :             tracing::debug!("Waiting for tenant InProgress state to pass...");
    1400               0 :             timeout_cancellable(
    1401               0 :                 deadline.duration_since(Instant::now()),
    1402               0 :                 cancel,
    1403               0 :                 barrier.wait(),
    1404               0 :             )
    1405               0 :             .await
    1406               0 :             .map_err(|e| match e {
    1407               0 :                 TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
    1408               0 :                     latest_state: None,
    1409               0 :                     wait_time: wait_start.elapsed(),
    1410               0 :                 },
    1411               0 :                 TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
    1412               0 :             })?;
    1413                 :             {
    1414               0 :                 let locked = TENANTS.read().unwrap();
    1415               0 :                 let peek_slot =
    1416               0 :                     tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
    1417               0 :                         .map_err(GetTenantError::MapState)?;
    1418               0 :                 match peek_slot {
    1419               0 :                     Some(TenantSlot::Attached(tenant)) => tenant.clone(),
    1420                 :                     _ => {
    1421               0 :                         return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
    1422               0 :                             tenant_id,
    1423               0 :                         )))
    1424                 :                     }
    1425                 :                 }
    1426                 :             }
    1427                 :         }
    1428 CBC         141 :         WaitFor::Tenant(tenant) => tenant,
    1429                 :     };
    1430                 : 
    1431 UBC           0 :     tracing::debug!("Waiting for tenant to enter active state...");
    1432 CBC         141 :     tenant
    1433             141 :         .wait_to_become_active(deadline.duration_since(Instant::now()))
    1434             427 :         .await?;
    1435             141 :     Ok(tenant)
    1436            8067 : }
    1437                 : 
    1438 UBC           0 : #[derive(Debug, thiserror::Error)]
    1439                 : pub(crate) enum DeleteTimelineError {
    1440                 :     #[error("Tenant {0}")]
    1441                 :     Tenant(#[from] GetTenantError),
    1442                 : 
    1443                 :     #[error("Timeline {0}")]
    1444                 :     Timeline(#[from] crate::tenant::DeleteTimelineError),
    1445                 : }
    1446                 : 
    1447               0 : #[derive(Debug, thiserror::Error)]
    1448                 : pub(crate) enum TenantStateError {
    1449                 :     #[error("Tenant {0} is stopping")]
    1450                 :     IsStopping(TenantId),
    1451                 :     #[error(transparent)]
    1452                 :     SlotError(#[from] TenantSlotError),
    1453                 :     #[error(transparent)]
    1454                 :     SlotUpsertError(#[from] TenantSlotUpsertError),
    1455                 :     #[error(transparent)]
    1456                 :     Other(#[from] anyhow::Error),
    1457                 : }
    1458                 : 
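                         : /// Detaches a tenant: removes it from memory and schedules deletion of its local directory.
                         : ///
                         : /// A minimal usage sketch (assumes `conf`, a `tenant_shard_id`, and the `deletion_queue_client`
                         : /// are in scope; `detach_ignored = true` also cleans up previously ignored tenants):
                         : ///
                         : /// ```ignore
                         : /// detach_tenant(conf, tenant_shard_id, true, &deletion_queue_client).await?;
                         : /// ```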
    1459 CBC          77 : pub(crate) async fn detach_tenant(
    1460              77 :     conf: &'static PageServerConf,
    1461              77 :     tenant_shard_id: TenantShardId,
    1462              77 :     detach_ignored: bool,
    1463              77 :     deletion_queue_client: &DeletionQueueClient,
    1464              77 : ) -> Result<(), TenantStateError> {
    1465              77 :     let tmp_path = detach_tenant0(
    1466              77 :         conf,
    1467              77 :         &TENANTS,
    1468              77 :         tenant_shard_id,
    1469              77 :         detach_ignored,
    1470              77 :         deletion_queue_client,
    1471              77 :     )
    1472             326 :     .await?;
    1473                 :     // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
    1474                 :     // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
    1475              65 :     let task_tenant_id = None;
    1476              65 :     task_mgr::spawn(
    1477              65 :         task_mgr::BACKGROUND_RUNTIME.handle(),
    1478              65 :         TaskKind::MgmtRequest,
    1479              65 :         task_tenant_id,
    1480              65 :         None,
    1481              65 :         "tenant_files_delete",
    1482              65 :         false,
    1483              65 :         async move {
    1484              65 :             fs::remove_dir_all(tmp_path.as_path())
    1485              65 :                 .await
    1486              65 :                 .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
    1487              65 :         },
    1488              65 :     );
    1489              65 :     Ok(())
    1490              77 : }
    1491                 : 
    1492              77 : async fn detach_tenant0(
    1493              77 :     conf: &'static PageServerConf,
    1494              77 :     tenants: &std::sync::RwLock<TenantsMap>,
    1495              77 :     tenant_shard_id: TenantShardId,
    1496              77 :     detach_ignored: bool,
    1497              77 :     deletion_queue_client: &DeletionQueueClient,
    1498              77 : ) -> Result<Utf8PathBuf, TenantStateError> {
    1499              77 :     let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
    1500              65 :         let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
    1501              65 :         safe_rename_tenant_dir(&local_tenant_directory)
    1502             195 :             .await
    1503              77 :             .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
    1504              65 :     };
    1505                 : 
    1506              77 :     let removal_result = remove_tenant_from_memory(
    1507              77 :         tenants,
    1508              77 :         tenant_shard_id,
    1509              77 :         tenant_dir_rename_operation(tenant_shard_id),
    1510              77 :     )
    1511             323 :     .await;
    1512                 : 
    1513                 :     // Flush pending deletions, so that they have a good chance of passing validation
    1514                 :     // before this tenant is potentially re-attached elsewhere.
    1515              77 :     deletion_queue_client.flush_advisory();
    1516              77 : 
    1517              77 :     // Ignored tenants are not present in memory and will bail the removal from memory operation.
    1518              77 :     // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
    1519              77 :     if detach_ignored
    1520                 :         && matches!(
    1521              10 :             removal_result,
    1522                 :             Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
    1523                 :         )
    1524                 :     {
    1525              10 :         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    1526              10 :         if tenant_ignore_mark.exists() {
    1527               1 :             info!("Detaching an ignored tenant");
    1528               1 :             let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
    1529               3 :                 .await
    1530               1 :                 .with_context(|| {
    1531 UBC           0 :                     format!("Ignored tenant {tenant_shard_id} local directory rename")
    1532 CBC           1 :                 })?;
    1533               1 :             return Ok(tmp_path);
    1534               9 :         }
    1535              67 :     }
    1536                 : 
    1537              76 :     removal_result
    1538              77 : }
    1539                 : 
    1540               5 : pub(crate) async fn load_tenant(
    1541               5 :     conf: &'static PageServerConf,
    1542               5 :     tenant_id: TenantId,
    1543               5 :     generation: Generation,
    1544               5 :     broker_client: storage_broker::BrokerClientChannel,
    1545               5 :     remote_storage: Option<GenericRemoteStorage>,
    1546               5 :     deletion_queue_client: DeletionQueueClient,
    1547               5 :     ctx: &RequestContext,
    1548               5 : ) -> Result<(), TenantMapInsertError> {
    1549               5 :     // This is a legacy API (replaced by `/location_conf`).  It does not support sharding
    1550               5 :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1551                 : 
    1552               4 :     let slot_guard =
    1553               5 :         tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
    1554               4 :     let tenant_path = conf.tenant_path(&tenant_shard_id);
    1555               4 : 
    1556               4 :     let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    1557               4 :     if tenant_ignore_mark.exists() {
    1558               4 :         std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
    1559 UBC           0 :             format!(
    1560               0 :                 "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
    1561               0 :             )
    1562 CBC           4 :         })?;
    1563 UBC           0 :     }
    1564                 : 
    1565 CBC           4 :     let resources = TenantSharedResources {
    1566               4 :         broker_client,
    1567               4 :         remote_storage,
    1568               4 :         deletion_queue_client,
    1569               4 :     };
    1570                 : 
    1571               4 :     let mut location_conf =
    1572               4 :         Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
    1573               4 :     location_conf.attach_in_generation(generation);
    1574               4 : 
    1575               8 :     Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
    1576                 : 
    1577               4 :     let shard_identity = location_conf.shard;
    1578               4 :     let new_tenant = tenant_spawn(
    1579               4 :         conf,
    1580               4 :         tenant_shard_id,
    1581               4 :         &tenant_path,
    1582               4 :         resources,
    1583               4 :         AttachedTenantConf::try_from(location_conf)?,
    1584               4 :         shard_identity,
    1585               4 :         None,
    1586               4 :         &TENANTS,
    1587               4 :         SpawnMode::Normal,
    1588               4 :         ctx,
    1589               4 :     )
    1590               4 :     .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
    1591                 : 
    1592               4 :     slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
    1593               4 :     Ok(())
    1594               5 : }
    1595                 : 
    1596               6 : pub(crate) async fn ignore_tenant(
    1597               6 :     conf: &'static PageServerConf,
    1598               6 :     tenant_id: TenantId,
    1599               6 : ) -> Result<(), TenantStateError> {
    1600              21 :     ignore_tenant0(conf, &TENANTS, tenant_id).await
    1601               6 : }
    1602                 : 
    1603               6 : async fn ignore_tenant0(
    1604               6 :     conf: &'static PageServerConf,
    1605               6 :     tenants: &std::sync::RwLock<TenantsMap>,
    1606               6 :     tenant_id: TenantId,
    1607               6 : ) -> Result<(), TenantStateError> {
    1608               6 :     // This is a legacy API (replaced by `/location_conf`).  It does not support sharding
    1609               6 :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1610               6 : 
    1611               6 :     remove_tenant_from_memory(tenants, tenant_shard_id, async {
    1612               6 :         let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    1613               6 :         fs::File::create(&ignore_mark_file)
    1614               6 :             .await
    1615               6 :             .context("Failed to create ignore mark file")
    1616               6 :             .and_then(|_| {
    1617               6 :                 crashsafe::fsync_file_and_parent(&ignore_mark_file)
    1618               6 :                     .context("Failed to fsync ignore mark file")
    1619               6 :             })
    1620               6 :             .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?;
    1621               6 :         Ok(())
    1622               6 :     })
    1623              21 :     .await
    1624               6 : }
    1625                 : 
    1626 UBC           0 : #[derive(Debug, thiserror::Error)]
    1627                 : pub(crate) enum TenantMapListError {
    1628                 :     #[error("tenant map is still initializing")]
    1629                 :     Initializing,
    1630                 : }
    1631                 : 
    1632                 : ///
    1633                 : /// Get list of tenants, for the mgmt API
    1634                 : ///
    1635 CBC         239 : pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>, TenantMapListError>
    1636             239 : {
    1637             239 :     let tenants = TENANTS.read().unwrap();
    1638             239 :     let m = match &*tenants {
    1639 UBC           0 :         TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
    1640 CBC         239 :         TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
    1641             239 :     };
    1642             239 :     Ok(m.iter()
    1643             240 :         .filter_map(|(id, tenant)| match tenant {
    1644             240 :             TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
    1645 UBC           0 :             TenantSlot::Secondary(_) => None,
    1646               0 :             TenantSlot::InProgress(_) => None,
    1647 CBC         240 :         })
    1648             239 :         .collect())
    1649             239 : }
    1650                 : 
    1651                 : /// Execute Attach mgmt API command.
    1652                 : ///
    1653                 : /// Downloading all the tenant data is performed in the background; this merely
    1654                 : /// spawns the background task and returns quickly.
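                         : ///
                         : /// A minimal usage sketch (assumes `conf`, a `generation`, a `tenant_conf`, the shared
                         : /// `resources`, and a request `ctx` are in scope):
                         : ///
                         : /// ```ignore
                         : /// attach_tenant(conf, tenant_id, generation, tenant_conf, resources, &ctx).await?;
                         : /// ```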
    1655              44 : pub(crate) async fn attach_tenant(
    1656              44 :     conf: &'static PageServerConf,
    1657              44 :     tenant_id: TenantId,
    1658              44 :     generation: Generation,
    1659              44 :     tenant_conf: TenantConfOpt,
    1660              44 :     resources: TenantSharedResources,
    1661              44 :     ctx: &RequestContext,
    1662              44 : ) -> Result<(), TenantMapInsertError> {
    1663              44 :     // This is a legacy API (replaced by `/location_conf`).  It does not support sharding
    1664              44 :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1665                 : 
    1666              31 :     let slot_guard =
    1667              44 :         tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
    1668              31 :     let location_conf = LocationConf::attached_single(tenant_conf, generation);
    1669              58 :     let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_shard_id).await?;
    1670                 :     // TODO: tenant directory remains on disk if we bail out from here on.
    1671                 :     //       See https://github.com/neondatabase/neon/issues/4233
    1672                 : 
    1673              29 :     let shard_identity = location_conf.shard;
    1674              29 :     let attached_tenant = tenant_spawn(
    1675              29 :         conf,
    1676              29 :         tenant_shard_id,
    1677              29 :         &tenant_dir,
    1678              29 :         resources,
    1679              29 :         AttachedTenantConf::try_from(location_conf)?,
    1680              29 :         shard_identity,
    1681              29 :         None,
    1682              29 :         &TENANTS,
    1683              29 :         SpawnMode::Normal,
    1684              29 :         ctx,
    1685 UBC           0 :     )?;
    1686                 :     // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
    1687                 :     //      See https://github.com/neondatabase/neon/issues/4233
    1688                 : 
    1689 CBC          29 :     let attached_tenant_id = attached_tenant.tenant_id();
    1690              29 :     if tenant_id != attached_tenant_id {
    1691 UBC           0 :         return Err(TenantMapInsertError::Other(anyhow::anyhow!(
    1692               0 :             "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
    1693               0 :         )));
    1694 CBC          29 :     }
    1695              29 : 
    1696              29 :     slot_guard.upsert(TenantSlot::Attached(attached_tenant))?;
    1697              29 :     Ok(())
    1698              44 : }
    1699                 : 
    1700 UBC           0 : #[derive(Debug, thiserror::Error)]
    1701                 : pub(crate) enum TenantMapInsertError {
    1702                 :     #[error(transparent)]
    1703                 :     SlotError(#[from] TenantSlotError),
    1704                 :     #[error(transparent)]
    1705                 :     SlotUpsertError(#[from] TenantSlotUpsertError),
    1706                 :     #[error(transparent)]
    1707                 :     Other(#[from] anyhow::Error),
    1708                 : }
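// Editor's note: an illustrative sketch (not part of mgr.rs) of how a caller might map
// `attach_tenant` and its `TenantMapInsertError` variants onto HTTP-style responses.
// The chosen `ApiError` variants and the status mapping are assumptions for illustration
// only, not the real handler code.
async fn attach_tenant_response_sketch(
    conf: &'static PageServerConf,
    tenant_id: TenantId,
    generation: Generation,
    tenant_conf: TenantConfOpt,
    resources: TenantSharedResources,
    ctx: &RequestContext,
) -> Result<(), ApiError> {
    match attach_tenant(conf, tenant_id, generation, tenant_conf, resources, ctx).await {
        Ok(()) => Ok(()),
        // A pre-existing slot is a caller error, not a pageserver bug.
        Err(TenantMapInsertError::SlotError(TenantSlotError::AlreadyExists(id, state))) => Err(
            ApiError::Conflict(format!("tenant {id} already exists in state {state:?}")),
        ),
        // Everything else is reported as an internal error.
        Err(e) => Err(ApiError::InternalServerError(anyhow::Error::new(e))),
    }
}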
    1709                 : 
    1710                 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
    1711                 : /// for a particular tenant ID.
    1712 CBC          14 : #[derive(Debug, thiserror::Error)]
    1713                 : pub enum TenantSlotError {
    1714                 :     /// When acquiring a slot with the expectation that the tenant already exists.
    1715                 :     #[error("Tenant {0} not found")]
    1716                 :     NotFound(TenantShardId),
    1717                 : 
    1718                 :     /// When acquiring a slot with the expectation that the tenant does not already exist.
    1719                 :     #[error("tenant {0} already exists, state: {1:?}")]
    1720                 :     AlreadyExists(TenantShardId, TenantState),
    1721                 : 
    1722                 :     #[error("tenant {0} already exists in but is not attached")]
    1723                 :     Conflict(TenantShardId),
    1724                 : 
    1725                 :     // Tried to read a slot that is currently being mutated by another administrative
    1726                 :     // operation.
    1727                 :     #[error("tenant has a state change in progress, try again later")]
    1728                 :     InProgress,
    1729                 : 
    1730                 :     #[error(transparent)]
    1731                 :     MapState(#[from] TenantMapError),
    1732                 : }
    1733                 : 
    1734                 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
    1735                 : /// to insert a new value.
    1736 UBC           0 : #[derive(Debug, thiserror::Error)]
    1737                 : pub enum TenantSlotUpsertError {
    1738                 :     /// An error where the slot is in an unexpected state, indicating a code bug
    1739                 :     #[error("Internal error updating Tenant")]
    1740                 :     InternalError(Cow<'static, str>),
    1741                 : 
    1742                 :     #[error(transparent)]
    1743                 :     MapState(#[from] TenantMapError),
    1744                 : }
    1745                 : 
    1746               0 : #[derive(Debug, thiserror::Error)]
    1747                 : enum TenantSlotDropError {
    1748                 :     /// It is only legal to drop a TenantSlot if its contents are fully shut down
    1749                 :     #[error("Tenant was not shut down")]
    1750                 :     NotShutdown,
    1751                 : }
    1752                 : 
    1753                 : /// Errors that can happen any time we are walking the tenant map to try to acquire
    1754                 : /// the TenantSlot for a particular tenant.
    1755               0 : #[derive(Debug, thiserror::Error)]
    1756                 : pub enum TenantMapError {
    1757                 :     // Tried to read while initializing
    1758                 :     #[error("tenant map is still initializing")]
    1759                 :     StillInitializing,
    1760                 : 
    1761                 :     // Tried to read while shutting down
    1762                 :     #[error("tenant map is shutting down")]
    1763                 :     ShuttingDown,
    1764                 : }
    1765                 : 
    1766                 : /// Guards a particular tenant_id's content in the TenantsMap.  While this
    1767                 : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
    1768                 : /// for this tenant, which acts as a marker for any operations targeting
    1769                 : /// this tenant to retry later, or wait for the InProgress state to end.
    1770                 : ///
    1771                 : /// This structure enforces the important invariant that we do not have overlapping
    1772                 : /// tasks that will try to use local storage for the same tenant ID: we enforce that
    1773                 : /// the previous contents of a slot have been shut down before the slot can be
    1774                 : /// left empty or used for something else.
    1775                 : ///
    1776                 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
    1777                 : /// to provide a new value, or `revert` to put the slot back into its initial
    1778                 : /// state.  If the SlotGuard is dropped without calling either of these, then
    1779                 : /// we will leave the slot empty if our `old_value` is already shut down, else
    1780                 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
    1781                 : ///
    1782                 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
    1783                 : /// `drop_old_value`.  It is an error to call this without shutting down
    1784                 : /// the contents of `old_value`.
    1785                 : pub struct SlotGuard {
    1786                 :     tenant_shard_id: TenantShardId,
    1787                 :     old_value: Option<TenantSlot>,
    1788                 :     upserted: bool,
    1789                 : 
    1790                 :     /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
    1791                 :     /// release any waiters as soon as this SlotGuard is dropped.
    1792                 :     _completion: utils::completion::Completion,
    1793                 : }
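// Editor's note: an illustrative sketch (not part of mgr.rs) of the SlotGuard contract
// described above: acquire the slot, inspect the previous contents, and dispose of the
// guard explicitly.  `was_attached_sketch` is a hypothetical helper, not a real function
// in this module; it only uses items defined in this module.
fn was_attached_sketch(tenant_shard_id: TenantShardId) -> Result<bool, TenantSlotError> {
    // While this guard exists, the map holds TenantSlot::InProgress for the tenant,
    // so concurrent operations either wait for the barrier or retry.
    let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
    let was_attached = matches!(slot_guard.get_old_value(), Some(TenantSlot::Attached(_)));
    // Nothing was modified: explicitly put the old value back.  Dropping the guard would
    // also restore it (the old value is not shut down), but explicit disposal is the
    // preferred pattern per the contract above.
    slot_guard.revert();
    Ok(was_attached)
}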
    1794                 : 
    1795                 : impl SlotGuard {
    1796 CBC         720 :     fn new(
    1797             720 :         tenant_shard_id: TenantShardId,
    1798             720 :         old_value: Option<TenantSlot>,
    1799             720 :         completion: utils::completion::Completion,
    1800             720 :     ) -> Self {
    1801             720 :         Self {
    1802             720 :             tenant_shard_id,
    1803             720 :             old_value,
    1804             720 :             upserted: false,
    1805             720 :             _completion: completion,
    1806             720 :         }
    1807             720 :     }
    1808                 : 
    1809                 :     /// Get any value that was present in the slot before we acquired ownership
    1810                 :     /// of it: in state transitions, this will be the old state.
    1811             254 :     fn get_old_value(&self) -> &Option<TenantSlot> {
    1812             254 :         &self.old_value
    1813             254 :     }
    1814                 : 
    1815                 :     /// Emplace a new value in the slot.  This consumes the guard, and after
    1816                 :     /// returning, the slot is no longer protected from concurrent changes.
    1817             646 :     fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
    1818             646 :         if !self.old_value_is_shutdown() {
    1819                 :             // This is a bug: callers should never try to drop an old value without
    1820                 :             // shutting it down
    1821 UBC           0 :             return Err(TenantSlotUpsertError::InternalError(
    1822               0 :                 "Old TenantSlot value not shut down".into(),
    1823               0 :             ));
    1824 CBC         646 :         }
    1825                 : 
    1826             646 :         let replaced = {
    1827             646 :             let mut locked = TENANTS.write().unwrap();
    1828             646 : 
    1829             646 :             if let TenantSlot::InProgress(_) = new_value {
    1830                 :                 // It is never expected to try and upsert InProgress via this path: it should
    1831                 :                 // only be written via the tenant_map_acquire_slot path.  If we hit this it's a bug.
    1832 UBC           0 :                 return Err(TenantSlotUpsertError::InternalError(
    1833               0 :                     "Attempt to upsert an InProgress state".into(),
    1834               0 :                 ));
    1835 CBC         646 :             }
    1836                 : 
    1837             646 :             let m = match &mut *locked {
    1838 UBC           0 :                 TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
    1839                 :                 TenantsMap::ShuttingDown(_) => {
    1840               0 :                     return Err(TenantMapError::ShuttingDown.into());
    1841                 :                 }
    1842 CBC         646 :                 TenantsMap::Open(m) => m,
    1843             646 :             };
    1844             646 : 
    1845             646 :             let replaced = m.insert(self.tenant_shard_id, new_value);
    1846             646 :             self.upserted = true;
    1847             646 : 
    1848             646 :             METRICS.tenant_slots.set(m.len() as u64);
    1849             646 : 
    1850             646 :             replaced
    1851                 :         };
    1852                 : 
    1853                 :         // Sanity check: on an upsert we should always be replacing an InProgress marker
    1854             646 :         match replaced {
    1855                 :             Some(TenantSlot::InProgress(_)) => {
    1856                 :                 // Expected case: we find our InProgress in the map: nothing should have
    1857                 :                 // replaced it because the code that acquires slots will not grant another
    1858                 :                 // one for the same TenantId.
    1859             646 :                 Ok(())
    1860                 :             }
    1861                 :             None => {
    1862 UBC           0 :                 METRICS.unexpected_errors.inc();
    1863               0 :                 error!(
    1864               0 :                     tenant_shard_id = %self.tenant_shard_id,
    1865               0 :                     "Missing InProgress marker during tenant upsert, this is a bug."
    1866               0 :                 );
    1867               0 :                 Err(TenantSlotUpsertError::InternalError(
    1868               0 :                     "Missing InProgress marker during tenant upsert".into(),
    1869               0 :                 ))
    1870                 :             }
    1871               0 :             Some(slot) => {
    1872               0 :                 METRICS.unexpected_errors.inc();
    1873               0 :                 error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug.  Contents: {:?}", slot);
    1874               0 :                 Err(TenantSlotUpsertError::InternalError(
    1875               0 :                     "Unexpected contents of TenantSlot".into(),
    1876               0 :                 ))
    1877                 :             }
    1878                 :         }
    1879 CBC         646 :     }
    1880                 : 
    1881                 :     /// Replace the InProgress slot with whatever was in the guard when we started
    1882              90 :     fn revert(mut self) {
    1883              90 :         if let Some(value) = self.old_value.take() {
    1884              90 :             match self.upsert(value) {
    1885 UBC           0 :                 Err(TenantSlotUpsertError::InternalError(_)) => {
    1886               0 :                     // We already logged the error, nothing else we can do.
    1887               0 :                 }
    1888               0 :                 Err(TenantSlotUpsertError::MapState(_)) => {
    1889               0 :                     // If the map is shutting down, we need not replace anything
    1890               0 :                 }
    1891 CBC          90 :                 Ok(()) => {}
    1892                 :             }
    1893 UBC           0 :         }
    1894 CBC          90 :     }
    1895                 : 
    1896                 :     /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
    1897                 :     /// rogue background tasks that would write to the local tenant directory that this guard
    1898                 :     /// is responsible for protecting.
    1899             826 :     fn old_value_is_shutdown(&self) -> bool {
    1900             826 :         match self.old_value.as_ref() {
    1901             102 :             Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
    1902              21 :             Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
    1903                 :             Some(TenantSlot::InProgress(_)) => {
    1904                 :                 // A SlotGuard cannot be constructed for a slot that was already InProgress
    1905 UBC           0 :                 unreachable!()
    1906                 :             }
    1907 CBC         703 :             None => true,
    1908                 :         }
    1909             826 :     }
    1910                 : 
    1911                 :     /// The guard holder is done with the old value of the slot: they are obliged to have already
    1912                 :     /// shut it down before we reach this point.
    1913             107 :     fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
    1914             107 :         if !self.old_value_is_shutdown() {
    1915 UBC           0 :             Err(TenantSlotDropError::NotShutdown)
    1916                 :         } else {
    1917 CBC         107 :             self.old_value.take();
    1918             107 :             Ok(())
    1919                 :         }
    1920             107 :     }
    1921                 : }
    1922                 : 
    1923                 : impl Drop for SlotGuard {
    1924             720 :     fn drop(&mut self) {
    1925             720 :         if self.upserted {
    1926             646 :             return;
    1927              74 :         }
    1928              74 :         // Our old value is already shut down, or it never existed: it is safe
    1929              74 :         // for us to fully release the TenantSlot back into an empty state
    1930              74 : 
    1931              74 :         let mut locked = TENANTS.write().unwrap();
    1932                 : 
    1933              74 :         let m = match &mut *locked {
    1934                 :             TenantsMap::Initializing => {
    1935                 :                 // There is no map, this should never happen.
    1936               1 :                 return;
    1937                 :             }
    1938                 :             TenantsMap::ShuttingDown(_) => {
    1939                 :                 // When we transition to shutdown, InProgress elements are removed
    1940                 :                 // from the map, so we do not need to clean up our Inprogress marker.
    1941                 :                 // See [`shutdown_all_tenants0`]
    1942 UBC           0 :                 return;
    1943                 :             }
    1944 CBC          73 :             TenantsMap::Open(m) => m,
    1945              73 :         };
    1946              73 : 
    1947              73 :         use std::collections::btree_map::Entry;
    1948              73 :         match m.entry(self.tenant_shard_id) {
    1949              73 :             Entry::Occupied(mut entry) => {
    1950              73 :                 if !matches!(entry.get(), TenantSlot::InProgress(_)) {
    1951 UBC           0 :                     METRICS.unexpected_errors.inc();
    1952               0 :                     error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug.  Contents: {:?}", entry.get());
    1953 CBC          73 :                 }
    1954                 : 
    1955              73 :                 if self.old_value_is_shutdown() {
    1956              73 :                     entry.remove();
    1957              73 :                 } else {
    1958 UBC           0 :                     entry.insert(self.old_value.take().unwrap());
    1959               0 :                 }
    1960                 :             }
    1961                 :             Entry::Vacant(_) => {
    1962               0 :                 METRICS.unexpected_errors.inc();
    1963               0 :                 error!(
    1964               0 :                     tenant_shard_id = %self.tenant_shard_id,
    1965               0 :                     "Missing InProgress marker during SlotGuard drop, this is a bug."
    1966               0 :                 );
    1967                 :             }
    1968                 :         }
    1969                 : 
    1970 CBC          73 :         METRICS.tenant_slots.set(m.len() as u64);
    1971             720 :     }
    1972                 : }
    1973                 : 
    1974                 : enum TenantSlotPeekMode {
    1975                 :     /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
    1976                 :     Read,
    1977                 :     /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
    1978                 :     Write,
    1979                 : }
    1980                 : 
    1981           14673 : fn tenant_map_peek_slot<'a>(
    1982           14673 :     tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
    1983           14673 :     tenant_shard_id: &TenantShardId,
    1984           14673 :     mode: TenantSlotPeekMode,
    1985           14673 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
    1986           14673 :     let m = match tenants.deref() {
    1987 UBC           0 :         TenantsMap::Initializing => return Err(TenantMapError::StillInitializing),
    1988               0 :         TenantsMap::ShuttingDown(m) => match mode {
    1989               0 :             TenantSlotPeekMode::Read => m,
    1990                 :             TenantSlotPeekMode::Write => {
    1991               0 :                 return Err(TenantMapError::ShuttingDown);
    1992                 :             }
    1993                 :         },
    1994 CBC       14673 :         TenantsMap::Open(m) => m,
    1995                 :     };
    1996                 : 
    1997           14673 :     Ok(m.get(tenant_shard_id))
    1998           14673 : }
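// Editor's note: an illustrative sketch (not part of mgr.rs) of a read-path lookup using
// `tenant_map_peek_slot` in Read mode, which still succeeds while the pageserver is
// shutting down.  `is_attached_sketch` is hypothetical.
fn is_attached_sketch(tenant_shard_id: &TenantShardId) -> Result<bool, TenantMapError> {
    let locked = TENANTS.read().unwrap();
    // Peeking only borrows the read guard: the slot is not marked InProgress and no
    // state transition takes place.
    let slot = tenant_map_peek_slot(&locked, tenant_shard_id, TenantSlotPeekMode::Read)?;
    Ok(matches!(slot, Some(TenantSlot::Attached(_))))
}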
    1999                 : 
    2000                 : enum TenantSlotAcquireMode {
    2001                 :     /// Acquire the slot irrespective of current state, or whether it already exists
    2002                 :     Any,
    2003                 :     /// Return an error if trying to acquire a slot and it doesn't already exist
    2004                 :     MustExist,
    2005                 :     /// Return an error if trying to acquire a slot and it already exists
    2006                 :     MustNotExist,
    2007                 : }
    2008                 : 
    2009             664 : fn tenant_map_acquire_slot(
    2010             664 :     tenant_shard_id: &TenantShardId,
    2011             664 :     mode: TenantSlotAcquireMode,
    2012             664 : ) -> Result<SlotGuard, TenantSlotError> {
    2013             664 :     tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
    2014             664 : }
    2015                 : 
    2016             748 : fn tenant_map_acquire_slot_impl(
    2017             748 :     tenant_shard_id: &TenantShardId,
    2018             748 :     tenants: &std::sync::RwLock<TenantsMap>,
    2019             748 :     mode: TenantSlotAcquireMode,
    2020             748 : ) -> Result<SlotGuard, TenantSlotError> {
    2021             748 :     use TenantSlotAcquireMode::*;
    2022             748 :     METRICS.tenant_slot_writes.inc();
    2023             748 : 
    2024             748 :     let mut locked = tenants.write().unwrap();
    2025             748 :     let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard = %tenant_shard_id.shard_slug());
    2026             748 :     let _guard = span.enter();
    2027                 : 
    2028             748 :     let m = match &mut *locked {
    2029 UBC           0 :         TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
    2030               0 :         TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
    2031 CBC         748 :         TenantsMap::Open(m) => m,
    2032             748 :     };
    2033             748 : 
    2034             748 :     use std::collections::btree_map::Entry;
    2035             748 : 
    2036             748 :     let entry = m.entry(*tenant_shard_id);
    2037             748 : 
    2038             748 :     match entry {
    2039             521 :         Entry::Vacant(v) => match mode {
    2040                 :             MustExist => {
    2041              14 :                 tracing::debug!("Vacant && MustExist: return NotFound");
    2042              14 :                 Err(TenantSlotError::NotFound(*tenant_shard_id))
    2043                 :             }
    2044                 :             _ => {
    2045             507 :                 let (completion, barrier) = utils::completion::channel();
    2046             507 :                 v.insert(TenantSlot::InProgress(barrier));
    2047             507 :                 tracing::debug!("Vacant, inserted InProgress");
    2048             507 :                 Ok(SlotGuard::new(*tenant_shard_id, None, completion))
    2049                 :             }
    2050                 :         },
    2051             227 :         Entry::Occupied(mut o) => {
    2052             227 :             // Apply mode-driven checks
    2053             227 :             match (o.get(), mode) {
    2054                 :                 (TenantSlot::InProgress(_), _) => {
    2055 UBC           0 :                     tracing::debug!("Occupied, failing for InProgress");
    2056               0 :                     Err(TenantSlotError::InProgress)
    2057                 :                 }
    2058 CBC          14 :                 (slot, MustNotExist) => match slot {
    2059              14 :                     TenantSlot::Attached(tenant) => {
    2060              14 :                         tracing::debug!("Attached && MustNotExist, return AlreadyExists");
    2061              14 :                         Err(TenantSlotError::AlreadyExists(
    2062              14 :                             *tenant_shard_id,
    2063              14 :                             tenant.current_state(),
    2064              14 :                         ))
    2065                 :                     }
    2066                 :                     _ => {
    2067                 :                         // FIXME: the AlreadyExists error assumes that we have a Tenant
    2068                 :                         // to get the state from
    2069 UBC           0 :                         tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
    2070               0 :                         Err(TenantSlotError::AlreadyExists(
    2071               0 :                             *tenant_shard_id,
    2072               0 :                             TenantState::Broken {
    2073               0 :                                 reason: "Present but not attached".to_string(),
    2074               0 :                                 backtrace: "".to_string(),
    2075               0 :                             },
    2076               0 :                         ))
    2077                 :                     }
    2078                 :                 },
    2079                 :                 _ => {
    2080                 :                     // Happy case: the slot was not in any state that violated our mode
    2081 CBC         213 :                     let (completion, barrier) = utils::completion::channel();
    2082             213 :                     let old_value = o.insert(TenantSlot::InProgress(barrier));
    2083             213 :                     tracing::debug!("Occupied, replaced with InProgress");
    2084             213 :                     Ok(SlotGuard::new(
    2085             213 :                         *tenant_shard_id,
    2086             213 :                         Some(old_value),
    2087             213 :                         completion,
    2088             213 :                     ))
    2089                 :                 }
    2090                 :             }
    2091                 :         }
    2092                 :     }
    2093             748 : }
    2094                 : 
    2095                 : /// Stops and removes the tenant from memory if it is not already [`TenantState::Stopping`]; bails otherwise.
    2096                 : /// Allows other tenant resources to be removed manually via `tenant_cleanup`.
    2097                 : /// If the cleanup fails, the tenant will stay in memory in the [`TenantState::Broken`] state, and another removal
    2098                 : /// operation would be needed to remove it.
    2099              84 : async fn remove_tenant_from_memory<V, F>(
    2100              84 :     tenants: &std::sync::RwLock<TenantsMap>,
    2101              84 :     tenant_shard_id: TenantShardId,
    2102              84 :     tenant_cleanup: F,
    2103              84 : ) -> Result<V, TenantStateError>
    2104              84 : where
    2105              84 :     F: std::future::Future<Output = anyhow::Result<V>>,
    2106              84 : {
    2107                 :     use utils::completion;
    2108                 : 
    2109              71 :     let mut slot_guard =
    2110              84 :         tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
    2111                 : 
    2112                 :     // allow pageserver shutdown to await for our completion
    2113              71 :     let (_guard, progress) = completion::channel();
    2114                 : 
    2115                 :     // The SlotGuard allows us to manipulate the Tenant object without fear of some
    2116                 :     // concurrent API request doing something else for the same tenant ID.
    2117              71 :     let attached_tenant = match slot_guard.get_old_value() {
    2118              66 :         Some(TenantSlot::Attached(tenant)) => {
    2119              66 :             // whenever we remove a tenant from memory, we don't want to flush and wait for upload
    2120              66 :             let freeze_and_flush = false;
    2121              66 : 
    2122              66 :             // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
    2123              66 :             // that we can continue safely to cleanup.
    2124             146 :             match tenant.shutdown(progress, freeze_and_flush).await {
    2125              66 :                 Ok(()) => {}
    2126 UBC           0 :                 Err(_other) => {
    2127               0 :                     // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
    2128               0 :                     // wait for it but return an error right away because these are distinct requests.
    2129               0 :                     slot_guard.revert();
    2130               0 :                     return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id));
    2131                 :                 }
    2132                 :             }
    2133 CBC          66 :             Some(tenant)
    2134                 :         }
    2135               5 :         Some(TenantSlot::Secondary(secondary_state)) => {
    2136               5 :             tracing::info!("Shutting down in secondary mode");
    2137               5 :             secondary_state.shutdown().await;
    2138               5 :             None
    2139                 :         }
    2140                 :         Some(TenantSlot::InProgress(_)) => {
    2141                 :             // Acquiring a slot guarantees its old value was not InProgress
    2142 UBC           0 :             unreachable!();
    2143                 :         }
    2144               0 :         None => None,
    2145                 :     };
    2146                 : 
    2147 CBC          71 :     match tenant_cleanup
    2148             199 :         .await
    2149              71 :         .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
    2150                 :     {
    2151              71 :         Ok(hook_value) => {
    2152              71 :             // Success: drop the old TenantSlot::Attached.
    2153              71 :             slot_guard
    2154              71 :                 .drop_old_value()
    2155              71 :                 .expect("We just called shutdown");
    2156              71 : 
    2157              71 :             Ok(hook_value)
    2158                 :         }
    2159 UBC           0 :         Err(e) => {
    2160                 :             // If we had a Tenant, set it to Broken and put it back in the TenantsMap
    2161               0 :             if let Some(attached_tenant) = attached_tenant {
    2162               0 :                 attached_tenant.set_broken(e.to_string()).await;
    2163               0 :             }
    2164                 :             // Leave the broken tenant in the map
    2165               0 :             slot_guard.revert();
    2166               0 : 
    2167               0 :             Err(TenantStateError::Other(e))
    2168                 :         }
    2169                 :     }
    2170 CBC          84 : }
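// Editor's note: an illustrative sketch (not part of mgr.rs) of a caller supplying a
// `tenant_cleanup` future to `remove_tenant_from_memory`, roughly in the shape of a
// detach operation.  It assumes `conf.tenant_path(&tenant_shard_id)` returns the tenant's
// local directory; this only loosely mirrors the real detach path.
async fn detach_cleanup_sketch(
    conf: &'static PageServerConf,
    tenant_shard_id: TenantShardId,
) -> Result<(), TenantStateError> {
    let local_tenant_directory = conf.tenant_path(&tenant_shard_id);
    remove_tenant_from_memory(&TENANTS, tenant_shard_id, async move {
        // The cleanup hook only runs after the in-memory tenant has been shut down,
        // so nothing else is writing to this directory.
        fs::remove_dir_all(&local_tenant_directory)
            .await
            .with_context(|| format!("removing local tenant directory {local_tenant_directory:?}"))?;
        Ok(())
    })
    .await
}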
    2171                 : 
    2172                 : use {
    2173                 :     crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
    2174                 :     utils::http::error::ApiError,
    2175                 : };
    2176                 : 
    2177             322 : pub(crate) async fn immediate_gc(
    2178             322 :     tenant_shard_id: TenantShardId,
    2179             322 :     timeline_id: TimelineId,
    2180             322 :     gc_req: TimelineGcRequest,
    2181             322 :     cancel: CancellationToken,
    2182             322 :     ctx: &RequestContext,
    2183             322 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
    2184             322 :     let guard = TENANTS.read().unwrap();
    2185                 : 
    2186             322 :     let tenant = guard
    2187             322 :         .get(&tenant_shard_id)
    2188             322 :         .map(Arc::clone)
    2189             322 :         .with_context(|| format!("tenant {tenant_shard_id}"))
    2190             322 :         .map_err(|e| ApiError::NotFound(e.into()))?;
    2191                 : 
    2192             321 :     let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
    2193             321 :     // Use tenant's pitr setting
    2194             321 :     let pitr = tenant.get_pitr_interval();
    2195             321 : 
    2196             321 :     // Run in task_mgr to avoid race with tenant_detach operation
    2197             321 :     let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
    2198             321 :     let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
    2199             321 :     // TODO: spawning is redundant now, need to hold the gate
    2200             321 :     task_mgr::spawn(
    2201             321 :         &tokio::runtime::Handle::current(),
    2202             321 :         TaskKind::GarbageCollector,
    2203             321 :         Some(tenant_shard_id),
    2204             321 :         Some(timeline_id),
    2205             321 :         &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
    2206             321 :         false,
    2207             321 :         async move {
    2208 UBC           0 :             fail::fail_point!("immediate_gc_task_pre");
    2209                 : 
    2210                 :             #[allow(unused_mut)]
    2211 CBC         321 :             let mut result = tenant
    2212             321 :                 .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
    2213             321 :                 .instrument(info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
    2214             274 :                 .await;
    2215                 :                 // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
    2216                 :                 // better once the types support it.
    2217                 : 
    2218                 :             #[cfg(feature = "testing")]
    2219                 :             {
    2220             321 :                 if let Ok(result) = result.as_mut() {
    2221                 :                     // Why not FuturesUnordered? It seems it would need much the same task structure,
    2222                 :                     // but would run everything on a single task.
    2223             320 :                     let mut js = tokio::task::JoinSet::new();
    2224             702 :                     for layer in std::mem::take(&mut result.doomed_layers) {
    2225             702 :                         js.spawn(layer.wait_drop());
    2226             702 :                     }
    2227             320 :                     tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
    2228            1022 :                     while let Some(res) = js.join_next().await {
    2229             702 :                         res.expect("wait_drop should not panic");
    2230             702 :                     }
    2231               1 :                 }
    2232                 : 
    2233             321 :                 let timeline = tenant.get_timeline(timeline_id, false).ok();
    2234             321 :                 let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
    2235                 : 
    2236             321 :                 if let Some(rtc) = rtc {
    2237                 :                     // layer drops schedule actions on remote timeline client to actually do the
    2238                 :                     // deletions; we don't care about a shutdown error here, just exit fast.
    2239             320 :                     drop(rtc.wait_completion().await);
    2240               1 :                 }
    2241                 :             }
    2242                 : 
    2243             321 :             match task_done.send(result) {
    2244             321 :                 Ok(_) => (),
    2245 UBC           0 :                 Err(result) => error!("failed to send gc result: {result:?}"),
    2246                 :             }
    2247 CBC         321 :             Ok(())
    2248             321 :         }
    2249             321 :     );
    2250             321 : 
    2251             321 :     // we held the guard until after spawning the task so that timeline shutdown will wait for the task; drop it now
    2252             321 :     drop(guard);
    2253             321 : 
    2254             321 :     Ok(wait_task_done)
    2255             322 : }
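// Editor's note: an illustrative sketch (not part of mgr.rs) of awaiting the receiver
// returned by `immediate_gc`.  The error mapping is simplified, and the `ApiError`
// variants used here are assumptions for illustration.
async fn immediate_gc_and_wait_sketch(
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    gc_req: TimelineGcRequest,
    cancel: CancellationToken,
    ctx: &RequestContext,
) -> Result<GcResult, ApiError> {
    let wait_task_done = immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, ctx).await?;
    // The oneshot sender is dropped without sending only if the GC task is cancelled or panics.
    let gc_result = wait_task_done
        .await
        .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("gc task dropped its result: {e}")))?
        .map_err(ApiError::InternalServerError)?;
    Ok(gc_result)
}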
    2256                 : 
    2257                 : #[cfg(test)]
    2258                 : mod tests {
    2259                 :     use pageserver_api::shard::TenantShardId;
    2260                 :     use std::collections::BTreeMap;
    2261                 :     use std::sync::Arc;
    2262                 :     use tracing::{info_span, Instrument};
    2263                 : 
    2264                 :     use crate::tenant::mgr::TenantSlot;
    2265                 : 
    2266                 :     use super::{super::harness::TenantHarness, TenantsMap};
    2267                 : 
    2268               1 :     #[tokio::test(start_paused = true)]
    2269               1 :     async fn shutdown_awaits_in_progress_tenant() {
    2270                 :         // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
    2271                 :         // wait for it to complete before proceeding.
    2272                 : 
    2273               1 :         let (t, _ctx) = TenantHarness::create("shutdown_awaits_in_progress_tenant")
    2274               1 :             .unwrap()
    2275               1 :             .load()
    2276               1 :             .await;
    2277                 : 
    2278                 :         // harness loads it to active, which is forced and nothing is running on the tenant
    2279                 : 
    2280               1 :         let id = TenantShardId::unsharded(t.tenant_id());
    2281                 : 
    2282                 :         // tenant harness configures the logging and we cannot escape it
    2283               1 :         let _e = info_span!("testing", tenant_id = %id).entered();
    2284               1 : 
    2285               1 :         let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
    2286               1 :         let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
    2287               1 : 
    2288               1 :         // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
    2289               1 :         // permit it to proceed: that will stick the tenant in InProgress
    2290               1 : 
    2291               1 :         let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
    2292               1 :         let (until_cleanup_started, cleanup_started) = utils::completion::channel();
    2293               1 :         let mut remove_tenant_from_memory_task = {
    2294               1 :             let jh = tokio::spawn({
    2295               1 :                 let tenants = tenants.clone();
    2296               1 :                 async move {
    2297               1 :                     let cleanup = async move {
    2298               1 :                         drop(until_cleanup_started);
    2299               1 :                         can_complete_cleanup.wait().await;
    2300               1 :                         anyhow::Ok(())
    2301               1 :                     };
    2302               1 :                     super::remove_tenant_from_memory(&tenants, id, cleanup).await
    2303               1 :                 }
    2304               1 :                 .instrument(info_span!("foobar", tenant_id = %id))
    2305                 :             });
    2306                 : 
    2307                 :             // now the long cleanup should be in place, with the stopping state
    2308               1 :             cleanup_started.wait().await;
    2309               1 :             jh
    2310                 :         };
    2311                 : 
    2312               1 :         let mut shutdown_task = {
    2313               1 :             let (until_shutdown_started, shutdown_started) = utils::completion::channel();
    2314               1 : 
    2315               1 :             let shutdown_task = tokio::spawn(async move {
    2316               1 :                 drop(until_shutdown_started);
    2317               2 :                 super::shutdown_all_tenants0(&tenants).await;
    2318               1 :             });
    2319               1 : 
    2320               1 :             shutdown_started.wait().await;
    2321               1 :             shutdown_task
    2322               1 :         };
    2323               1 : 
    2324               1 :         let long_time = std::time::Duration::from_secs(15);
    2325               2 :         tokio::select! {
    2326               2 :             _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
    2327               2 :             _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
    2328               2 :             _ = tokio::time::sleep(long_time) => {},
    2329               2 :         }
    2330                 : 
    2331               1 :         drop(until_cleanup_completed);
    2332               1 : 
    2333               1 :         // Now that we allow it to proceed, shutdown should complete immediately
    2334               1 :         remove_tenant_from_memory_task.await.unwrap().unwrap();
    2335               1 :         shutdown_task.await.unwrap();
    2336                 :     }
    2337                 : }
        

Generated by: LCOV version 2.1-beta