LCOV - code coverage report
Current view: top level - pageserver/src/tenant - mgr.rs (source / functions)
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info    Test Date: 2024-02-07 07:37:29
Coverage:    Lines: 81.0 % (917 of 1132)    Functions: 53.4 % (118 of 221)

            Line data    Source code
       1              : //! This module acts as a switchboard to access different repositories managed by this
       2              : //! page server.
       3              : 
       4              : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
       5              : use pageserver_api::key::Key;
       6              : use pageserver_api::models::ShardParameters;
       7              : use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
       8              : use rand::{distributions::Alphanumeric, Rng};
       9              : use std::borrow::Cow;
      10              : use std::cmp::Ordering;
      11              : use std::collections::{BTreeMap, HashMap};
      12              : use std::ops::Deref;
      13              : use std::sync::Arc;
      14              : use std::time::{Duration, Instant};
      15              : use tokio::fs;
      16              : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
      17              : 
      18              : use anyhow::Context;
      19              : use once_cell::sync::Lazy;
      20              : use tokio::task::JoinSet;
      21              : use tokio_util::sync::CancellationToken;
      22              : use tracing::*;
      23              : 
      24              : use remote_storage::GenericRemoteStorage;
      25              : use utils::crashsafe;
      26              : 
      27              : use crate::config::PageServerConf;
      28              : use crate::context::{DownloadBehavior, RequestContext};
      29              : use crate::control_plane_client::{
      30              :     ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
      31              : };
      32              : use crate::deletion_queue::DeletionQueueClient;
      33              : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
      34              : use crate::task_mgr::{self, TaskKind};
      35              : use crate::tenant::config::{
      36              :     AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
      37              :     TenantConfOpt,
      38              : };
      39              : use crate::tenant::delete::DeleteTenantFlow;
      40              : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
      41              : use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
      42              : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
      43              : 
      44              : use utils::crashsafe::path_with_suffix_extension;
      45              : use utils::fs_ext::PathExt;
      46              : use utils::generation::Generation;
      47              : use utils::id::{TenantId, TimelineId};
      48              : 
      49              : use super::delete::DeleteTenantError;
      50              : use super::secondary::SecondaryTenant;
      51              : use super::TenantSharedResources;
      52              : 
      53              : /// For a tenant that appears in TenantsMap, it may either be
       54              : /// - `Attached`: has a full Tenant object, is eligible to service
      55              : ///    reads and ingest WAL.
      56              : /// - `Secondary`: is only keeping a local cache warm.
      57              : ///
       58              : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
       59              : /// that way we avoid having to carefully switch a tenant's ingestion etc. on and off during
       60              : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
       61              : /// having a properly acquired generation (Secondary doesn't need a generation).
      62            4 : #[derive(Clone)]
      63              : pub(crate) enum TenantSlot {
      64              :     Attached(Arc<Tenant>),
      65              :     Secondary(Arc<SecondaryTenant>),
      66              :     /// In this state, other administrative operations acting on the TenantId should
      67              :     /// block, or return a retry indicator equivalent to HTTP 503.
      68              :     InProgress(utils::completion::Barrier),
      69              : }
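
The doc comment on `InProgress` above implies a specific behaviour for administrative callers: either wait on the barrier or surface a retryable error. Below is a minimal, self-contained sketch of that mapping; `SlotState`, `admin_status` and the status codes are illustrative stand-ins, not part of this module.

    // Simplified stand-in for TenantSlot: the real variants carry Arc<Tenant>,
    // Arc<SecondaryTenant> and a completion Barrier respectively.
    enum SlotState {
        Attached,
        Secondary,
        InProgress,
    }

    // Map a slot's state onto an HTTP-style status for an administrative operation.
    fn admin_status(slot: &SlotState) -> u16 {
        match slot {
            // Attached and Secondary slots can be operated on immediately.
            SlotState::Attached | SlotState::Secondary => 200,
            // InProgress means another operation currently owns the slot: ask the
            // caller to retry, per the doc comment on TenantSlot::InProgress.
            SlotState::InProgress => 503,
        }
    }

    fn main() {
        assert_eq!(admin_status(&SlotState::InProgress), 503);
    }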
      70              : 
      71              : impl std::fmt::Debug for TenantSlot {
      72            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      73            0 :         match self {
      74            0 :             Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
      75            0 :             Self::Secondary(_) => write!(f, "Secondary"),
      76            0 :             Self::InProgress(_) => write!(f, "InProgress"),
      77              :         }
      78            0 :     }
      79              : }
      80              : 
      81              : impl TenantSlot {
      82              :     /// Return the `Tenant` in this slot if attached, else None
      83         1236 :     fn get_attached(&self) -> Option<&Arc<Tenant>> {
      84         1236 :         match self {
      85         1212 :             Self::Attached(t) => Some(t),
      86           11 :             Self::Secondary(_) => None,
      87           13 :             Self::InProgress(_) => None,
      88              :         }
      89         1236 :     }
      90              : }
      91              : 
      92              : /// The tenants known to the pageserver.
      93              : /// The enum variants are used to distinguish the different states that the pageserver can be in.
      94              : pub(crate) enum TenantsMap {
      95              :     /// [`init_tenant_mgr`] is not done yet.
      96              :     Initializing,
      97              :     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
      98              :     /// New tenants can be added using [`tenant_map_acquire_slot`].
      99              :     Open(BTreeMap<TenantShardId, TenantSlot>),
     100              :     /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
     101              :     /// Existing tenants are still accessible, but no new tenants can be created.
     102              :     ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
     103              : }
     104              : 
     105              : pub(crate) enum TenantsMapRemoveResult {
     106              :     Occupied(TenantSlot),
     107              :     Vacant,
     108              :     InProgress(utils::completion::Barrier),
     109              : }
     110              : 
     111              : /// When resolving a TenantId to a shard, we may be looking for the 0th
     112              : /// shard, or we might be looking for whichever shard holds a particular page.
     113              : pub(crate) enum ShardSelector {
     114              :     /// Only return the 0th shard, if it is present.  If a non-0th shard is present,
     115              :     /// ignore it.
     116              :     Zero,
     117              :     /// Pick the first shard we find for the TenantId
     118              :     First,
     119              :     /// Pick the shard that holds this key
     120              :     Page(Key),
     121              : }
     122              : 
     123              : impl TenantsMap {
     124              :     /// Convenience function for typical usage, where we want to get a `Tenant` object, for
     125              :     /// working with attached tenants.  If the TenantId is in the map but in Secondary state,
     126              :     /// None is returned.
     127          251 :     pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
     128          251 :         match self {
     129            0 :             TenantsMap::Initializing => None,
     130          251 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
     131          251 :                 m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
     132              :             }
     133              :         }
     134          251 :     }
     135              : 
     136              :     /// A page service client sends a TenantId, and to look up the correct Tenant we must
     137              :     /// resolve this to a fully qualified TenantShardId.
     138        20696 :     fn resolve_attached_shard(
     139        20696 :         &self,
     140        20696 :         tenant_id: &TenantId,
     141        20696 :         selector: ShardSelector,
     142        20696 :     ) -> Option<TenantShardId> {
     143        20696 :         let mut want_shard = None;
     144        20696 :         match self {
     145            0 :             TenantsMap::Initializing => None,
     146        20696 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
     147        20696 :                 for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
     148              :                     // Ignore all slots that don't contain an attached tenant
     149        20695 :                     let tenant = match &slot.1 {
     150        20684 :                         TenantSlot::Attached(t) => t,
     151           11 :                         _ => continue,
     152              :                     };
     153              : 
     154         2834 :                     match selector {
     155        10025 :                         ShardSelector::First => return Some(*slot.0),
     156         2834 :                         ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
     157         2834 :                             return Some(*slot.0)
     158              :                         }
     159         7825 :                         ShardSelector::Page(key) => {
     160         7825 :                             // First slot we see for this tenant, calculate the expected shard number
     161         7825 :                             // for the key: we will use this for checking if this and subsequent
     162         7825 :                             // slots contain the key, rather than recalculating the hash each time.
     163         7825 :                             if want_shard.is_none() {
     164         7825 :                                 want_shard = Some(tenant.shard_identity.get_shard_number(&key));
     165         7825 :                             }
     166              : 
     167         7825 :                             if Some(tenant.shard_identity.number) == want_shard {
     168         7825 :                                 return Some(*slot.0);
     169            0 :                             }
     170              :                         }
     171            0 :                         _ => continue,
     172              :                     }
     173              :                 }
     174              : 
     175              :                 // Fall through: we didn't find an acceptable shard
     176           12 :                 None
     177              :             }
     178              :         }
     179        20696 :     }
     180              : 
     181              :     /// Only for use from DeleteTenantFlow.  This method directly removes a TenantSlot from the map.
     182              :     ///
     183              :     /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
     184              :     /// slot if the enclosed tenant is shutdown.
     185           71 :     pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
     186           71 :         use std::collections::btree_map::Entry;
     187           71 :         match self {
     188            0 :             TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
     189           71 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
     190           71 :                 Entry::Occupied(entry) => match entry.get() {
     191            1 :                     TenantSlot::InProgress(barrier) => {
     192            1 :                         TenantsMapRemoveResult::InProgress(barrier.clone())
     193              :                     }
     194           70 :                     _ => TenantsMapRemoveResult::Occupied(entry.remove()),
     195              :                 },
     196            0 :                 Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
     197              :             },
     198              :         }
     199           71 :     }
     200              : 
     201           71 :     pub(crate) fn len(&self) -> usize {
     202           71 :         match self {
     203            0 :             TenantsMap::Initializing => 0,
     204           71 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
     205              :         }
     206           71 :     }
     207              : }
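
`resolve_attached_shard` above hashes the key only once, on the first attached slot it sees for the tenant, and then compares that expected shard number cheaply against each subsequent slot. Here is a self-contained sketch of that routing idea; the modulo hash stands in for `ShardIdentity::get_shard_number`, and all types are simplified placeholders rather than the real `pageserver_api` types.

    // Simplified placeholders for the real pageserver_api types.
    type ShardNumber = u8;
    type Key = u64;

    struct ShardIdentity {
        number: ShardNumber,
        count: u8,
    }

    impl ShardIdentity {
        // Stand-in for ShardIdentity::get_shard_number: map a key onto one of `count` shards.
        fn get_shard_number(&self, key: &Key) -> ShardNumber {
            (key % self.count as u64) as ShardNumber
        }
    }

    // Find which locally attached shard of a tenant holds `key`, if any.
    fn resolve_page(attached: &[ShardIdentity], key: Key) -> Option<ShardNumber> {
        let mut want_shard = None;
        for identity in attached {
            // Hash the key once, on the first attached slot we see for this tenant.
            if want_shard.is_none() {
                want_shard = Some(identity.get_shard_number(&key));
            }
            // Subsequent slots only need an equality check, not another hash.
            if Some(identity.number) == want_shard {
                return Some(identity.number);
            }
        }
        None // this pageserver does not hold the shard for `key`
    }

    fn main() {
        let attached = vec![
            ShardIdentity { number: 0, count: 4 },
            ShardIdentity { number: 3, count: 4 },
        ];
        assert_eq!(resolve_page(&attached, 7), Some(3)); // 7 % 4 == 3
        assert_eq!(resolve_page(&attached, 5), None);    // shard 1 is not attached here
    }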
     208              : 
      209              : /// This is "safe" in that it won't leave behind a partially deleted directory
      210              : /// at the original path, because we rename with TEMP_FILE_SUFFIX before we start
      211              : /// deleting the contents.
     212              : ///
      213              : /// This is pageserver-specific, as it relies on future processes after a crash to check
      214              : /// for TEMP_FILE_SUFFIX when scanning the tenants directory at startup.
     215           12 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
     216           35 :     let tmp_path = safe_rename_tenant_dir(path).await?;
     217           12 :     fs::remove_dir_all(tmp_path).await
     218           12 : }
     219              : 
     220           90 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
     221           90 :     let parent = path
     222           90 :         .as_ref()
     223           90 :         .parent()
     224           90 :         // It is invalid to call this function with a relative path.  Tenant directories
     225           90 :         // should always have a parent.
     226           90 :         .ok_or(std::io::Error::new(
     227           90 :             std::io::ErrorKind::InvalidInput,
     228           90 :             "Path must be absolute",
     229           90 :         ))?;
     230           90 :     let rand_suffix = rand::thread_rng()
     231           90 :         .sample_iter(&Alphanumeric)
     232           90 :         .take(8)
     233           90 :         .map(char::from)
     234           90 :         .collect::<String>()
     235           90 :         + TEMP_FILE_SUFFIX;
     236           90 :     let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
     237           90 :     fs::rename(path.as_ref(), &tmp_path).await?;
     238           90 :     fs::File::open(parent).await?.sync_all().await?;
     239           90 :     Ok(tmp_path)
     240           90 : }
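
The two helpers above implement a rename-then-delete pattern: the directory is first renamed to a name carrying TEMP_FILE_SUFFIX (plus a random component), the parent directory is synced, and only then does the slow deletion start, so a crash mid-delete leaves behind a directory that later startups recognise as garbage. Below is a blocking, std-only sketch of the same idea under stated assumptions: the suffix constant and the sibling-name construction are illustrative, not the crate's own helpers.

    use std::fs;
    use std::io;
    use std::path::Path;

    // Illustrative stand-in for crate::TEMP_FILE_SUFFIX.
    const TEMP_SUFFIX: &str = ".___temp";

    fn safe_remove_dir_all(path: &Path) -> io::Result<()> {
        let parent = path.parent().ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Path must be absolute")
        })?;
        let file_name = path.file_name().ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Path must name a directory")
        })?;

        // Build a sibling path marked as temporary (the real code also adds a random
        // component via path_with_suffix_extension so concurrent removals cannot collide).
        let mut tmp_name = file_name.to_os_string();
        tmp_name.push(TEMP_SUFFIX);
        let tmp_path = parent.join(tmp_name);

        // Rename first: from this point on, a crash only leaves a *___temp directory behind.
        fs::rename(path, &tmp_path)?;
        // Make the rename durable by syncing the parent directory (works on Unix).
        fs::File::open(parent)?.sync_all()?;
        // Now the slow part can be interrupted safely.
        fs::remove_dir_all(&tmp_path)
    }

    fn main() -> io::Result<()> {
        let dir = std::env::temp_dir().join("safe-remove-sketch");
        fs::create_dir_all(&dir)?;
        safe_remove_dir_all(&dir)
    }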
     241              : 
     242              : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
     243          606 :     Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
     244              : 
     245              : /// The TenantManager is responsible for storing and mutating the collection of all tenants
     246              : /// that this pageserver process has state for.  Every Tenant and SecondaryTenant instance
     247              : /// lives inside the TenantManager.
     248              : ///
     249              : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
     250              : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
     251              : /// and attached modes concurrently.
     252              : pub struct TenantManager {
     253              :     conf: &'static PageServerConf,
     254              :     // TODO: currently this is a &'static pointing to TENANTs.  When we finish refactoring
     255              :     // out of that static variable, the TenantManager can own this.
     256              :     // See https://github.com/neondatabase/neon/issues/5796
     257              :     tenants: &'static std::sync::RwLock<TenantsMap>,
     258              :     resources: TenantSharedResources,
     259              : }
     260              : 
     261            1 : fn emergency_generations(
     262            1 :     tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
     263            1 : ) -> HashMap<TenantShardId, Generation> {
     264            1 :     tenant_confs
     265            1 :         .iter()
     266            1 :         .filter_map(|(tid, lc)| {
     267            1 :             let lc = match lc {
     268            1 :                 Ok(lc) => lc,
     269            0 :                 Err(_) => return None,
     270              :             };
     271            1 :             let gen = match &lc.mode {
     272            1 :                 LocationMode::Attached(alc) => Some(alc.generation),
     273            0 :                 LocationMode::Secondary(_) => None,
     274              :             };
     275              : 
     276            1 :             gen.map(|g| (*tid, g))
     277            1 :         })
     278            1 :         .collect()
     279            1 : }
     280              : 
     281          604 : async fn init_load_generations(
     282          604 :     conf: &'static PageServerConf,
     283          604 :     tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
     284          604 :     resources: &TenantSharedResources,
     285          604 :     cancel: &CancellationToken,
     286          604 : ) -> anyhow::Result<Option<HashMap<TenantShardId, Generation>>> {
     287          604 :     let generations = if conf.control_plane_emergency_mode {
     288            1 :         error!(
     289            1 :             "Emergency mode!  Tenants will be attached unsafely using their last known generation"
     290            1 :         );
     291            1 :         emergency_generations(tenant_confs)
     292          603 :     } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
     293          602 :         info!("Calling control plane API to re-attach tenants");
     294              :         // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
     295         1826 :         match client.re_attach().await {
     296          602 :             Ok(tenants) => tenants,
     297              :             Err(RetryForeverError::ShuttingDown) => {
     298            0 :                 anyhow::bail!("Shut down while waiting for control plane re-attach response")
     299              :             }
     300              :         }
     301              :     } else {
     302            1 :         info!("Control plane API not configured, tenant generations are disabled");
     303            1 :         return Ok(None);
     304              :     };
     305              : 
     306              :     // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
     307              :     // deletion list entries may still be valid.  We provide that by pushing a recovery operation into
      308              :     // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
     309              :     // are processed, even though we don't block on recovery completing here.
     310              :     //
     311              :     // Must only do this if remote storage is enabled, otherwise deletion queue
     312              :     // is not running and channel push will fail.
     313          603 :     if resources.remote_storage.is_some() {
     314          603 :         resources
     315          603 :             .deletion_queue_client
     316          603 :             .recover(generations.clone())?;
     317            0 :     }
     318              : 
     319          603 :     Ok(Some(generations))
     320          604 : }
     321              : 
     322              : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
     323              : /// to load a tenant config from it.
     324              : ///
     325              : /// If file is missing, return Ok(None)
     326          229 : fn load_tenant_config(
     327          229 :     conf: &'static PageServerConf,
     328          229 :     dentry: Utf8DirEntry,
     329          229 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
     330          229 :     let tenant_dir_path = dentry.path().to_path_buf();
     331          229 :     if crate::is_temporary(&tenant_dir_path) {
     332            0 :         info!("Found temporary tenant directory, removing: {tenant_dir_path}");
     333              :         // No need to use safe_remove_tenant_dir_all because this is already
     334              :         // a temporary path
     335            0 :         if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
     336            0 :             error!(
     337            0 :                 "Failed to remove temporary directory '{}': {:?}",
     338            0 :                 tenant_dir_path, e
     339            0 :             );
     340            0 :         }
     341            0 :         return Ok(None);
     342          229 :     }
     343              : 
     344              :     // This case happens if we crash during attachment before writing a config into the dir
     345          229 :     let is_empty = tenant_dir_path
     346          229 :         .is_empty_dir()
     347          229 :         .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
     348          229 :     if is_empty {
     349            2 :         info!("removing empty tenant directory {tenant_dir_path:?}");
     350            2 :         if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
     351            0 :             error!(
     352            0 :                 "Failed to remove empty tenant directory '{}': {e:#}",
     353            0 :                 tenant_dir_path
     354            0 :             )
     355            2 :         }
     356            2 :         return Ok(None);
     357          227 :     }
     358          227 : 
     359          227 :     let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
     360          227 :     if tenant_ignore_mark_file.exists() {
     361            1 :         info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
     362            1 :         return Ok(None);
     363          226 :     }
     364              : 
     365          226 :     let tenant_shard_id = match tenant_dir_path
     366          226 :         .file_name()
     367          226 :         .unwrap_or_default()
     368          226 :         .parse::<TenantShardId>()
     369              :     {
     370          226 :         Ok(id) => id,
     371              :         Err(_) => {
     372            0 :             warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
     373            0 :             return Ok(None);
     374              :         }
     375              :     };
     376              : 
     377          226 :     Ok(Some((
     378          226 :         tenant_shard_id,
     379          226 :         Tenant::load_tenant_config(conf, &tenant_shard_id),
     380          226 :     )))
     381          229 : }
     382              : 
     383              : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
     384              : /// and load configurations for the tenants we found.
     385              : ///
     386              : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
     387              : /// seconds even on reasonably fast drives.
     388          604 : async fn init_load_tenant_configs(
     389          604 :     conf: &'static PageServerConf,
     390          604 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
     391          604 :     let tenants_dir = conf.tenants_path();
     392              : 
     393          604 :     let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
     394          604 :         let dir_entries = tenants_dir
     395          604 :             .read_dir_utf8()
     396          604 :             .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
     397              : 
     398          604 :         Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
     399          604 :     })
     400          602 :     .await??;
     401              : 
     402          604 :     let mut configs = HashMap::new();
     403          604 : 
     404          604 :     let mut join_set = JoinSet::new();
     405          833 :     for dentry in dentries {
     406          229 :         join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
     407          229 :     }
     408              : 
     409          833 :     while let Some(r) = join_set.join_next().await {
     410          229 :         if let Some((tenant_id, tenant_config)) = r?? {
     411          226 :             configs.insert(tenant_id, tenant_config);
     412          226 :         }
     413              :     }
     414              : 
     415          604 :     Ok(configs)
     416          604 : }
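
The loader above pushes each blocking directory-entry parse onto the blocking thread pool and drains the results through a JoinSet, so a directory with 10k+ tenants is processed concurrently instead of serially. Below is a minimal, self-contained sketch of that fan-out/fan-in pattern; `parse_entry` is a placeholder for `load_tenant_config`, and tokio plus anyhow are assumed as dependencies.

    use std::collections::HashMap;
    use tokio::task::JoinSet;

    // Placeholder for load_tenant_config: blocking per-entry work that may be skipped (None)
    // or fail outright.
    fn parse_entry(name: String) -> anyhow::Result<Option<(String, usize)>> {
        Ok(Some((name.clone(), name.len())))
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let entries = vec!["tenant-a".to_string(), "tenant-b".to_string()];

        let mut join_set = JoinSet::new();
        for entry in entries {
            // spawn_blocking keeps the async runtime responsive while each entry does
            // synchronous filesystem work.
            join_set.spawn_blocking(move || parse_entry(entry));
        }

        let mut configs = HashMap::new();
        while let Some(res) = join_set.join_next().await {
            // The first `?` surfaces task panics, the second surfaces parse errors;
            // entries that were skipped come back as None and are simply dropped.
            if let Some((id, cfg)) = res?? {
                configs.insert(id, cfg);
            }
        }
        println!("loaded {} configs", configs.len());
        Ok(())
    }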
     417              : 
     418              : /// Initialize repositories with locally available timelines.
     419              : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
     420              : /// are scheduled for download and added to the tenant once download is completed.
     421          604 : #[instrument(skip_all)]
     422              : pub async fn init_tenant_mgr(
     423              :     conf: &'static PageServerConf,
     424              :     resources: TenantSharedResources,
     425              :     init_order: InitializationOrder,
     426              :     cancel: CancellationToken,
     427              : ) -> anyhow::Result<TenantManager> {
     428              :     let mut tenants = BTreeMap::new();
     429              : 
     430              :     let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
     431              : 
     432              :     // Scan local filesystem for attached tenants
     433              :     let tenant_configs = init_load_tenant_configs(conf).await?;
     434              : 
     435              :     // Determine which tenants are to be attached
     436              :     let tenant_generations =
     437              :         init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
     438              : 
     439          604 :     tracing::info!(
     440          604 :         "Attaching {} tenants at startup, warming up {} at a time",
     441          604 :         tenant_configs.len(),
     442          604 :         conf.concurrent_tenant_warmup.initial_permits()
     443          604 :     );
     444              :     TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
     445              : 
     446              :     // Construct `Tenant` objects and start them running
     447              :     for (tenant_shard_id, location_conf) in tenant_configs {
     448              :         let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
     449              : 
     450              :         let mut location_conf = match location_conf {
     451              :             Ok(l) => l,
     452              :             Err(e) => {
     453            0 :                 warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
     454              : 
     455              :                 tenants.insert(
     456              :                     tenant_shard_id,
     457              :                     TenantSlot::Attached(Tenant::create_broken_tenant(
     458              :                         conf,
     459              :                         tenant_shard_id,
     460              :                         format!("{}", e),
     461              :                     )),
     462              :                 );
     463              :                 continue;
     464              :             }
     465              :         };
     466              : 
     467              :         let generation = if let Some(generations) = &tenant_generations {
     468              :             // We have a generation map: treat it as the authority for whether
     469              :             // this tenant is really attached.
     470              :             if let Some(gen) = generations.get(&tenant_shard_id) {
     471              :                 if let LocationMode::Attached(attached) = &location_conf.mode {
     472              :                     if attached.generation > *gen {
     473            0 :                         tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     474            0 :                             "Control plane gave decreasing generation ({gen:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
     475            0 :                             attached.generation
     476            0 :                         );
     477              : 
     478              :                         // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
     479              :                         // local disk content: demote to secondary rather than detaching.
     480              :                         tenants.insert(
     481              :                             tenant_shard_id,
     482              :                             TenantSlot::Secondary(SecondaryTenant::new(
     483              :                                 tenant_shard_id,
     484              :                                 location_conf.shard,
     485              :                                 location_conf.tenant_conf,
     486              :                                 &SecondaryLocationConfig { warm: false },
     487              :                             )),
     488              :                         );
     489              :                     }
     490              :                 }
     491              :                 *gen
     492              :             } else {
     493              :                 match &location_conf.mode {
     494              :                     LocationMode::Secondary(secondary_config) => {
     495              :                         // We do not require the control plane's permission for secondary mode
     496              :                         // tenants, because they do no remote writes and hence require no
     497              :                         // generation number
     498            7 :                         info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
     499              :                         tenants.insert(
     500              :                             tenant_shard_id,
     501              :                             TenantSlot::Secondary(SecondaryTenant::new(
     502              :                                 tenant_shard_id,
     503              :                                 location_conf.shard,
     504              :                                 location_conf.tenant_conf,
     505              :                                 secondary_config,
     506              :                             )),
     507              :                         );
     508              :                     }
     509              :                     LocationMode::Attached(_) => {
     510              :                         // TODO: augment re-attach API to enable the control plane to
     511              :                         // instruct us about secondary attachments.  That way, instead of throwing
     512              :                         // away local state, we can gracefully fall back to secondary here, if the control
     513              :                         // plane tells us so.
     514              :                         // (https://github.com/neondatabase/neon/issues/5377)
     515           12 :                         info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
     516              :                         if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
     517            0 :                             error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     518            0 :                                 "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
     519            0 :                             );
     520              :                         }
     521              :                     }
     522              :                 };
     523              : 
     524              :                 continue;
     525              :             }
     526              :         } else {
     527              :             // Legacy mode: no generation information, any tenant present
     528              :             // on local disk may activate
     529            0 :             info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
     530              :             Generation::none()
     531              :         };
     532              : 
     533              :         // Presence of a generation number implies attachment: attach the tenant
     534              :         // if it wasn't already, and apply the generation number.
     535              :         location_conf.attach_in_generation(generation);
     536              :         Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
     537              : 
     538              :         let shard_identity = location_conf.shard;
     539              :         match tenant_spawn(
     540              :             conf,
     541              :             tenant_shard_id,
     542              :             &tenant_dir_path,
     543              :             resources.clone(),
     544              :             AttachedTenantConf::try_from(location_conf)?,
     545              :             shard_identity,
     546              :             Some(init_order.clone()),
     547              :             &TENANTS,
     548              :             SpawnMode::Normal,
     549              :             &ctx,
     550              :         ) {
     551              :             Ok(tenant) => {
     552              :                 tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
     553              :             }
     554              :             Err(e) => {
     555            0 :                 error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
     556              :             }
     557              :         }
     558              :     }
     559              : 
     560          604 :     info!("Processed {} local tenants at startup", tenants.len());
     561              : 
     562              :     let mut tenants_map = TENANTS.write().unwrap();
     563              :     assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
     564              :     METRICS.tenant_slots.set(tenants.len() as u64);
     565              :     *tenants_map = TenantsMap::Open(tenants);
     566              : 
     567              :     Ok(TenantManager {
     568              :         conf,
     569              :         tenants: &TENANTS,
     570              :         resources,
     571              :     })
     572              : }
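
Condensed from the branches above and their log messages, the per-tenant startup decision amounts to a small decision table. The sketch below is an illustrative simplification with stand-in types, not a line-for-line translation of init_tenant_mgr; legacy mode (generations disabled entirely) is omitted.

    // What the control plane's re-attach response said about this shard, if anything.
    type ReattachGeneration = Option<u32>;

    // Simplified stand-in for LocationMode as found in the on-disk config.
    enum DiskMode {
        Attached { generation: u32 },
        Secondary,
    }

    enum StartupAction {
        SpawnAttached { generation: u32 },
        KeepSecondary,
        DemoteToSecondary,
        DetachAndDelete,
    }

    fn decide(reattach: ReattachGeneration, on_disk: &DiskMode) -> StartupAction {
        match (reattach, on_disk) {
            // Control plane answered with an older generation than we hold locally:
            // keep the local data around but stop acting as the attached location.
            (Some(gen), DiskMode::Attached { generation }) if *generation > gen => {
                StartupAction::DemoteToSecondary
            }
            // Control plane confirms the attachment: spawn with the returned generation.
            (Some(gen), _) => StartupAction::SpawnAttached { generation: gen },
            // Not in the re-attach response, but only configured as secondary locally:
            // secondary locations need no generation, so keep them.
            (None, DiskMode::Secondary) => StartupAction::KeepSecondary,
            // Not in the re-attach response and locally attached: the local state is
            // stale, so detach and remove the directory.
            (None, DiskMode::Attached { .. }) => StartupAction::DetachAndDelete,
        }
    }

    fn main() {
        assert!(matches!(
            decide(None, &DiskMode::Attached { generation: 3 }),
            StartupAction::DetachAndDelete
        ));
        assert!(matches!(
            decide(None, &DiskMode::Secondary),
            StartupAction::KeepSecondary
        ));
    }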
     573              : 
     574              : /// Wrapper for Tenant::spawn that checks invariants before running, and inserts
     575              : /// a broken tenant in the map if Tenant::spawn fails.
     576              : #[allow(clippy::too_many_arguments)]
     577          858 : pub(crate) fn tenant_spawn(
     578          858 :     conf: &'static PageServerConf,
     579          858 :     tenant_shard_id: TenantShardId,
     580          858 :     tenant_path: &Utf8Path,
     581          858 :     resources: TenantSharedResources,
     582          858 :     location_conf: AttachedTenantConf,
     583          858 :     shard_identity: ShardIdentity,
     584          858 :     init_order: Option<InitializationOrder>,
     585          858 :     tenants: &'static std::sync::RwLock<TenantsMap>,
     586          858 :     mode: SpawnMode,
     587          858 :     ctx: &RequestContext,
     588          858 : ) -> anyhow::Result<Arc<Tenant>> {
     589          858 :     anyhow::ensure!(
     590          858 :         tenant_path.is_dir(),
     591            0 :         "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
     592              :     );
     593              :     anyhow::ensure!(
     594          858 :         !crate::is_temporary(tenant_path),
     595            0 :         "Cannot load tenant from temporary path {tenant_path:?}"
     596              :     );
     597              :     anyhow::ensure!(
     598          858 :         !tenant_path.is_empty_dir().with_context(|| {
     599            0 :             format!("Failed to check whether {tenant_path:?} is an empty dir")
     600          858 :         })?,
     601            0 :         "Cannot load tenant from empty directory {tenant_path:?}"
     602              :     );
     603              : 
     604          858 :     let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
     605          858 :     anyhow::ensure!(
     606          858 :         !conf.tenant_ignore_mark_file_path(&tenant_shard_id).exists(),
     607            0 :         "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
     608              :     );
     609              : 
     610          858 :     let tenant = match Tenant::spawn(
     611          858 :         conf,
     612          858 :         tenant_shard_id,
     613          858 :         resources,
     614          858 :         location_conf,
     615          858 :         shard_identity,
     616          858 :         init_order,
     617          858 :         tenants,
     618          858 :         mode,
     619          858 :         ctx,
     620          858 :     ) {
     621          858 :         Ok(tenant) => tenant,
     622            0 :         Err(e) => {
     623            0 :             error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
     624            0 :             Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
     625              :         }
     626              :     };
     627              : 
     628          858 :     Ok(tenant)
     629          858 : }
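
tenant_spawn front-loads its precondition checks with anyhow::ensure! so that a bad directory produces a descriptive error instead of a half-started tenant. Below is a reduced sketch of that style over a plain path; the `is_temporary` helper is a stand-in for `crate::is_temporary`, and the whole function is illustrative rather than a copy of the real checks.

    use anyhow::Context;
    use std::path::Path;

    // Stand-in for crate::is_temporary: here, anything ending in ".temp" counts as temporary.
    fn is_temporary(path: &Path) -> bool {
        path.extension().map_or(false, |ext| ext == "temp")
    }

    fn check_tenant_dir(path: &Path) -> anyhow::Result<()> {
        anyhow::ensure!(
            path.is_dir(),
            "Cannot load tenant from path {path:?}, it either does not exist or is not a directory"
        );
        anyhow::ensure!(
            !is_temporary(path),
            "Cannot load tenant from temporary path {path:?}"
        );
        let mut entries = std::fs::read_dir(path)
            .with_context(|| format!("Failed to check whether {path:?} is an empty dir"))?;
        anyhow::ensure!(
            entries.next().is_some(),
            "Cannot load tenant from empty directory {path:?}"
        );
        Ok(())
    }

    fn main() {
        // A freshly created directory is empty, so the last precondition should fail.
        let dir = std::env::temp_dir().join("tenant-sketch-empty");
        std::fs::create_dir_all(&dir).expect("create test dir");
        assert!(check_tenant_dir(&dir).is_err());
        std::fs::remove_dir_all(&dir).ok();
    }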
     630              : 
     631              : ///
     632              : /// Shut down all tenants. This runs as part of pageserver shutdown.
     633              : ///
     634              : /// NB: We leave the tenants in the map, so that they remain accessible through
     635              : /// the management API until we shut it down. If we removed the shut-down tenants
     636              : /// from the tenants map, the management API would return 404 for these tenants,
      637              : /// because TenantsMap::get() would then return `None`.
     638              : /// That could be easily misinterpreted by control plane, the consumer of the
     639              : /// management API. For example, it could attach the tenant on a different pageserver.
     640              : /// We would then be in split-brain once this pageserver restarts.
     641          172 : #[instrument(skip_all)]
     642              : pub(crate) async fn shutdown_all_tenants() {
     643              :     shutdown_all_tenants0(&TENANTS).await
     644              : }
     645              : 
     646          174 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
     647          174 :     use utils::completion;
     648          174 : 
     649          174 :     let mut join_set = JoinSet::new();
     650              : 
     651              :     // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
     652          174 :     let (total_in_progress, total_attached) = {
     653          174 :         let mut m = tenants.write().unwrap();
     654          174 :         match &mut *m {
     655              :             TenantsMap::Initializing => {
     656            0 :                 *m = TenantsMap::ShuttingDown(BTreeMap::default());
     657            0 :                 info!("tenants map is empty");
     658            0 :                 return;
     659              :             }
     660          174 :             TenantsMap::Open(tenants) => {
     661          174 :                 let mut shutdown_state = BTreeMap::new();
     662          174 :                 let mut total_in_progress = 0;
     663          174 :                 let mut total_attached = 0;
     664              : 
     665          199 :                 for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
     666          199 :                     match v {
     667          183 :                         TenantSlot::Attached(t) => {
     668          183 :                             shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
     669          183 :                             join_set.spawn(
     670          183 :                                 async move {
     671          183 :                                     let freeze_and_flush = true;
     672              : 
     673          183 :                                     let res = {
     674          183 :                                         let (_guard, shutdown_progress) = completion::channel();
     675          405 :                                         t.shutdown(shutdown_progress, freeze_and_flush).await
     676              :                                     };
     677              : 
     678          183 :                                     if let Err(other_progress) = res {
     679              :                                         // join the another shutdown in progress
     680            0 :                                         other_progress.wait().await;
     681          183 :                                     }
     682              : 
     683              :                                     // we cannot afford per tenant logging here, because if s3 is degraded, we are
     684              :                                     // going to log too many lines
     685          183 :                                     debug!("tenant successfully stopped");
     686          183 :                                 }
     687          183 :                                 .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
     688              :                             );
     689              : 
     690          183 :                             total_attached += 1;
     691              :                         }
     692            7 :                         TenantSlot::Secondary(state) => {
     693            7 :                             // We don't need to wait for this individually per-tenant: the
      694            7 :                             // downloader task will be waited on eventually; this cancel
     695            7 :                             // is just to encourage it to drop out if it is doing work
     696            7 :                             // for this tenant right now.
     697            7 :                             state.cancel.cancel();
     698            7 : 
     699            7 :                             shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
     700            7 :                         }
     701            9 :                         TenantSlot::InProgress(notify) => {
     702            9 :                             // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
     703            9 :                             // wait for their notifications to fire in this function.
     704            9 :                             join_set.spawn(async move {
     705            9 :                                 notify.wait().await;
     706            9 :                             });
     707            9 : 
     708            9 :                             total_in_progress += 1;
     709            9 :                         }
     710              :                     }
     711              :                 }
     712          174 :                 *m = TenantsMap::ShuttingDown(shutdown_state);
     713          174 :                 (total_in_progress, total_attached)
     714              :             }
     715              :             TenantsMap::ShuttingDown(_) => {
     716            0 :                 error!("already shutting down, this function isn't supposed to be called more than once");
     717            0 :                 return;
     718              :             }
     719              :         }
     720              :     };
     721              : 
     722          174 :     let started_at = std::time::Instant::now();
     723              : 
     724          174 :     info!(
     725          174 :         "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
     726          174 :         total_in_progress, total_attached
     727          174 :     );
     728              : 
     729          174 :     let total = join_set.len();
     730          174 :     let mut panicked = 0;
     731          174 :     let mut buffering = true;
     732          174 :     const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
     733          174 :     let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
     734              : 
     735          373 :     while !join_set.is_empty() {
     736          199 :         tokio::select! {
     737          192 :             Some(joined) = join_set.join_next() => {
     738              :                 match joined {
     739              :                     Ok(()) => {},
     740              :                     Err(join_error) if join_error.is_cancelled() => {
     741              :                         unreachable!("we are not cancelling any of the tasks");
     742              :                     }
     743              :                     Err(join_error) if join_error.is_panic() => {
     744              :                         // cannot really do anything, as this panic is likely a bug
     745              :                         panicked += 1;
     746              :                     }
     747              :                     Err(join_error) => {
     748            0 :                         warn!("unknown kind of JoinError: {join_error}");
     749              :                     }
     750              :                 }
     751              :                 if !buffering {
     752              :                     // buffer so that every 500ms since the first update (or starting) we'll log
     753              :                     // how far away we are; this is because we will get SIGKILL'd at 10s, and we
     754              :                     // are not able to log *then*.
     755              :                     buffering = true;
     756              :                     buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
     757              :                 }
     758              :             },
     759              :             _ = &mut buffered, if buffering => {
     760              :                 buffering = false;
     761            7 :                 info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
     762              :             }
     763              :         }
     764              :     }
     765              : 
     766          174 :     if panicked > 0 {
     767            0 :         warn!(
     768            0 :             panicked,
     769            0 :             total, "observed panicks while shutting down tenants"
     770            0 :         );
     771          174 :     }
     772              : 
     773              :     // caller will log how long we took
     774          174 : }
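
The shutdown loop above pairs join_set.join_next() with a resettable sleep so that, at most every 500ms, a single line reports how many tenants are still pending; this matters because the process may be SIGKILL'd before it gets a chance to log anything at the very end. Below is a trimmed-down, runnable sketch of that timing pattern, with println! standing in for the tracing macros and dummy sleep tasks standing in for tenant shutdowns.

    use std::time::Duration;
    use tokio::task::JoinSet;

    #[tokio::main]
    async fn main() {
        let mut join_set = JoinSet::new();
        for i in 0..5u64 {
            // Dummy workloads with staggered completion times.
            join_set.spawn(async move {
                tokio::time::sleep(Duration::from_millis(200 * i)).await;
            });
        }

        let total = join_set.len();
        const BUFFER_FOR: Duration = Duration::from_millis(500);
        let mut buffering = true;
        let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));

        while !join_set.is_empty() {
            tokio::select! {
                Some(res) = join_set.join_next() => {
                    res.expect("task failed");
                    if !buffering {
                        // Re-arm the timer: the next progress line appears 500ms after this completion.
                        buffering = true;
                        buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
                    }
                },
                _ = &mut buffered, if buffering => {
                    buffering = false;
                    println!("still waiting: {} of {} tasks remaining", join_set.len(), total);
                }
            }
        }
    }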
     775              : 
     776            0 : #[derive(Debug, thiserror::Error)]
     777              : pub(crate) enum SetNewTenantConfigError {
     778              :     #[error(transparent)]
     779              :     GetTenant(#[from] GetTenantError),
     780              :     #[error(transparent)]
     781              :     Persist(anyhow::Error),
     782              :     #[error(transparent)]
     783              :     Other(anyhow::Error),
     784              : }
     785              : 
     786           30 : pub(crate) async fn set_new_tenant_config(
     787           30 :     conf: &'static PageServerConf,
     788           30 :     new_tenant_conf: TenantConfOpt,
     789           30 :     tenant_id: TenantId,
     790           30 : ) -> Result<(), SetNewTenantConfigError> {
     791           30 :     // Legacy API: does not support sharding
     792           30 :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
     793              : 
     794           30 :     info!("configuring tenant {tenant_id}");
     795           30 :     let tenant = get_tenant(tenant_shard_id, true)?;
     796              : 
     797           30 :     if tenant.tenant_shard_id().shard_count > ShardCount(0) {
     798              :         // Note that we use ShardParameters::default below.
     799            0 :         return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
     800            0 :             "This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
     801            0 :         )));
     802           30 :     }
     803           30 : 
     804           30 :     // This is a legacy API that only operates on attached tenants: the preferred
     805           30 :     // API to use is the location_config/ endpoint, which lets the caller provide
     806           30 :     // the full LocationConf.
     807           30 :     let location_conf = LocationConf::attached_single(
     808           30 :         new_tenant_conf,
     809           30 :         tenant.generation,
     810           30 :         &ShardParameters::default(),
     811           30 :     );
     812           30 : 
     813           30 :     Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
     814           60 :         .await
     815           30 :         .map_err(SetNewTenantConfigError::Persist)?;
     816           30 :     tenant.set_new_tenant_config(new_tenant_conf);
     817           30 :     Ok(())
     818           30 : }
     819              : 
     820           18 : #[derive(thiserror::Error, Debug)]
     821              : pub(crate) enum UpsertLocationError {
     822              :     #[error("Bad config request: {0}")]
     823              :     BadRequest(anyhow::Error),
     824              : 
     825              :     #[error("Cannot change config in this state: {0}")]
     826              :     Unavailable(#[from] TenantMapError),
     827              : 
     828              :     #[error("Tenant is already being modified")]
     829              :     InProgress,
     830              : 
     831              :     #[error("Failed to flush: {0}")]
     832              :     Flush(anyhow::Error),
     833              : 
     834              :     #[error("Internal error: {0}")]
     835              :     Other(#[from] anyhow::Error),
     836              : }
     837              : 
     838              : impl TenantManager {
     839              :     /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
     840              :     /// having to pass it around everywhere as a separate object.
     841         1237 :     pub(crate) fn get_conf(&self) -> &'static PageServerConf {
     842         1237 :         self.conf
     843         1237 :     }
     844              : 
      845              :     /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or does not match the query.
      846              :     /// `active_only = true` allows querying only tenants that are ready for operations, erroring on all other kinds of tenants.
     847          969 :     pub(crate) fn get_attached_tenant_shard(
     848          969 :         &self,
     849          969 :         tenant_shard_id: TenantShardId,
     850          969 :         active_only: bool,
     851          969 :     ) -> Result<Arc<Tenant>, GetTenantError> {
     852          969 :         let locked = self.tenants.read().unwrap();
     853              : 
     854          969 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
     855              : 
     856          967 :         match peek_slot {
     857          965 :             Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
     858              :                 TenantState::Broken {
     859            0 :                     reason,
     860            0 :                     backtrace: _,
     861            0 :                 } if active_only => Err(GetTenantError::Broken(reason)),
     862          865 :                 TenantState::Active => Ok(Arc::clone(tenant)),
     863              :                 _ => {
     864          100 :                     if active_only {
     865            0 :                         Err(GetTenantError::NotActive(tenant_shard_id))
     866              :                     } else {
     867          100 :                         Ok(Arc::clone(tenant))
     868              :                     }
     869              :                 }
     870              :             },
     871            2 :             Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
     872              :             None | Some(TenantSlot::Secondary(_)) => {
     873            2 :                 Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
     874              :             }
     875              :         }
     876          969 :     }
     877              : 
     878            6 :     pub(crate) fn get_secondary_tenant_shard(
     879            6 :         &self,
     880            6 :         tenant_shard_id: TenantShardId,
     881            6 :     ) -> Option<Arc<SecondaryTenant>> {
     882            6 :         let locked = self.tenants.read().unwrap();
     883            6 : 
     884            6 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
     885            6 :             .ok()
     886            6 :             .flatten();
     887              : 
     888            6 :         match peek_slot {
     889            6 :             Some(TenantSlot::Secondary(s)) => Some(s.clone()),
     890            0 :             _ => None,
     891              :         }
     892            6 :     }
     893              : 
     894              :     /// Whether the `TenantManager` is responsible for the tenant shard
     895            1 :     pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
     896            1 :         let locked = self.tenants.read().unwrap();
     897            1 : 
     898            1 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
     899            1 :             .ok()
     900            1 :             .flatten();
     901            1 : 
     902            1 :         peek_slot.is_some()
     903            1 :     }
     904              : 
     905          774 :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
     906              :     pub(crate) async fn upsert_location(
     907              :         &self,
     908              :         tenant_shard_id: TenantShardId,
     909              :         new_location_config: LocationConf,
     910              :         flush: Option<Duration>,
     911              :         mut spawn_mode: SpawnMode,
     912              :         ctx: &RequestContext,
     913              :     ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
     914              :         debug_assert_current_span_has_tenant_id();
     915          774 :         info!("configuring tenant location to state {new_location_config:?}");
     916              : 
     917              :         enum FastPathModified {
     918              :             Attached(Arc<Tenant>),
     919              :             Secondary(Arc<SecondaryTenant>),
     920              :         }
     921              : 
     922              :         // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
     923              :         // then we do not need to set the slot to InProgress; we can just call into the
     924              :         // existing tenant.
     925              :         let fast_path_taken = {
     926              :             let locked = self.tenants.read().unwrap();
     927              :             let peek_slot =
     928              :                 tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
     929              :             match (&new_location_config.mode, peek_slot) {
     930              :                 (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
     931              :                     match attach_conf.generation.cmp(&tenant.generation) {
     932              :                         Ordering::Equal => {
     933              :                             // A transition from Attached to Attached in the same generation, we may
     934              :                             // take our fast path and just provide the updated configuration
     935              :                             // to the tenant.
     936              :                             tenant.set_new_location_config(
     937              :                                 AttachedTenantConf::try_from(new_location_config.clone())
     938              :                                     .map_err(UpsertLocationError::BadRequest)?,
     939              :                             );
     940              : 
     941              :                             Some(FastPathModified::Attached(tenant.clone()))
     942              :                         }
     943              :                         Ordering::Less => {
     944              :                             return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
     945              :                                 "Generation {:?} is less than existing {:?}",
     946              :                                 attach_conf.generation,
     947              :                                 tenant.generation
     948              :                             )));
     949              :                         }
     950              :                         Ordering::Greater => {
     951              :                             // Generation advanced, fall through to general case of replacing `Tenant` object
     952              :                             None
     953              :                         }
     954              :                     }
     955              :                 }
     956              :                 (
     957              :                     LocationMode::Secondary(secondary_conf),
     958              :                     Some(TenantSlot::Secondary(secondary_tenant)),
     959              :                 ) => {
     960              :                     secondary_tenant.set_config(secondary_conf);
     961              :                     secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
     962              :                     Some(FastPathModified::Secondary(secondary_tenant.clone()))
     963              :                 }
     964              :                 _ => {
     965              :                     // Not an Attached->Attached transition, fall through to general case
     966              :                     None
     967              :                 }
     968              :             }
     969              :         };
     970              : 
     971              :         // Fast-path continued: having dropped out of the self.tenants lock, do the async
     972              :         // phase of writing config and/or waiting for flush, before returning.
     973              :         match fast_path_taken {
     974              :             Some(FastPathModified::Attached(tenant)) => {
     975              :                 Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
     976              :                     .await?;
     977              : 
     978              :                 // Transition to AttachedStale means we may well hold a valid generation
     979              :                 // still, and have been requested to go stale as part of a migration.  If
     980              :                 // the caller set `flush`, then flush to remote storage.
     981              :                 if let LocationMode::Attached(AttachedLocationConfig {
     982              :                     generation: _,
     983              :                     attach_mode: AttachmentMode::Stale,
     984              :                 }) = &new_location_config.mode
     985              :                 {
     986              :                     if let Some(flush_timeout) = flush {
     987              :                         match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
     988              :                             Ok(Err(e)) => {
     989              :                                 return Err(UpsertLocationError::Flush(e));
     990              :                             }
     991              :                             Ok(Ok(_)) => return Ok(Some(tenant)),
     992              :                             Err(_) => {
     993            0 :                                 tracing::warn!(
     994            0 :                                 timeout_ms = flush_timeout.as_millis(),
     995            0 :                                 "Timed out waiting for flush to remote storage, proceeding anyway."
     996            0 :                             )
     997              :                             }
     998              :                         }
     999              :                     }
    1000              :                 }
    1001              : 
    1002              :                 return Ok(Some(tenant));
    1003              :             }
    1004              :             Some(FastPathModified::Secondary(_secondary_tenant)) => {
    1005              :                 Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
    1006              :                     .await?;
    1007              : 
    1008              :                 return Ok(None);
    1009              :             }
    1010              :             None => {
    1011              :                 // Proceed with the general case procedure, where we will shutdown & remove any existing
    1012              :                 // slot contents and replace with a fresh one
    1013              :             }
    1014              :         };
    1015              : 
    1016              :         // General case for upserts to TenantsMap, excluding the case above: we will substitute an
    1017              :         // InProgress value into the slot while we make whatever changes are required.  The state for
    1018              :         // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
    1019              :         // the state is ill-defined while we're in transition.  Transitions are async, but fast: we do
    1020              :         // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
    1021              :         let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
    1022           18 :             .map_err(|e| match e {
    1023              :                 TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => {
    1024            0 :                     unreachable!("Called with mode Any")
    1025              :                 }
    1026           18 :                 TenantSlotError::InProgress => UpsertLocationError::InProgress,
    1027            0 :                 TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
    1028           18 :             })?;
    1029              : 
    1030              :         match slot_guard.get_old_value() {
    1031              :             Some(TenantSlot::Attached(tenant)) => {
    1032              :                 // The case where we keep a Tenant alive was covered above in the special case
    1033              :                 // for Attached->Attached transitions in the same generation.  By this point,
    1034              :                 // if we see an attached tenant we know it will be discarded and should be
    1035              :                 // shut down.
    1036              :                 let (_guard, progress) = utils::completion::channel();
    1037              : 
    1038              :                 match tenant.get_attach_mode() {
    1039              :                     AttachmentMode::Single | AttachmentMode::Multi => {
    1040              :                         // Before we leave our state as the presumed holder of the latest generation,
    1041              :                         // flush any outstanding deletions to reduce the risk of leaking objects.
    1042              :                         self.resources.deletion_queue_client.flush_advisory()
    1043              :                     }
    1044              :                     AttachmentMode::Stale => {
    1045              :                         // If we're stale there's no point in trying to flush deletions
    1046              :                     }
    1047              :                 };
    1048              : 
    1049           74 :                 info!("Shutting down attached tenant");
    1050              :                 match tenant.shutdown(progress, false).await {
    1051              :                     Ok(()) => {}
    1052              :                     Err(barrier) => {
    1053            0 :                         info!("Shutdown already in progress, waiting for it to complete");
    1054              :                         barrier.wait().await;
    1055              :                     }
    1056              :                 }
    1057              :                 slot_guard.drop_old_value().expect("We just shut it down");
    1058              : 
    1059              :                 // Edge case: we were called with SpawnMode::Create, but a Tenant already existed.  The
    1060              :                 // caller thinks they are creating a new tenant, but one is already attached.  We must switch to
    1061              :                 // Normal mode so that when starting this Tenant we properly probe remote storage for timelines,
    1062              :                 // rather than assuming it to be empty.
    1063              :                 spawn_mode = SpawnMode::Normal;
    1064              :             }
    1065              :             Some(TenantSlot::Secondary(state)) => {
    1066           17 :                 info!("Shutting down secondary tenant");
    1067              :                 state.shutdown().await;
    1068              :             }
    1069              :             Some(TenantSlot::InProgress(_)) => {
    1070              :                 // This should never happen: acquire_slot should error out
    1071              :                 // if the contents of a slot were InProgress.
    1072              :                 return Err(UpsertLocationError::Other(anyhow::anyhow!(
    1073              :                     "Acquired an InProgress slot, this is a bug."
    1074              :                 )));
    1075              :             }
    1076              :             None => {
    1077              :                 // Slot was vacant, nothing needs shutting down.
    1078              :             }
    1079              :         }
    1080              : 
    1081              :         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    1082              :         let timelines_path = self.conf.timelines_path(&tenant_shard_id);
    1083              : 
    1084              :         // Directory structure is the same for attached and secondary modes:
    1085              :         // create it if it doesn't exist.  Timeline load/creation expects the
    1086              :         // timelines/ subdir to already exist.
    1087              :         //
    1088              :         // Does not need to be fsync'd because local storage is just a cache.
    1089              :         tokio::fs::create_dir_all(&timelines_path)
    1090              :             .await
    1091            0 :             .with_context(|| format!("Creating {timelines_path}"))?;
    1092              : 
    1093              :         // Before activating either secondary or attached mode, persist the
    1094              :         // configuration, so that on restart we will re-attach (or re-start
    1095              :         // secondary) on the tenant.
    1096              :         Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
    1097              : 
    1098              :         let new_slot = match &new_location_config.mode {
    1099              :             LocationMode::Secondary(secondary_config) => {
    1100              :                 let shard_identity = new_location_config.shard;
    1101              :                 TenantSlot::Secondary(SecondaryTenant::new(
    1102              :                     tenant_shard_id,
    1103              :                     shard_identity,
    1104              :                     new_location_config.tenant_conf,
    1105              :                     secondary_config,
    1106              :                 ))
    1107              :             }
    1108              :             LocationMode::Attached(_attach_config) => {
    1109              :                 let shard_identity = new_location_config.shard;
    1110              : 
    1111              :                 // Testing hack: if we are configured with no control plane, then drop the generation
    1112              :                 // from upserts.  This enables creating generation-less tenants even though neon_local
    1113              :                 // always uses generations when calling the location conf API.
    1114              :                 let attached_conf = if cfg!(feature = "testing") {
    1115              :                     let mut conf = AttachedTenantConf::try_from(new_location_config)?;
    1116              :                     if self.conf.control_plane_api.is_none() {
    1117              :                         conf.location.generation = Generation::none();
    1118              :                     }
    1119              :                     conf
    1120              :                 } else {
    1121              :                     AttachedTenantConf::try_from(new_location_config)?
    1122              :                 };
    1123              : 
    1124              :                 let tenant = tenant_spawn(
    1125              :                     self.conf,
    1126              :                     tenant_shard_id,
    1127              :                     &tenant_path,
    1128              :                     self.resources.clone(),
    1129              :                     attached_conf,
    1130              :                     shard_identity,
    1131              :                     None,
    1132              :                     self.tenants,
    1133              :                     spawn_mode,
    1134              :                     ctx,
    1135              :                 )?;
    1136              : 
    1137              :                 TenantSlot::Attached(tenant)
    1138              :             }
    1139              :         };
    1140              : 
    1141              :         let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
    1142              :             Some(tenant.clone())
    1143              :         } else {
    1144              :             None
    1145              :         };
    1146              : 
    1147              :         match slot_guard.upsert(new_slot) {
    1148              :             Err(TenantSlotUpsertError::InternalError(e)) => {
    1149              :                 Err(UpsertLocationError::Other(anyhow::anyhow!(e)))
    1150              :             }
    1151              :             Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
    1152              :             Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
    1153              :                 // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
    1154              :                 // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
    1155              :                 // are shutdown.
    1156              :                 // are shut down.
    1157              :                 // We must shut it down inline here.
    1158              :                 match new_slot {
    1159              :                     TenantSlot::InProgress(_) => {
    1160              :                         // Unreachable because we never insert an InProgress
    1161              :                         unreachable!()
    1162              :                     }
    1163              :                     TenantSlot::Attached(tenant) => {
    1164              :                         let (_guard, progress) = utils::completion::channel();
    1165            7 :                         info!("Shutting down just-spawned tenant, because tenant manager is shut down");
    1166              :                         match tenant.shutdown(progress, false).await {
    1167              :                             Ok(()) => {
    1168            7 :                                 info!("Finished shutting down just-spawned tenant");
    1169              :                             }
    1170              :                             Err(barrier) => {
    1171            0 :                                 info!("Shutdown already in progress, waiting for it to complete");
    1172              :                                 barrier.wait().await;
    1173              :                             }
    1174              :                         }
    1175              :                     }
    1176              :                     TenantSlot::Secondary(secondary_tenant) => {
    1177              :                         secondary_tenant.shutdown().await;
    1178              :                     }
    1179              :                 }
    1180              : 
    1181              :                 Err(UpsertLocationError::Unavailable(
    1182              :                     TenantMapError::ShuttingDown,
    1183              :                 ))
    1184              :             }
    1185              :             Ok(()) => Ok(attached_tenant),
    1186              :         }
    1187              :     }
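
// --- Editorial sketch (not part of the instrumented source above) ---
// The fast path in upsert_location hinges on comparing the incoming generation
// with the generation of the already-attached Tenant.  This standalone helper
// restates that decision for clarity; it does not exist in mgr.rs.
fn upsert_fast_path_decision(
    incoming: Generation,
    existing: Generation,
) -> Result<bool, String> {
    use std::cmp::Ordering;
    match incoming.cmp(&existing) {
        // Same generation: reuse the existing Tenant and just swap its config.
        Ordering::Equal => Ok(true),
        // Older generation: the request is stale and must be rejected.
        Ordering::Less => Err(format!(
            "Generation {incoming:?} is less than existing {existing:?}"
        )),
        // Newer generation: fall through to the general replace-the-slot path.
        Ordering::Greater => Ok(false),
    }
}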
    1188              : 
    1189              :     /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
    1190              :     /// LocationConf that was last used to attach it.  Optionally, the local file cache may be
    1191              :     /// dropped before re-attaching.
    1192              :     ///
    1193              :     /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
    1194              :     /// where an issue is identified that would go away with a restart of the tenant.
    1195              :     ///
    1196              :     /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
    1197              :     /// to respect the cancellation tokens used in normal shutdown().
    1198            0 :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
    1199              :     pub(crate) async fn reset_tenant(
    1200              :         &self,
    1201              :         tenant_shard_id: TenantShardId,
    1202              :         drop_cache: bool,
    1203              :         ctx: RequestContext,
    1204              :     ) -> anyhow::Result<()> {
    1205              :         let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
    1206              :         let Some(old_slot) = slot_guard.get_old_value() else {
    1207              :             anyhow::bail!("Tenant not found when trying to reset");
    1208              :         };
    1209              : 
    1210              :         let Some(tenant) = old_slot.get_attached() else {
    1211              :             slot_guard.revert();
    1212              :             anyhow::bail!("Tenant is not in attached state");
    1213              :         };
    1214              : 
    1215              :         let (_guard, progress) = utils::completion::channel();
    1216              :         match tenant.shutdown(progress, false).await {
    1217              :             Ok(()) => {
    1218              :                 slot_guard.drop_old_value()?;
    1219              :             }
    1220              :             Err(_barrier) => {
    1221              :                 slot_guard.revert();
    1222              :                 anyhow::bail!("Cannot reset Tenant, already shutting down");
    1223              :             }
    1224              :         }
    1225              : 
    1226              :         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    1227              :         let timelines_path = self.conf.timelines_path(&tenant_shard_id);
    1228              :         let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
    1229              : 
    1230              :         if drop_cache {
    1231            1 :             tracing::info!("Dropping local file cache");
    1232              : 
    1233              :             match tokio::fs::read_dir(&timelines_path).await {
    1234              :                 Err(e) => {
    1235            0 :                     tracing::warn!("Failed to list timelines while dropping cache: {}", e);
    1236              :                 }
    1237              :                 Ok(mut entries) => {
    1238              :                     while let Some(entry) = entries.next_entry().await? {
    1239              :                         tokio::fs::remove_dir_all(entry.path()).await?;
    1240              :                     }
    1241              :                 }
    1242              :             }
    1243              :         }
    1244              : 
    1245              :         let shard_identity = config.shard;
    1246              :         let tenant = tenant_spawn(
    1247              :             self.conf,
    1248              :             tenant_shard_id,
    1249              :             &tenant_path,
    1250              :             self.resources.clone(),
    1251              :             AttachedTenantConf::try_from(config)?,
    1252              :             shard_identity,
    1253              :             None,
    1254              :             self.tenants,
    1255              :             SpawnMode::Normal,
    1256              :             &ctx,
    1257              :         )?;
    1258              : 
    1259              :         slot_guard.upsert(TenantSlot::Attached(tenant))?;
    1260              : 
    1261              :         Ok(())
    1262              :     }
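
// --- Editorial sketch (not part of the instrumented source above) ---
// Illustrative call site for reset_tenant; the real driver is a debug/support
// handler elsewhere, and the names here are hypothetical.
async fn example_reset(
    mgr: &TenantManager,
    tenant_shard_id: TenantShardId,
    ctx: RequestContext,
) -> anyhow::Result<()> {
    // `drop_cache = true` additionally wipes the contents of the local
    // timelines/ directory before the tenant is re-spawned.
    mgr.reset_tenant(tenant_shard_id, true, ctx).await
}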
    1263              : 
    1264         1180 :     pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
    1265         1180 :         let locked = self.tenants.read().unwrap();
    1266         1180 :         match &*locked {
    1267            0 :             TenantsMap::Initializing => Vec::new(),
    1268         1180 :             TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
    1269         1180 :                 .values()
    1270         1180 :                 .filter_map(|slot| {
    1271          984 :                     slot.get_attached()
    1272          984 :                         .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
    1273         1180 :                 })
    1274         1180 :                 .collect(),
    1275              :         }
    1276         1180 :     }
    1277              :     // Do some synchronous work for all tenant slots in Secondary state.  The provided
    1278              :     // callback should be small and fast, as it will be called inside the global
    1279              :     // TenantsMap lock.
    1280         1196 :     pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
    1281         1196 :     where
    1282         1196 :         // TODO: let the callback return a hint to drop out of the loop early
    1283         1196 :         F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
    1284         1196 :     {
    1285         1196 :         let locked = self.tenants.read().unwrap();
    1286              : 
    1287         1196 :         let map = match &*locked {
    1288            0 :             TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
    1289         1196 :             TenantsMap::Open(m) => m,
    1290              :         };
    1291              : 
    1292         2218 :         for (tenant_id, slot) in map {
    1293         1022 :             if let TenantSlot::Secondary(state) = slot {
    1294              :                 // Only expose secondary tenants that are not currently shutting down
    1295           13 :                 if !state.cancel.is_cancelled() {
    1296           13 :                     func(tenant_id, state)
    1297            0 :                 }
    1298         1009 :             }
    1299              :         }
    1300         1196 :     }
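
// --- Editorial sketch (not part of the instrumented source above) ---
// Example of a cheap callback: collect the shard ids of live secondary
// locations.  Keep the closure small, since it runs under the TenantsMap lock.
fn example_secondary_shard_ids(mgr: &TenantManager) -> Vec<TenantShardId> {
    let mut ids = Vec::new();
    mgr.foreach_secondary_tenants(|tenant_shard_id, _secondary| {
        ids.push(*tenant_shard_id);
    });
    ids
}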
    1301              : 
    1302              :     /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
    1303            5 :     pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
    1304            5 :         let locked = self.tenants.read().unwrap();
    1305            5 :         match &*locked {
    1306            0 :             TenantsMap::Initializing => Vec::new(),
    1307            5 :             TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
    1308            5 :                 map.iter().map(|(k, v)| (*k, v.clone())).collect()
    1309              :             }
    1310              :         }
    1311            5 :     }
    1312              : 
    1313          120 :     pub(crate) async fn delete_tenant(
    1314          120 :         &self,
    1315          120 :         tenant_shard_id: TenantShardId,
    1316          120 :         activation_timeout: Duration,
    1317          120 :     ) -> Result<(), DeleteTenantError> {
    1318          120 :         super::span::debug_assert_current_span_has_tenant_id();
    1319              :         // We acquire a SlotGuard during this function to protect against concurrent
    1320              :         // changes while the ::prepare phase of DeleteTenantFlow executes, but then
    1321              :         // have to return the Tenant to the map while the background deletion runs.
    1322              :         //
    1323              :         // TODO: refactor deletion to happen outside the lifetime of a Tenant.
    1324              :         // Currently, deletion requires a reference to the tenants map in order to
    1325              :         // keep the Tenant in the map until deletion is complete, and then remove
    1326              :         // it at the end.
    1327              :         //
    1328              :         // See https://github.com/neondatabase/neon/issues/5080
    1329              : 
    1330          104 :         let slot_guard =
    1331          120 :             tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
    1332              : 
    1333              :         // unwrap is safe because we used MustExist mode when acquiring
    1334          104 :         let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
    1335          104 :             TenantSlot::Attached(tenant) => tenant.clone(),
    1336              :             _ => {
    1337              :                 // Express "not attached" as equivalent to "not found"
    1338            0 :                 return Err(DeleteTenantError::NotAttached);
    1339              :             }
    1340              :         };
    1341              : 
    1342          104 :         match tenant.current_state() {
    1343           26 :             TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1344           26 :                 // If a tenant is broken or stopping, DeleteTenantFlow can
    1345           26 :                 // handle it: broken tenants proceed to delete, stopping tenants
    1346           26 :                 // are checked for deletion already in progress.
    1347           26 :             }
    1348              :             _ => {
    1349           78 :                 tenant
    1350           78 :                     .wait_to_become_active(activation_timeout)
    1351            2 :                     .await
    1352           78 :                     .map_err(|e| match e {
    1353              :                         GetActiveTenantError::WillNotBecomeActive(_) => {
    1354            0 :                             DeleteTenantError::InvalidState(tenant.current_state())
    1355              :                         }
    1356            0 :                         GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
    1357            0 :                         GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
    1358              :                         GetActiveTenantError::WaitForActiveTimeout {
    1359            0 :                             latest_state: _latest_state,
    1360            0 :                             wait_time: _wait_time,
    1361            0 :                         } => DeleteTenantError::InvalidState(tenant.current_state()),
    1362           78 :                     })?;
    1363              :             }
    1364              :         }
    1365              : 
    1366          104 :         let result = DeleteTenantFlow::run(
    1367          104 :             self.conf,
    1368          104 :             self.resources.remote_storage.clone(),
    1369          104 :             &TENANTS,
    1370          104 :             tenant,
    1371          104 :         )
    1372          528 :         .await;
    1373              : 
    1374              :         // The Tenant goes back into the map in Stopping state; it will eventually be removed by DeleteTenantFlow.
    1375          104 :         slot_guard.revert();
    1376          104 :         result
    1377          120 :     }
    1378              : }
    1379              : 
    1380           12 : #[derive(Debug, thiserror::Error)]
    1381              : pub(crate) enum GetTenantError {
    1382              :     /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
    1383              :     /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
    1384              :     #[error("Tenant {0} not found")]
    1385              :     NotFound(TenantId),
    1386              : 
    1387              :     #[error("Tenant {0} is not active")]
    1388              :     NotActive(TenantShardId),
    1389              :     /// Broken is logically a subset of NotActive, but a distinct error is useful as
    1390              :     /// NotActive is usually a retryable state for API purposes, whereas Broken
    1391              :     /// is a stuck error state
    1392              :     #[error("Tenant is broken: {0}")]
    1393              :     Broken(String),
    1394              : 
    1395              :     // Initializing or shutting down: cannot authoritatively say whether we have this tenant
    1396              :     #[error("Tenant map is not available: {0}")]
    1397              :     MapState(#[from] TenantMapError),
    1398              : }
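
// --- Editorial sketch (not part of the instrumented source above) ---
// One plausible way a caller could classify these variants for retry purposes.
// The mapping is illustrative; it is not the pageserver's actual HTTP error
// translation.
fn example_is_retryable(err: &GetTenantError) -> bool {
    match err {
        // The tenant exists but is mid-transition, or the map itself is
        // initializing/shutting down: a later retry may succeed.
        GetTenantError::NotActive(_) | GetTenantError::MapState(_) => true,
        // Absent or stuck-broken tenants will not recover on their own.
        GetTenantError::NotFound(_) | GetTenantError::Broken(_) => false,
    }
}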
    1399              : 
    1400              : /// Gets the tenant from the in-memory data, erroring if it's absent or does not fit the query.
    1401              : /// `active_only = true` allows querying only tenants that are ready for operations, and errors on other kinds of tenants.
    1402              : ///
    1403              : /// This method is cancel-safe.
    1404         6066 : pub(crate) fn get_tenant(
    1405         6066 :     tenant_shard_id: TenantShardId,
    1406         6066 :     active_only: bool,
    1407         6066 : ) -> Result<Arc<Tenant>, GetTenantError> {
    1408         6066 :     let locked = TENANTS.read().unwrap();
    1409              : 
    1410         6066 :     let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
    1411              : 
    1412         5999 :     match peek_slot {
    1413         5999 :         Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
    1414              :             TenantState::Broken {
    1415            1 :                 reason,
    1416            1 :                 backtrace: _,
    1417            1 :             } if active_only => Err(GetTenantError::Broken(reason)),
    1418         5637 :             TenantState::Active => Ok(Arc::clone(tenant)),
    1419              :             _ => {
    1420          361 :                 if active_only {
    1421            5 :                     Err(GetTenantError::NotActive(tenant_shard_id))
    1422              :                 } else {
    1423          356 :                     Ok(Arc::clone(tenant))
    1424              :                 }
    1425              :             }
    1426              :         },
    1427            0 :         Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
    1428              :         None | Some(TenantSlot::Secondary(_)) => {
    1429           67 :             Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
    1430              :         }
    1431              :     }
    1432         6066 : }
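
// --- Editorial sketch (not part of the instrumented source above) ---
// With `active_only = false`, a tenant that is attached but not yet Active is
// still returned, which suits administrative callers that only need the
// in-memory object.  The wrapper name is hypothetical.
fn example_peek_attached(
    tenant_shard_id: TenantShardId,
) -> Result<Arc<Tenant>, GetTenantError> {
    get_tenant(tenant_shard_id, false)
}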
    1433              : 
    1434           13 : #[derive(thiserror::Error, Debug)]
    1435              : pub(crate) enum GetActiveTenantError {
    1436              :     /// We may time out either while TenantSlot is InProgress, or while the Tenant
    1437              :     /// is in a non-Active state
    1438              :     #[error(
    1439              :         "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
    1440              :     )]
    1441              :     WaitForActiveTimeout {
    1442              :         latest_state: Option<TenantState>,
    1443              :         wait_time: Duration,
    1444              :     },
    1445              : 
    1446              :     /// The TenantSlot is absent, or in secondary mode
    1447              :     #[error(transparent)]
    1448              :     NotFound(#[from] GetTenantError),
    1449              : 
    1450              :     /// Cancellation token fired while we were waiting
    1451              :     #[error("cancelled")]
    1452              :     Cancelled,
    1453              : 
    1454              :     /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
    1455              :     #[error("will not become active.  Current state: {0}")]
    1456              :     WillNotBecomeActive(TenantState),
    1457              : }
    1458              : 
    1459              : /// Get a [`Tenant`] in its active state. If the tenant's slot is currently [`TenantSlot::InProgress`],
    1460              : /// wait for up to `timeout`.  If the [`Tenant`] is not currently in [`TenantState::Active`],
    1461              : /// then wait for up to `timeout` (minus however long we waited for the slot).
    1462        20696 : pub(crate) async fn get_active_tenant_with_timeout(
    1463        20696 :     tenant_id: TenantId,
    1464        20696 :     shard_selector: ShardSelector,
    1465        20696 :     timeout: Duration,
    1466        20696 :     cancel: &CancellationToken,
    1467        20696 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
    1468        20696 :     enum WaitFor {
    1469        20696 :         Barrier(utils::completion::Barrier),
    1470        20696 :         Tenant(Arc<Tenant>),
    1471        20696 :     }
    1472        20696 : 
    1473        20696 :     let wait_start = Instant::now();
    1474        20696 :     let deadline = wait_start + timeout;
    1475              : 
    1476          133 :     let (wait_for, tenant_shard_id) = {
    1477        20696 :         let locked = TENANTS.read().unwrap();
    1478              : 
    1479              :         // Resolve TenantId to TenantShardId
    1480        20696 :         let tenant_shard_id = locked
    1481        20696 :             .resolve_attached_shard(&tenant_id, shard_selector)
    1482        20696 :             .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
    1483        20696 :                 tenant_id,
    1484        20696 :             )))?;
    1485              : 
    1486        20684 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
    1487        20684 :             .map_err(GetTenantError::MapState)?;
    1488        20684 :         match peek_slot {
    1489        20684 :             Some(TenantSlot::Attached(tenant)) => {
    1490        20684 :                 match tenant.current_state() {
    1491              :                     TenantState::Active => {
    1492              :                         // Fast path: we don't need to do any async waiting.
    1493        20551 :                         return Ok(tenant.clone());
    1494              :                     }
    1495              :                     _ => {
    1496          133 :                         tenant.activate_now();
    1497          133 :                         (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
    1498              :                     }
    1499              :                 }
    1500              :             }
    1501              :             Some(TenantSlot::Secondary(_)) => {
    1502            0 :                 return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
    1503            0 :                     tenant_shard_id,
    1504            0 :                 )))
    1505              :             }
    1506            0 :             Some(TenantSlot::InProgress(barrier)) => {
    1507            0 :                 (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
    1508              :             }
    1509              :             None => {
    1510            0 :                 return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
    1511            0 :                     tenant_id,
    1512            0 :                 )))
    1513              :             }
    1514              :         }
    1515              :     };
    1516              : 
    1517          133 :     let tenant = match wait_for {
    1518            0 :         WaitFor::Barrier(barrier) => {
    1519            0 :             tracing::debug!("Waiting for tenant InProgress state to pass...");
    1520            0 :             timeout_cancellable(
    1521            0 :                 deadline.duration_since(Instant::now()),
    1522            0 :                 cancel,
    1523            0 :                 barrier.wait(),
    1524            0 :             )
    1525            0 :             .await
    1526            0 :             .map_err(|e| match e {
    1527            0 :                 TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
    1528            0 :                     latest_state: None,
    1529            0 :                     wait_time: wait_start.elapsed(),
    1530            0 :                 },
    1531            0 :                 TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
    1532            0 :             })?;
    1533              :             {
    1534            0 :                 let locked = TENANTS.read().unwrap();
    1535            0 :                 let peek_slot =
    1536            0 :                     tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
    1537            0 :                         .map_err(GetTenantError::MapState)?;
    1538            0 :                 match peek_slot {
    1539            0 :                     Some(TenantSlot::Attached(tenant)) => tenant.clone(),
    1540              :                     _ => {
    1541            0 :                         return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
    1542            0 :                             tenant_shard_id,
    1543            0 :                         )))
    1544              :                     }
    1545              :                 }
    1546              :             }
    1547              :         }
    1548          133 :         WaitFor::Tenant(tenant) => tenant,
    1549              :     };
    1550              : 
    1551            0 :     tracing::debug!("Waiting for tenant to enter active state...");
    1552          133 :     tenant
    1553          133 :         .wait_to_become_active(deadline.duration_since(Instant::now()))
    1554          398 :         .await?;
    1555          133 :     Ok(tenant)
    1556        20696 : }
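
// --- Editorial sketch (not part of the instrumented source above) ---
// Typical caller shape: resolve a TenantId plus a shard selector to an active
// Tenant, waiting a bounded amount of time.  The 30-second timeout and the
// function name are illustrative.
async fn example_wait_for_active(
    tenant_id: TenantId,
    shard_selector: ShardSelector,
    cancel: &CancellationToken,
) -> Result<Arc<Tenant>, GetActiveTenantError> {
    // Covers both waiting for an InProgress slot and waiting for the Tenant
    // itself to reach Active.
    get_active_tenant_with_timeout(tenant_id, shard_selector, Duration::from_secs(30), cancel).await
}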
    1557              : 
    1558            0 : #[derive(Debug, thiserror::Error)]
    1559              : pub(crate) enum DeleteTimelineError {
    1560              :     #[error("Tenant {0}")]
    1561              :     Tenant(#[from] GetTenantError),
    1562              : 
    1563              :     #[error("Timeline {0}")]
    1564              :     Timeline(#[from] crate::tenant::DeleteTimelineError),
    1565              : }
    1566              : 
    1567            0 : #[derive(Debug, thiserror::Error)]
    1568              : pub(crate) enum TenantStateError {
    1569              :     #[error("Tenant {0} is stopping")]
    1570              :     IsStopping(TenantShardId),
    1571              :     #[error(transparent)]
    1572              :     SlotError(#[from] TenantSlotError),
    1573              :     #[error(transparent)]
    1574              :     SlotUpsertError(#[from] TenantSlotUpsertError),
    1575              :     #[error(transparent)]
    1576              :     Other(#[from] anyhow::Error),
    1577              : }
    1578              : 
    1579           88 : pub(crate) async fn detach_tenant(
    1580           88 :     conf: &'static PageServerConf,
    1581           88 :     tenant_shard_id: TenantShardId,
    1582           88 :     detach_ignored: bool,
    1583           88 :     deletion_queue_client: &DeletionQueueClient,
    1584           88 : ) -> Result<(), TenantStateError> {
    1585           88 :     let tmp_path = detach_tenant0(
    1586           88 :         conf,
    1587           88 :         &TENANTS,
    1588           88 :         tenant_shard_id,
    1589           88 :         detach_ignored,
    1590           88 :         deletion_queue_client,
    1591           88 :     )
    1592          393 :     .await?;
    1593              :     // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
    1594              :     // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
    1595           78 :     let task_tenant_id = None;
    1596           78 :     task_mgr::spawn(
    1597           78 :         task_mgr::BACKGROUND_RUNTIME.handle(),
    1598           78 :         TaskKind::MgmtRequest,
    1599           78 :         task_tenant_id,
    1600           78 :         None,
    1601           78 :         "tenant_files_delete",
    1602           78 :         false,
    1603           78 :         async move {
    1604           78 :             fs::remove_dir_all(tmp_path.as_path())
    1605           78 :                 .await
    1606           78 :                 .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
    1607           78 :         },
    1608           78 :     );
    1609           78 :     Ok(())
    1610           88 : }
    1611              : 
    1612           88 : async fn detach_tenant0(
    1613           88 :     conf: &'static PageServerConf,
    1614           88 :     tenants: &std::sync::RwLock<TenantsMap>,
    1615           88 :     tenant_shard_id: TenantShardId,
    1616           88 :     detach_ignored: bool,
    1617           88 :     deletion_queue_client: &DeletionQueueClient,
    1618           88 : ) -> Result<Utf8PathBuf, TenantStateError> {
    1619           88 :     let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
    1620           78 :         let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
    1621           78 :         safe_rename_tenant_dir(&local_tenant_directory)
    1622          234 :             .await
    1623           88 :             .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
    1624           78 :     };
    1625              : 
    1626           88 :     let removal_result = remove_tenant_from_memory(
    1627           88 :         tenants,
    1628           88 :         tenant_shard_id,
    1629           88 :         tenant_dir_rename_operation(tenant_shard_id),
    1630           88 :     )
    1631          390 :     .await;
    1632              : 
    1633              :     // Flush pending deletions, so that they have a good chance of passing validation
    1634              :     // before this tenant is potentially re-attached elsewhere.
    1635           88 :     deletion_queue_client.flush_advisory();
    1636           88 : 
    1637           88 :     // Ignored tenants are not present in memory, so the removal-from-memory operation will bail out for them.
    1638           88 :     // Before returning the error, check for the ignored tenant removal case: in that case we only need to clean up its local files.
    1639           88 :     if detach_ignored
    1640              :         && matches!(
    1641            8 :             removal_result,
    1642              :             Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
    1643              :         )
    1644              :     {
    1645            8 :         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    1646            8 :         if tenant_ignore_mark.exists() {
    1647            1 :             info!("Detaching an ignored tenant");
    1648            1 :             let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
    1649            3 :                 .await
    1650            1 :                 .with_context(|| {
    1651            0 :                     format!("Ignored tenant {tenant_shard_id} local directory rename")
    1652            1 :                 })?;
    1653            1 :             return Ok(tmp_path);
    1654            7 :         }
    1655           80 :     }
    1656              : 
    1657           87 :     removal_result
    1658           88 : }
    1659              : 
    1660            4 : pub(crate) async fn load_tenant(
    1661            4 :     conf: &'static PageServerConf,
    1662            4 :     tenant_id: TenantId,
    1663            4 :     generation: Generation,
    1664            4 :     broker_client: storage_broker::BrokerClientChannel,
    1665            4 :     remote_storage: Option<GenericRemoteStorage>,
    1666            4 :     deletion_queue_client: DeletionQueueClient,
    1667            4 :     ctx: &RequestContext,
    1668            4 : ) -> Result<(), TenantMapInsertError> {
    1669            4 :     // This is a legacy API (replaced by `/location_conf`).  It does not support sharding
    1670            4 :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1671              : 
    1672            3 :     let slot_guard =
    1673            4 :         tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
    1674            3 :     let tenant_path = conf.tenant_path(&tenant_shard_id);
    1675            3 : 
    1676            3 :     let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    1677            3 :     if tenant_ignore_mark.exists() {
    1678            3 :         std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
    1679            0 :             format!(
    1680            0 :                 "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
    1681            0 :             )
    1682            3 :         })?;
    1683            0 :     }
    1684              : 
    1685            3 :     let resources = TenantSharedResources {
    1686            3 :         broker_client,
    1687            3 :         remote_storage,
    1688            3 :         deletion_queue_client,
    1689            3 :     };
    1690              : 
    1691            3 :     let mut location_conf =
    1692            3 :         Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
    1693            3 :     location_conf.attach_in_generation(generation);
    1694            3 : 
    1695            6 :     Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
    1696              : 
    1697            3 :     let shard_identity = location_conf.shard;
    1698            3 :     let new_tenant = tenant_spawn(
    1699            3 :         conf,
    1700            3 :         tenant_shard_id,
    1701            3 :         &tenant_path,
    1702            3 :         resources,
    1703            3 :         AttachedTenantConf::try_from(location_conf)?,
    1704            3 :         shard_identity,
    1705            3 :         None,
    1706            3 :         &TENANTS,
    1707            3 :         SpawnMode::Normal,
    1708            3 :         ctx,
    1709            3 :     )
    1710            3 :     .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
    1711              : 
    1712            3 :     slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
    1713            3 :     Ok(())
    1714            4 : }
    1715              : 
    1716            5 : pub(crate) async fn ignore_tenant(
    1717            5 :     conf: &'static PageServerConf,
    1718            5 :     tenant_id: TenantId,
    1719            5 : ) -> Result<(), TenantStateError> {
    1720           15 :     ignore_tenant0(conf, &TENANTS, tenant_id).await
    1721            5 : }
    1722              : 
    1723            0 : #[instrument(skip_all, fields(shard_id))]
    1724              : async fn ignore_tenant0(
    1725              :     conf: &'static PageServerConf,
    1726              :     tenants: &std::sync::RwLock<TenantsMap>,
    1727              :     tenant_id: TenantId,
    1728              : ) -> Result<(), TenantStateError> {
    1729              :     // This is a legacy API (replaced by `/location_conf`).  It does not support sharding
    1730              :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    1731              :     tracing::Span::current().record(
    1732              :         "shard_id",
    1733              :         tracing::field::display(tenant_shard_id.shard_slug()),
    1734              :     );
    1735              : 
    1736            5 :     remove_tenant_from_memory(tenants, tenant_shard_id, async {
    1737            5 :         let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    1738            5 :         fs::File::create(&ignore_mark_file)
    1739            5 :             .await
    1740            5 :             .context("Failed to create ignore mark file")
    1741            5 :             .and_then(|_| {
    1742            5 :                 crashsafe::fsync_file_and_parent(&ignore_mark_file)
    1743            5 :                     .context("Failed to fsync ignore mark file")
    1744            5 :             })
    1745            5 :             .with_context(|| format!("Failed to create ignore mark for tenant {tenant_shard_id}"))?;
    1746            5 :         Ok(())
    1747            5 :     })
    1748              :     .await
    1749              : }
    1750              : 
    1751            0 : #[derive(Debug, thiserror::Error)]
    1752              : pub(crate) enum TenantMapListError {
    1753              :     #[error("tenant map is still initializing")]
    1754              :     Initializing,
    1755              : }
    1756              : 
    1757              : ///
    1758              : /// Get list of tenants, for the mgmt API
    1759              : ///
    1760          232 : pub(crate) async fn list_tenants(
    1761          232 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
    1762          232 :     let tenants = TENANTS.read().unwrap();
    1763          232 :     let m = match &*tenants {
    1764            0 :         TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
    1765          232 :         TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
    1766          232 :     };
    1767          232 :     Ok(m.iter()
    1768          242 :         .filter_map(|(id, tenant)| match tenant {
    1769          240 :             TenantSlot::Attached(tenant) => {
    1770          240 :                 Some((*id, tenant.current_state(), tenant.generation()))
    1771              :             }
    1772            2 :             TenantSlot::Secondary(_) => None,
    1773            0 :             TenantSlot::InProgress(_) => None,
    1774          242 :         })
    1775          232 :         .collect())
    1776          232 : }
    1777              : 
    1778            0 : #[derive(Debug, thiserror::Error)]
    1779              : pub(crate) enum TenantMapInsertError {
    1780              :     #[error(transparent)]
    1781              :     SlotError(#[from] TenantSlotError),
    1782              :     #[error(transparent)]
    1783              :     SlotUpsertError(#[from] TenantSlotUpsertError),
    1784              :     #[error(transparent)]
    1785              :     Other(#[from] anyhow::Error),
    1786              : }
    1787              : 
    1788              : /// Superset of TenantMapError: issues that can occur when acquiring a slot
    1789              : /// for a particular tenant ID.
    1790            1 : #[derive(Debug, thiserror::Error)]
    1791              : pub(crate) enum TenantSlotError {
    1792              :     /// When acquiring a slot with the expectation that the tenant already exists.
    1793              :     #[error("Tenant {0} not found")]
    1794              :     NotFound(TenantShardId),
    1795              : 
    1796              :     /// When acquiring a slot with the expectation that the tenant does not already exist.
    1797              :     #[error("tenant {0} already exists, state: {1:?}")]
    1798              :     AlreadyExists(TenantShardId, TenantState),
    1799              : 
    1800              :     // Tried to read a slot that is currently being mutated by another administrative
    1801              :     // operation.
    1802              :     #[error("tenant has a state change in progress, try again later")]
    1803              :     InProgress,
    1804              : 
    1805              :     #[error(transparent)]
    1806              :     MapState(#[from] TenantMapError),
    1807              : }
    1808              : 
    1809              : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
    1810              : /// to insert a new value.
    1811            0 : #[derive(thiserror::Error)]
    1812              : pub(crate) enum TenantSlotUpsertError {
    1813              :     /// An error where the slot is in an unexpected state, indicating a code bug
    1814              :     #[error("Internal error updating Tenant")]
    1815              :     InternalError(Cow<'static, str>),
    1816              : 
    1817              :     #[error(transparent)]
    1818              :     MapState(TenantMapError),
    1819              : 
    1820              :     // If we encounter TenantManager shutdown during upsert, we must carry the Completion
    1821              :     // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
    1822              :     // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
    1823              :     // was protected by the SlotGuard.
    1824              :     #[error("Shutting down")]
    1825              :     ShuttingDown((TenantSlot, utils::completion::Completion)),
    1826              : }
    1827              : 
    1828              : impl std::fmt::Debug for TenantSlotUpsertError {
    1829            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    1830            0 :         match self {
    1831            0 :             Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
    1832            0 :             Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
    1833            0 :             Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
    1834              :         }
    1835            0 :     }
    1836              : }
    1837              : 
    1838            0 : #[derive(Debug, thiserror::Error)]
    1839              : enum TenantSlotDropError {
    1840              :     /// It is only legal to drop a TenantSlot if its contents are fully shut down
    1841              :     #[error("Tenant was not shut down")]
    1842              :     NotShutdown,
    1843              : }
    1844              : 
    1845              : /// Errors that can happen any time we are walking the tenant map to try and acquire
    1846              : /// the TenantSlot for a particular tenant.
    1847            0 : #[derive(Debug, thiserror::Error)]
    1848              : pub enum TenantMapError {
    1849              :     // Tried to read while initializing
    1850              :     #[error("tenant map is still initializing")]
    1851              :     StillInitializing,
    1852              : 
    1853              :     // Tried to read while shutting down
    1854              :     #[error("tenant map is shutting down")]
    1855              :     ShuttingDown,
    1856              : }
    1857              : 
    1858              : /// Guards a particular tenant_id's content in the TenantsMap.  While this
    1859              : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
    1860              : /// for this tenant, which acts as a marker for any operations targeting
    1861              : /// this tenant to retry later, or wait for the InProgress state to end.
    1862              : ///
    1863              : /// This structure enforces the important invariant that we do not have overlapping
    1864              : /// tasks that will try to use local storage for the same tenant ID: we enforce that
    1865              : /// the previous contents of a slot have been shut down before the slot can be
    1866              : /// left empty or used for something else.
    1867              : ///
    1868              : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
    1869              : /// to provide a new value, or `revert` to put the slot back into its initial
    1870              : /// state.  If the SlotGuard is dropped without calling either of these, then
    1871              : /// we will leave the slot empty if our `old_value` is already shut down, else
    1872              : /// we will replace the slot with `old_value` (equivalent to doing a revert).
    1873              : ///
    1874              : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
    1875              : /// `drop_old_value`.  It is an error to call this without shutting down
    1876              : /// the contents of `old_value`.
    1877              : pub struct SlotGuard {
    1878              :     tenant_shard_id: TenantShardId,
    1879              :     old_value: Option<TenantSlot>,
    1880              :     upserted: bool,
    1881              : 
    1882              :     /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
    1883              :     /// release any waiters as soon as this SlotGuard is dropped.
    1884              :     completion: utils::completion::Completion,
    1885              : }
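                      : 
                      : // A minimal illustrative sketch of the SlotGuard lifecycle described above; the
                      : // function name is hypothetical, and the shutdown/upsert steps are left as comments
                      : // because they depend on the operation being performed.
                      : #[allow(dead_code)]
                      : fn slot_guard_lifecycle_sketch(
                      :     tenant_shard_id: TenantShardId,
                      : ) -> Result<(), TenantSlotError> {
                      :     // Acquiring the slot installs TenantSlot::InProgress in the map, so concurrent
                      :     // operations on this tenant will retry or wait on the barrier.
                      :     let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
                      : 
                      :     // 1. Shut down whatever `slot_guard.get_old_value()` holds, if anything.
                      :     // 2. Emplace a replacement with `slot_guard.upsert(new_slot)?`, or restore the
                      :     //    previous contents with `slot_guard.revert()`.
                      : 
                      :     // Dropping the guard without doing either releases the InProgress marker: the slot
                      :     // is left empty if the old value was already shut down (or absent), otherwise the
                      :     // old value is put back, equivalent to `revert`.
                      :     drop(slot_guard);
                      :     Ok(())
                      : }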
    1886              : 
    1887              : impl SlotGuard {
    1888          865 :     fn new(
    1889          865 :         tenant_shard_id: TenantShardId,
    1890          865 :         old_value: Option<TenantSlot>,
    1891          865 :         completion: utils::completion::Completion,
    1892          865 :     ) -> Self {
    1893          865 :         Self {
    1894          865 :             tenant_shard_id,
    1895          865 :             old_value,
    1896          865 :             upserted: false,
    1897          865 :             completion,
    1898          865 :         }
    1899          865 :     }
    1900              : 
    1901              :     /// Get any value that was present in the slot before we acquired ownership
    1902              :     /// of it: in state transitions, this will be the old state.
    1903          862 :     fn get_old_value(&self) -> &Option<TenantSlot> {
    1904          862 :         &self.old_value
    1905          862 :     }
    1906              : 
    1907              :     /// Emplace a new value in the slot.  This consumes the guard, and after
    1908              :     /// returning, the slot is no longer protected from concurrent changes.
    1909          780 :     fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
    1910          780 :         if !self.old_value_is_shutdown() {
    1911              :             // This is a bug: callers should never try to drop an old value without
    1912              :             // shutting it down
    1913            0 :             return Err(TenantSlotUpsertError::InternalError(
    1914            0 :                 "Old TenantSlot value not shut down".into(),
    1915            0 :             ));
    1916          780 :         }
    1917              : 
    1918          773 :         let replaced = {
    1919          780 :             let mut locked = TENANTS.write().unwrap();
    1920          780 : 
    1921          780 :             if let TenantSlot::InProgress(_) = new_value {
    1922              :                 // It is never expected to try and upsert InProgress via this path: it should
    1923              :                 // only be written via the tenant_map_acquire_slot path.  If we hit this it's a bug.
    1924            0 :                 return Err(TenantSlotUpsertError::InternalError(
    1925            0 :                     "Attempt to upsert an InProgress state".into(),
    1926            0 :                 ));
    1927          780 :             }
    1928              : 
    1929          780 :             let m = match &mut *locked {
    1930              :                 TenantsMap::Initializing => {
    1931            0 :                     return Err(TenantSlotUpsertError::MapState(
    1932            0 :                         TenantMapError::StillInitializing,
    1933            0 :                     ))
    1934              :                 }
    1935              :                 TenantsMap::ShuttingDown(_) => {
    1936            7 :                     return Err(TenantSlotUpsertError::ShuttingDown((
    1937            7 :                         new_value,
    1938            7 :                         self.completion.clone(),
    1939            7 :                     )));
    1940              :                 }
    1941          773 :                 TenantsMap::Open(m) => m,
    1942          773 :             };
    1943          773 : 
    1944          773 :             let replaced = m.insert(self.tenant_shard_id, new_value);
    1945          773 :             self.upserted = true;
    1946          773 : 
    1947          773 :             METRICS.tenant_slots.set(m.len() as u64);
    1948          773 : 
    1949          773 :             replaced
    1950              :         };
    1951              : 
    1952              :         // Sanity check: on an upsert we should always be replacing an InProgress marker
    1953          773 :         match replaced {
    1954              :             Some(TenantSlot::InProgress(_)) => {
    1955              :                 // Expected case: we find our InProgress in the map: nothing should have
    1956              :                 // replaced it because the code that acquires slots will not grant another
    1957              :                 // one for the same TenantId.
    1958          773 :                 Ok(())
    1959              :             }
    1960              :             None => {
    1961            0 :                 METRICS.unexpected_errors.inc();
    1962            0 :                 error!(
    1963            0 :                     tenant_shard_id = %self.tenant_shard_id,
    1964            0 :                     "Missing InProgress marker during tenant upsert, this is a bug."
    1965            0 :                 );
    1966            0 :                 Err(TenantSlotUpsertError::InternalError(
    1967            0 :                     "Missing InProgress marker during tenant upsert".into(),
    1968            0 :                 ))
    1969              :             }
    1970            0 :             Some(slot) => {
    1971            0 :                 METRICS.unexpected_errors.inc();
    1972            0 :                 error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug.  Contents: {:?}", slot);
    1973            0 :                 Err(TenantSlotUpsertError::InternalError(
    1974            0 :                     "Unexpected contents of TenantSlot".into(),
    1975            0 :                 ))
    1976              :             }
    1977              :         }
    1978          780 :     }
    1979              : 
    1980              :     /// Replace the InProgress slot with whatever was in the guard when we started
    1981          104 :     fn revert(mut self) {
    1982          104 :         if let Some(value) = self.old_value.take() {
    1983          104 :             match self.upsert(value) {
    1984            0 :                 Err(TenantSlotUpsertError::InternalError(_)) => {
    1985            0 :                     // We already logged the error, nothing else we can do.
    1986            0 :                 }
    1987              :                 Err(
    1988              :                     TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
    1989            0 :                 ) => {
    1990            0 :                     // If the map is shutting down, we need not replace anything
    1991            0 :                 }
    1992          104 :                 Ok(()) => {}
    1993              :             }
    1994            0 :         }
    1995          104 :     }
    1996              : 
    1997              :     /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
    1998              :     /// rogue background tasks that would write to the local tenant directory that this guard
    1999              :     /// is responsible for protecting
    2000         1023 :     fn old_value_is_shutdown(&self) -> bool {
    2001         1023 :         match self.old_value.as_ref() {
    2002          155 :             Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
    2003           22 :             Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
    2004              :             Some(TenantSlot::InProgress(_)) => {
    2005              :                 // A SlotGuard cannot be constructed for a slot that was already InProgress
    2006            0 :                 unreachable!()
    2007              :             }
    2008          846 :             None => true,
    2009              :         }
    2010         1023 :     }
    2011              : 
    2012              :     /// The guard holder is done with the old value of the slot: they are obliged to have already
    2013              :     /// shut it down before we reach this point.
    2014          160 :     fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
    2015          160 :         if !self.old_value_is_shutdown() {
    2016            0 :             Err(TenantSlotDropError::NotShutdown)
    2017              :         } else {
    2018          160 :             self.old_value.take();
    2019          160 :             Ok(())
    2020              :         }
    2021          160 :     }
    2022              : }
    2023              : 
    2024              : impl Drop for SlotGuard {
    2025          865 :     fn drop(&mut self) {
    2026          865 :         if self.upserted {
    2027          773 :             return;
    2028           92 :         }
    2029           92 :         // Our old value is already shutdown, or it never existed: it is safe
    2030           92 :         // for us to fully release the TenantSlot back into an empty state
    2031           92 : 
    2032           92 :         let mut locked = TENANTS.write().unwrap();
    2033              : 
    2034           92 :         let m = match &mut *locked {
    2035              :             TenantsMap::Initializing => {
    2036              :                 // There is no map, this should never happen.
    2037            2 :                 return;
    2038              :             }
    2039              :             TenantsMap::ShuttingDown(_) => {
    2040              :                 // When we transition to shutdown, InProgress elements are removed
    2041              :                 // from the map, so we do not need to clean up our InProgress marker.
    2042              :                 // See [`shutdown_all_tenants0`]
    2043            7 :                 return;
    2044              :             }
    2045           83 :             TenantsMap::Open(m) => m,
    2046           83 :         };
    2047           83 : 
    2048           83 :         use std::collections::btree_map::Entry;
    2049           83 :         match m.entry(self.tenant_shard_id) {
    2050           83 :             Entry::Occupied(mut entry) => {
    2051           83 :                 if !matches!(entry.get(), TenantSlot::InProgress(_)) {
    2052            0 :                     METRICS.unexpected_errors.inc();
    2053            0 :                     error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug.  Contents: {:?}", entry.get());
    2054           83 :                 }
    2055              : 
    2056           83 :                 if self.old_value_is_shutdown() {
    2057           83 :                     entry.remove();
    2058           83 :                 } else {
    2059            0 :                     entry.insert(self.old_value.take().unwrap());
    2060            0 :                 }
    2061              :             }
    2062              :             Entry::Vacant(_) => {
    2063            0 :                 METRICS.unexpected_errors.inc();
    2064            0 :                 error!(
    2065            0 :                     tenant_shard_id = %self.tenant_shard_id,
    2066            0 :                     "Missing InProgress marker during SlotGuard drop, this is a bug."
    2067            0 :                 );
    2068              :             }
    2069              :         }
    2070              : 
    2071           83 :         METRICS.tenant_slots.set(m.len() as u64);
    2072          865 :     }
    2073              : }
    2074              : 
    2075              : enum TenantSlotPeekMode {
    2076              :     /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
    2077              :     Read,
    2078              :     /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
    2079              :     Write,
    2080              : }
    2081              : 
    2082        28500 : fn tenant_map_peek_slot<'a>(
    2083        28500 :     tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
    2084        28500 :     tenant_shard_id: &TenantShardId,
    2085        28500 :     mode: TenantSlotPeekMode,
    2086        28500 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
    2087        28500 :     match tenants.deref() {
    2088            0 :         TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
    2089           18 :         TenantsMap::ShuttingDown(m) => match mode {
    2090              :             TenantSlotPeekMode::Read => Ok(Some(
    2091              :                 // When reading in ShuttingDown state, we must translate None results
    2092              :                 // into a ShuttingDown error, because absence of a tenant shard ID in the map
    2093              :                 // isn't a reliable indicator of the tenant being gone: it might have been
    2094              :                 // InProgress when shutdown started, and cleaned up from that state such
    2095              :                 // that it's now no longer in the map.  Callers will have to wait until
    2096              :                 // we next start up to get a proper answer.  This avoids incorrect 404 API responses.
    2097            0 :                 m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
    2098              :             )),
    2099           18 :             TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
    2100              :         },
    2101        28482 :         TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
    2102              :     }
    2103        28500 : }
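                      : 
                      : // A minimal illustrative sketch of a read-path lookup using TenantSlotPeekMode::Read,
                      : // which tolerates the ShuttingDown map state; the function name is hypothetical.
                      : #[allow(dead_code)]
                      : fn peek_is_attached_sketch(tenant_shard_id: &TenantShardId) -> Result<bool, TenantMapError> {
                      :     // Take the map's read lock; the peeked slot borrows from this guard.
                      :     let tenants = TENANTS.read().unwrap();
                      :     let slot = tenant_map_peek_slot(&tenants, tenant_shard_id, TenantSlotPeekMode::Read)?;
                      :     Ok(matches!(slot, Some(TenantSlot::Attached(_))))
                      : }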
    2104              : 
    2105              : enum TenantSlotAcquireMode {
    2106              :     /// Acquire the slot irrespective of current state, or whether it already exists
    2107              :     Any,
    2108              :     /// Return an error if trying to acquire a slot and it doesn't already exist
    2109              :     MustExist,
    2110              :     /// Return an error if trying to acquire a slot and it already exists
    2111              :     MustNotExist,
    2112              : }
    2113              : 
    2114          816 : fn tenant_map_acquire_slot(
    2115          816 :     tenant_shard_id: &TenantShardId,
    2116          816 :     mode: TenantSlotAcquireMode,
    2117          816 : ) -> Result<SlotGuard, TenantSlotError> {
    2118          816 :     tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
    2119          816 : }
    2120              : 
    2121          911 : fn tenant_map_acquire_slot_impl(
    2122          911 :     tenant_shard_id: &TenantShardId,
    2123          911 :     tenants: &std::sync::RwLock<TenantsMap>,
    2124          911 :     mode: TenantSlotAcquireMode,
    2125          911 : ) -> Result<SlotGuard, TenantSlotError> {
    2126          911 :     use TenantSlotAcquireMode::*;
    2127          911 :     METRICS.tenant_slot_writes.inc();
    2128          911 : 
    2129          911 :     let mut locked = tenants.write().unwrap();
    2130          911 :     let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
    2131          911 :     let _guard = span.enter();
    2132              : 
    2133          911 :     let m = match &mut *locked {
    2134            0 :         TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
    2135            0 :         TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
    2136          911 :         TenantsMap::Open(m) => m,
    2137          911 :     };
    2138          911 : 
    2139          911 :     use std::collections::btree_map::Entry;
    2140          911 : 
    2141          911 :     let entry = m.entry(*tenant_shard_id);
    2142          911 : 
    2143          911 :     match entry {
    2144          609 :         Entry::Vacant(v) => match mode {
    2145              :             MustExist => {
    2146           25 :                 tracing::debug!("Vacant && MustExist: return NotFound");
    2147           25 :                 Err(TenantSlotError::NotFound(*tenant_shard_id))
    2148              :             }
    2149              :             _ => {
    2150          584 :                 let (completion, barrier) = utils::completion::channel();
    2151          584 :                 v.insert(TenantSlot::InProgress(barrier));
    2152          584 :                 tracing::debug!("Vacant, inserted InProgress");
    2153          584 :                 Ok(SlotGuard::new(*tenant_shard_id, None, completion))
    2154              :             }
    2155              :         },
    2156          302 :         Entry::Occupied(mut o) => {
    2157          302 :             // Apply mode-driven checks
    2158          302 :             match (o.get(), mode) {
    2159              :                 (TenantSlot::InProgress(_), _) => {
    2160           20 :                     tracing::debug!("Occupied, failing for InProgress");
    2161           20 :                     Err(TenantSlotError::InProgress)
    2162              :                 }
    2163            1 :                 (slot, MustNotExist) => match slot {
    2164            1 :                     TenantSlot::Attached(tenant) => {
    2165            1 :                         tracing::debug!("Attached && MustNotExist, return AlreadyExists");
    2166            1 :                         Err(TenantSlotError::AlreadyExists(
    2167            1 :                             *tenant_shard_id,
    2168            1 :                             tenant.current_state(),
    2169            1 :                         ))
    2170              :                     }
    2171              :                     _ => {
    2172              :                         // FIXME: the AlreadyExists error assumes that we have a Tenant
    2173              :                         // to get the state from
    2174            0 :                         tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
    2175            0 :                         Err(TenantSlotError::AlreadyExists(
    2176            0 :                             *tenant_shard_id,
    2177            0 :                             TenantState::Broken {
    2178            0 :                                 reason: "Present but not attached".to_string(),
    2179            0 :                                 backtrace: "".to_string(),
    2180            0 :                             },
    2181            0 :                         ))
    2182              :                     }
    2183              :                 },
    2184              :                 _ => {
    2185              :                     // Happy case: the slot was not in any state that violated our mode
    2186          281 :                     let (completion, barrier) = utils::completion::channel();
    2187          281 :                     let old_value = o.insert(TenantSlot::InProgress(barrier));
    2188          281 :                     tracing::debug!("Occupied, replaced with InProgress");
    2189          281 :                     Ok(SlotGuard::new(
    2190          281 :                         *tenant_shard_id,
    2191          281 :                         Some(old_value),
    2192          281 :                         completion,
    2193          281 :                     ))
    2194              :                 }
    2195              :             }
    2196              :         }
    2197              :     }
    2198          911 : }
    2199              : 
    2200              : /// Stops and removes the tenant from memory, if it is not already [`TenantState::Stopping`]; bails otherwise.
    2201              : /// Allows removing other tenant resources manually via `tenant_cleanup`.
    2202              : /// If the cleanup fails, the tenant will stay in memory in [`TenantState::Broken`] state, and another removal
    2203              : /// operation will be needed to remove it.
    2204           95 : async fn remove_tenant_from_memory<V, F>(
    2205           95 :     tenants: &std::sync::RwLock<TenantsMap>,
    2206           95 :     tenant_shard_id: TenantShardId,
    2207           95 :     tenant_cleanup: F,
    2208           95 : ) -> Result<V, TenantStateError>
    2209           95 : where
    2210           95 :     F: std::future::Future<Output = anyhow::Result<V>>,
    2211           95 : {
    2212              :     use utils::completion;
    2213              : 
    2214           84 :     let mut slot_guard =
    2215           95 :         tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
    2216              : 
    2217              :     // allow pageserver shutdown to await for our completion
    2218           84 :     let (_guard, progress) = completion::channel();
    2219              : 
    2220              :     // The SlotGuard allows us to manipulate the Tenant object without fear of some
    2221              :     // concurrent API request doing something else for the same tenant ID.
    2222           84 :     let attached_tenant = match slot_guard.get_old_value() {
    2223           79 :         Some(TenantSlot::Attached(tenant)) => {
    2224           79 :             // whenever we remove a tenant from memory, we don't want to flush and wait for upload
    2225           79 :             let freeze_and_flush = false;
    2226           79 : 
    2227           79 :             // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
    2228           79 :             // that we can continue safely to cleanup.
    2229          169 :             match tenant.shutdown(progress, freeze_and_flush).await {
    2230           79 :                 Ok(()) => {}
    2231            0 :                 Err(_other) => {
    2232            0 :                     // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
    2233            0 :                     // wait for it but return an error right away because these are distinct requests.
    2234            0 :                     slot_guard.revert();
    2235            0 :                     return Err(TenantStateError::IsStopping(tenant_shard_id));
    2236              :                 }
    2237              :             }
    2238           79 :             Some(tenant)
    2239              :         }
    2240            5 :         Some(TenantSlot::Secondary(secondary_state)) => {
    2241            5 :             tracing::info!("Shutting down in secondary mode");
    2242            5 :             secondary_state.shutdown().await;
    2243            5 :             None
    2244              :         }
    2245              :         Some(TenantSlot::InProgress(_)) => {
    2246              :             // Acquiring a slot guarantees its old value was not InProgress
    2247            0 :             unreachable!();
    2248              :         }
    2249            0 :         None => None,
    2250              :     };
    2251              : 
    2252           84 :     match tenant_cleanup
    2253          238 :         .await
    2254           84 :         .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
    2255              :     {
    2256           84 :         Ok(hook_value) => {
    2257           84 :             // Success: drop the old TenantSlot::Attached.
    2258           84 :             slot_guard
    2259           84 :                 .drop_old_value()
    2260           84 :                 .expect("We just called shutdown");
    2261           84 : 
    2262           84 :             Ok(hook_value)
    2263              :         }
    2264            0 :         Err(e) => {
    2265              :             // If we had a Tenant, set it to Broken and put it back in the TenantsMap
    2266            0 :             if let Some(attached_tenant) = attached_tenant {
    2267            0 :                 attached_tenant.set_broken(e.to_string()).await;
    2268            0 :             }
    2269              :             // Leave the broken tenant in the map
    2270            0 :             slot_guard.revert();
    2271            0 : 
    2272            0 :             Err(TenantStateError::Other(e))
    2273              :         }
    2274              :     }
    2275           95 : }
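                      : 
                      : // A minimal illustrative sketch of the cleanup-hook pattern described above: the
                      : // future runs only after the tenant has been shut down, and an error from it leaves
                      : // an attached tenant in Broken state in the map. The function name is hypothetical.
                      : #[allow(dead_code)]
                      : async fn remove_with_cleanup_sketch(
                      :     tenants: &std::sync::RwLock<TenantsMap>,
                      :     tenant_shard_id: TenantShardId,
                      : ) -> Result<(), TenantStateError> {
                      :     remove_tenant_from_memory(tenants, tenant_shard_id, async {
                      :         // Remove on-disk state (mark files, tenant directory, ...) here.
                      :         anyhow::Ok(())
                      :     })
                      :     .await
                      : }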
    2276              : 
    2277              : use {
    2278              :     crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
    2279              :     utils::http::error::ApiError,
    2280              : };
    2281              : 
    2282          251 : pub(crate) async fn immediate_gc(
    2283          251 :     tenant_shard_id: TenantShardId,
    2284          251 :     timeline_id: TimelineId,
    2285          251 :     gc_req: TimelineGcRequest,
    2286          251 :     cancel: CancellationToken,
    2287          251 :     ctx: &RequestContext,
    2288          251 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
    2289          251 :     let guard = TENANTS.read().unwrap();
    2290              : 
    2291          251 :     let tenant = guard
    2292          251 :         .get(&tenant_shard_id)
    2293          251 :         .map(Arc::clone)
    2294          251 :         .with_context(|| format!("tenant {tenant_shard_id}"))
    2295          251 :         .map_err(|e| ApiError::NotFound(e.into()))?;
    2296              : 
    2297          250 :     let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
    2298          250 :     // Use tenant's pitr setting
    2299          250 :     let pitr = tenant.get_pitr_interval();
    2300          250 : 
    2301          250 :     // Run in task_mgr to avoid race with tenant_detach operation
    2302          250 :     let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
    2303          250 :     let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
    2304          250 :     // TODO: spawning is redundant now, need to hold the gate
    2305          250 :     task_mgr::spawn(
    2306          250 :         &tokio::runtime::Handle::current(),
    2307          250 :         TaskKind::GarbageCollector,
    2308          250 :         Some(tenant_shard_id),
    2309          250 :         Some(timeline_id),
    2310          250 :         &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
    2311          250 :         false,
    2312          250 :         async move {
    2313            0 :             fail::fail_point!("immediate_gc_task_pre");
    2314              : 
    2315              :             #[allow(unused_mut)]
    2316          250 :             let mut result = tenant
    2317          250 :                 .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
    2318          250 :                 .instrument(info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
    2319          200 :                 .await;
    2320              :                 // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
    2321              :                 // better once the types support it.
    2322              : 
    2323              :             #[cfg(feature = "testing")]
    2324              :             {
    2325          250 :                 if let Ok(result) = result.as_mut() {
    2326              :                     // why not futures unordered? it seems to need much the same task structure,
    2327              :                     // but would only run on a single task.
    2328          249 :                     let mut js = tokio::task::JoinSet::new();
    2329         1031 :                     for layer in std::mem::take(&mut result.doomed_layers) {
    2330         1031 :                         js.spawn(layer.wait_drop());
    2331         1031 :                     }
    2332          249 :                     tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
    2333         1280 :                     while let Some(res) = js.join_next().await {
    2334         1031 :                         res.expect("wait_drop should not panic");
    2335         1031 :                     }
    2336            1 :                 }
    2337              : 
    2338          250 :                 let timeline = tenant.get_timeline(timeline_id, false).ok();
    2339          250 :                 let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
    2340              : 
    2341          250 :                 if let Some(rtc) = rtc {
    2342              :                     // layer drops schedule actions on remote timeline client to actually do the
    2343              :                     // deletions; we don't care about a shutdown error here, just exit fast.
    2344          249 :                     drop(rtc.wait_completion().await);
    2345            1 :                 }
    2346              :             }
    2347              : 
    2348          250 :             match task_done.send(result) {
    2349          250 :                 Ok(_) => (),
    2350            0 :                 Err(result) => error!("failed to send gc result: {result:?}"),
    2351              :             }
    2352          250 :             Ok(())
    2353          250 :         }
    2354          250 :     );
    2355          250 : 
    2356          250 :     // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
    2357          250 :     drop(guard);
    2358          250 : 
    2359          250 :     Ok(wait_task_done)
    2360          251 : }
    2361              : 
    2362              : #[cfg(test)]
    2363              : mod tests {
    2364              :     use std::collections::BTreeMap;
    2365              :     use std::sync::Arc;
    2366              :     use tracing::Instrument;
    2367              : 
    2368              :     use crate::tenant::mgr::TenantSlot;
    2369              : 
    2370              :     use super::{super::harness::TenantHarness, TenantsMap};
    2371              : 
    2372            2 :     #[tokio::test(start_paused = true)]
    2373            2 :     async fn shutdown_awaits_in_progress_tenant() {
    2374            2 :         // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
    2375            2 :         // wait for it to complete before proceeding.
    2376            2 : 
    2377            2 :         let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
    2378            2 :         let (t, _ctx) = h.load().await;
    2379              : 
    2380              :         // harness loads it to active, which is forced and nothing is running on the tenant
    2381              : 
    2382            2 :         let id = t.tenant_shard_id();
    2383            2 : 
    2384            2 :         // tenant harness configures the logging and we cannot escape it
    2385            2 :         let span = h.span();
    2386            2 :         let _e = span.enter();
    2387            2 : 
    2388            2 :         let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
    2389            2 :         let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
    2390            2 : 
    2391            2 :         // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
    2392            2 :         // permit it to proceed: that will stick the tenant in InProgress
    2393            2 : 
    2394            2 :         let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
    2395            2 :         let (until_cleanup_started, cleanup_started) = utils::completion::channel();
    2396            2 :         let mut remove_tenant_from_memory_task = {
    2397            2 :             let jh = tokio::spawn({
    2398            2 :                 let tenants = tenants.clone();
    2399            2 :                 async move {
    2400            2 :                     let cleanup = async move {
    2401            2 :                         drop(until_cleanup_started);
    2402            2 :                         can_complete_cleanup.wait().await;
    2403            2 :                         anyhow::Ok(())
    2404            2 :                     };
    2405            2 :                     super::remove_tenant_from_memory(&tenants, id, cleanup).await
    2406            2 :                 }
    2407            2 :                 .instrument(h.span())
    2408            2 :             });
    2409            2 : 
    2410            2 :             // now the long cleanup should be in place, with the stopping state
    2411            2 :             cleanup_started.wait().await;
    2412            2 :             jh
    2413              :         };
    2414              : 
    2415            2 :         let mut shutdown_task = {
    2416            2 :             let (until_shutdown_started, shutdown_started) = utils::completion::channel();
    2417            2 : 
    2418            2 :             let shutdown_task = tokio::spawn(async move {
    2419            2 :                 drop(until_shutdown_started);
    2420            4 :                 super::shutdown_all_tenants0(&tenants).await;
    2421            2 :             });
    2422            2 : 
    2423            2 :             shutdown_started.wait().await;
    2424            2 :             shutdown_task
    2425            2 :         };
    2426            2 : 
    2427            2 :         let long_time = std::time::Duration::from_secs(15);
    2428            4 :         tokio::select! {
    2429            4 :             _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
    2430            4 :             _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
    2431            4 :             _ = tokio::time::sleep(long_time) => {},
    2432            4 :         }
    2433              : 
    2434            2 :         drop(until_cleanup_completed);
    2435            2 : 
    2436            2 :         // Now that we allow it to proceed, shutdown should complete immediately
    2437            2 :         remove_tenant_from_memory_task.await.unwrap().unwrap();
    2438            2 :         shutdown_task.await.unwrap();
    2439              :     }
    2440              : }
        

Generated by: LCOV version 2.1-beta