LCOV - code coverage report
Current view: top level - pageserver/src/tenant - mgr.rs (source / functions)
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info
Test Date: 2024-02-29 11:57:12
Lines: 16.1 % (206 of 1283 hit)
Functions: 8.3 % (21 of 252 hit)

            Line data    Source code
       1              : //! This module acts as a switchboard to access different repositories managed by this
       2              : //! page server.
       3              : 
       4              : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
       5              : use futures::stream::StreamExt;
       6              : use itertools::Itertools;
       7              : use pageserver_api::key::Key;
       8              : use pageserver_api::models::ShardParameters;
       9              : use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
      10              : use rand::{distributions::Alphanumeric, Rng};
      11              : use std::borrow::Cow;
      12              : use std::cmp::Ordering;
      13              : use std::collections::{BTreeMap, HashMap};
      14              : use std::ops::Deref;
      15              : use std::sync::Arc;
      16              : use std::time::{Duration, Instant};
      17              : use tokio::fs;
      18              : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
      19              : 
      20              : use anyhow::Context;
      21              : use once_cell::sync::Lazy;
      22              : use tokio::task::JoinSet;
      23              : use tokio_util::sync::CancellationToken;
      24              : use tracing::*;
      25              : 
      26              : use remote_storage::GenericRemoteStorage;
      27              : use utils::{completion, crashsafe};
      28              : 
      29              : use crate::config::PageServerConf;
      30              : use crate::context::{DownloadBehavior, RequestContext};
      31              : use crate::control_plane_client::{
      32              :     ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
      33              : };
      34              : use crate::deletion_queue::DeletionQueueClient;
      35              : use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
      36              : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
      37              : use crate::task_mgr::{self, TaskKind};
      38              : use crate::tenant::config::{
      39              :     AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
      40              :     TenantConfOpt,
      41              : };
      42              : use crate::tenant::delete::DeleteTenantFlow;
      43              : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
      44              : use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
      45              : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
      46              : 
      47              : use utils::crashsafe::path_with_suffix_extension;
      48              : use utils::fs_ext::PathExt;
      49              : use utils::generation::Generation;
      50              : use utils::id::{TenantId, TimelineId};
      51              : 
      52              : use super::delete::DeleteTenantError;
      53              : use super::secondary::SecondaryTenant;
      54              : use super::TenantSharedResources;
      55              : 
       56              : /// A tenant that appears in TenantsMap may be either:
       57              : /// - `Attached`: has a full Tenant object, is eligible to service
      58              : ///    reads and ingest WAL.
      59              : /// - `Secondary`: is only keeping a local cache warm.
      60              : ///
      61              : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
      62              : /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
      63              : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
      64              : /// having a properly acquired generation (Secondary doesn't need a generation)
      65            0 : #[derive(Clone)]
      66              : pub(crate) enum TenantSlot {
      67              :     Attached(Arc<Tenant>),
      68              :     Secondary(Arc<SecondaryTenant>),
      69              :     /// In this state, other administrative operations acting on the TenantId should
      70              :     /// block, or return a retry indicator equivalent to HTTP 503.
      71              :     InProgress(utils::completion::Barrier),
      72              : }
      73              : 
      74              : impl std::fmt::Debug for TenantSlot {
      75            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      76            0 :         match self {
      77            0 :             Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
      78            0 :             Self::Secondary(_) => write!(f, "Secondary"),
      79            0 :             Self::InProgress(_) => write!(f, "InProgress"),
      80              :         }
      81            0 :     }
      82              : }
      83              : 
      84              : impl TenantSlot {
      85              :     /// Return the `Tenant` in this slot if attached, else None
      86            0 :     fn get_attached(&self) -> Option<&Arc<Tenant>> {
      87            0 :         match self {
      88            0 :             Self::Attached(t) => Some(t),
      89            0 :             Self::Secondary(_) => None,
      90            0 :             Self::InProgress(_) => None,
      91              :         }
      92            0 :     }
      93              : }
      94              : 
      95              : /// The tenants known to the pageserver.
      96              : /// The enum variants are used to distinguish the different states that the pageserver can be in.
      97              : pub(crate) enum TenantsMap {
      98              :     /// [`init_tenant_mgr`] is not done yet.
      99              :     Initializing,
     100              :     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
     101              :     /// New tenants can be added using [`tenant_map_acquire_slot`].
     102              :     Open(BTreeMap<TenantShardId, TenantSlot>),
     103              :     /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
     104              :     /// Existing tenants are still accessible, but no new tenants can be created.
     105              :     ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
     106              : }
     107              : 
     108              : pub(crate) enum TenantsMapRemoveResult {
     109              :     Occupied(TenantSlot),
     110              :     Vacant,
     111              :     InProgress(utils::completion::Barrier),
     112              : }
     113              : 
     114              : /// When resolving a TenantId to a shard, we may be looking for the 0th
     115              : /// shard, or we might be looking for whichever shard holds a particular page.
     116              : pub(crate) enum ShardSelector {
     117              :     /// Only return the 0th shard, if it is present.  If a non-0th shard is present,
     118              :     /// ignore it.
     119              :     Zero,
     120              :     /// Pick the first shard we find for the TenantId
     121              :     First,
     122              :     /// Pick the shard that holds this key
     123              :     Page(Key),
     124              : }
     125              : 
     126              : impl TenantsMap {
     127              :     /// Convenience function for typical usage, where we want to get a `Tenant` object, for
     128              :     /// working with attached tenants.  If the TenantId is in the map but in Secondary state,
     129              :     /// None is returned.
     130            0 :     pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
     131            0 :         match self {
     132            0 :             TenantsMap::Initializing => None,
     133            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
     134            0 :                 m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
     135              :             }
     136              :         }
     137            0 :     }
     138              : 
     139              :     /// A page service client sends a TenantId, and to look up the correct Tenant we must
     140              :     /// resolve this to a fully qualified TenantShardId.
     141            0 :     fn resolve_attached_shard(
     142            0 :         &self,
     143            0 :         tenant_id: &TenantId,
     144            0 :         selector: ShardSelector,
     145            0 :     ) -> Option<TenantShardId> {
     146            0 :         let mut want_shard = None;
     147            0 :         match self {
     148            0 :             TenantsMap::Initializing => None,
     149            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
     150            0 :                 for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
     151              :                     // Ignore all slots that don't contain an attached tenant
     152            0 :                     let tenant = match &slot.1 {
     153            0 :                         TenantSlot::Attached(t) => t,
     154            0 :                         _ => continue,
     155              :                     };
     156              : 
     157            0 :                     match selector {
     158            0 :                         ShardSelector::First => return Some(*slot.0),
     159            0 :                         ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
     160            0 :                             return Some(*slot.0)
     161              :                         }
     162            0 :                         ShardSelector::Page(key) => {
     163            0 :                             // First slot we see for this tenant, calculate the expected shard number
     164            0 :                             // for the key: we will use this for checking if this and subsequent
     165            0 :                             // slots contain the key, rather than recalculating the hash each time.
     166            0 :                             if want_shard.is_none() {
     167            0 :                                 want_shard = Some(tenant.shard_identity.get_shard_number(&key));
     168            0 :                             }
     169              : 
     170            0 :                             if Some(tenant.shard_identity.number) == want_shard {
     171            0 :                                 return Some(*slot.0);
     172            0 :                             }
     173              :                         }
     174            0 :                         _ => continue,
     175              :                     }
     176              :                 }
     177              : 
     178              :                 // Fall through: we didn't find an acceptable shard
     179            0 :                 None
     180              :             }
     181              :         }
     182            0 :     }
     183              : 
     184              :     /// Only for use from DeleteTenantFlow.  This method directly removes a TenantSlot from the map.
     185              :     ///
     186              :     /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
      187              : /// slot if the enclosed tenant is shut down.
     188            0 :     pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
     189            0 :         use std::collections::btree_map::Entry;
     190            0 :         match self {
     191            0 :             TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
     192            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
     193            0 :                 Entry::Occupied(entry) => match entry.get() {
     194            0 :                     TenantSlot::InProgress(barrier) => {
     195            0 :                         TenantsMapRemoveResult::InProgress(barrier.clone())
     196              :                     }
     197            0 :                     _ => TenantsMapRemoveResult::Occupied(entry.remove()),
     198              :                 },
     199            0 :                 Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
     200              :             },
     201              :         }
     202            0 :     }
     203              : 
     204            0 :     pub(crate) fn len(&self) -> usize {
     205            0 :         match self {
     206            0 :             TenantsMap::Initializing => 0,
     207            0 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
     208              :         }
     209            0 :     }
     210              : }
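As a hedged illustration (not part of this module) of how the selector is meant to be used: a page-service request carrying only a TenantId and a Key can be routed to the owning shard via resolve_attached_shard with ShardSelector::Page, while tenant-wide operations target the zeroth shard. The helper names below are hypothetical; the types and methods they call are the ones defined above.

    // Hypothetical routing helpers, sketched against the API above.
    fn route_page_request(map: &TenantsMap, tenant_id: TenantId, key: Key) -> Option<TenantShardId> {
        // ShardSelector::Page hashes the key once and compares it against each
        // attached slot's shard number inside resolve_attached_shard.
        map.resolve_attached_shard(&tenant_id, ShardSelector::Page(key))
    }

    fn route_tenant_wide_request(map: &TenantsMap, tenant_id: TenantId) -> Option<TenantShardId> {
        // Operations that are not keyed to a particular page go to shard zero.
        map.resolve_attached_shard(&tenant_id, ShardSelector::Zero)
    }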
     211              : 
      212              : /// This is "safe" in that it won't leave behind a partially deleted directory
      213              : /// at the original path, because we rename with TEMP_FILE_SUFFIX before we start deleting
     214              : /// the contents.
     215              : ///
     216              : /// This is pageserver-specific, as it relies on future processes after a crash to check
     217              : /// for TEMP_FILE_SUFFIX when loading things.
     218            0 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
     219            0 :     let tmp_path = safe_rename_tenant_dir(path).await?;
     220            0 :     fs::remove_dir_all(tmp_path).await
     221            0 : }
     222              : 
     223            0 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
     224            0 :     let parent = path
     225            0 :         .as_ref()
     226            0 :         .parent()
     227            0 :         // It is invalid to call this function with a relative path.  Tenant directories
     228            0 :         // should always have a parent.
     229            0 :         .ok_or(std::io::Error::new(
     230            0 :             std::io::ErrorKind::InvalidInput,
     231            0 :             "Path must be absolute",
     232            0 :         ))?;
     233            0 :     let rand_suffix = rand::thread_rng()
     234            0 :         .sample_iter(&Alphanumeric)
     235            0 :         .take(8)
     236            0 :         .map(char::from)
     237            0 :         .collect::<String>()
     238            0 :         + TEMP_FILE_SUFFIX;
     239            0 :     let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
     240            0 :     fs::rename(path.as_ref(), &tmp_path).await?;
     241            0 :     fs::File::open(parent).await?.sync_all().await?;
     242            0 :     Ok(tmp_path)
     243            0 : }
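A hedged sketch of the other half of this crash-safety contract, assuming only what the surrounding code shows (crate::is_temporary matches the TEMP_FILE_SUFFIX naming, as load_tenant_config below relies on): a later startup can reap directories that were renamed but never fully deleted. The helper name is hypothetical.

    // Hypothetical cleanup pass: anything matching is_temporary was already
    // renamed away from its canonical tenant path, so deleting it is safe.
    async fn reap_interrupted_deletions(tenants_dir: &Utf8Path) -> anyhow::Result<()> {
        for entry in tenants_dir.read_dir_utf8()? {
            let path = entry?.path().to_path_buf();
            if crate::is_temporary(&path) {
                // A crash mid-delete just leaves another temporary directory
                // for the next startup to finish off.
                fs::remove_dir_all(&path).await?;
            }
        }
        Ok(())
    }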
     244              : 
     245              : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
     246            2 :     Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
     247              : 
     248              : /// The TenantManager is responsible for storing and mutating the collection of all tenants
     249              : /// that this pageserver process has state for.  Every Tenant and SecondaryTenant instance
     250              : /// lives inside the TenantManager.
     251              : ///
     252              : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
     253              : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
     254              : /// and attached modes concurrently.
     255              : pub struct TenantManager {
     256              :     conf: &'static PageServerConf,
     257              :     // TODO: currently this is a &'static pointing to TENANTs.  When we finish refactoring
     258              :     // out of that static variable, the TenantManager can own this.
     259              :     // See https://github.com/neondatabase/neon/issues/5796
     260              :     tenants: &'static std::sync::RwLock<TenantsMap>,
     261              :     resources: TenantSharedResources,
     262              : }
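A minimal, hedged usage sketch (a hypothetical handler, not code from this module): callers are expected to go through the TenantManager rather than the TENANTS static, using get_attached_tenant_shard defined further down in this file.

    // Hypothetical request handler: resolve an attached, active shard through
    // the manager so concurrent attach/detach operations cannot race with us.
    fn handle_request(
        mgr: &TenantManager,
        tenant_shard_id: TenantShardId,
    ) -> Result<Arc<Tenant>, GetTenantError> {
        // active_only = true: reject shards that are not yet Active.
        mgr.get_attached_tenant_shard(tenant_shard_id, true)
    }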
     263              : 
     264            0 : fn emergency_generations(
     265            0 :     tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
     266            0 : ) -> HashMap<TenantShardId, Generation> {
     267            0 :     tenant_confs
     268            0 :         .iter()
     269            0 :         .filter_map(|(tid, lc)| {
     270            0 :             let lc = match lc {
     271            0 :                 Ok(lc) => lc,
     272            0 :                 Err(_) => return None,
     273              :             };
     274            0 :             let gen = match &lc.mode {
     275            0 :                 LocationMode::Attached(alc) => Some(alc.generation),
     276            0 :                 LocationMode::Secondary(_) => None,
     277              :             };
     278              : 
     279            0 :             gen.map(|g| (*tid, g))
     280            0 :         })
     281            0 :         .collect()
     282            0 : }
     283              : 
     284            0 : async fn init_load_generations(
     285            0 :     conf: &'static PageServerConf,
     286            0 :     tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
     287            0 :     resources: &TenantSharedResources,
     288            0 :     cancel: &CancellationToken,
     289            0 : ) -> anyhow::Result<Option<HashMap<TenantShardId, Generation>>> {
     290            0 :     let generations = if conf.control_plane_emergency_mode {
     291            0 :         error!(
     292            0 :             "Emergency mode!  Tenants will be attached unsafely using their last known generation"
     293            0 :         );
     294            0 :         emergency_generations(tenant_confs)
     295            0 :     } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
     296            0 :         info!("Calling control plane API to re-attach tenants");
     297              :         // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
     298            0 :         match client.re_attach().await {
     299            0 :             Ok(tenants) => tenants,
     300              :             Err(RetryForeverError::ShuttingDown) => {
     301            0 :                 anyhow::bail!("Shut down while waiting for control plane re-attach response")
     302              :             }
     303              :         }
     304              :     } else {
     305            0 :         info!("Control plane API not configured, tenant generations are disabled");
     306            0 :         return Ok(None);
     307              :     };
     308              : 
     309              :     // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
     310              :     // deletion list entries may still be valid.  We provide that by pushing a recovery operation into
      311              :     // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
     312              :     // are processed, even though we don't block on recovery completing here.
     313              :     //
     314              :     // Must only do this if remote storage is enabled, otherwise deletion queue
     315              :     // is not running and channel push will fail.
     316            0 :     if resources.remote_storage.is_some() {
     317            0 :         resources
     318            0 :             .deletion_queue_client
     319            0 :             .recover(generations.clone())?;
     320            0 :     }
     321              : 
     322            0 :     Ok(Some(generations))
     323            0 : }
     324              : 
     325              : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
     326              : /// to load a tenant config from it.
     327              : ///
     328              : /// If file is missing, return Ok(None)
     329            0 : fn load_tenant_config(
     330            0 :     conf: &'static PageServerConf,
     331            0 :     dentry: Utf8DirEntry,
     332            0 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
     333            0 :     let tenant_dir_path = dentry.path().to_path_buf();
     334            0 :     if crate::is_temporary(&tenant_dir_path) {
     335            0 :         info!("Found temporary tenant directory, removing: {tenant_dir_path}");
     336              :         // No need to use safe_remove_tenant_dir_all because this is already
     337              :         // a temporary path
     338            0 :         if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
     339            0 :             error!(
     340            0 :                 "Failed to remove temporary directory '{}': {:?}",
     341            0 :                 tenant_dir_path, e
     342            0 :             );
     343            0 :         }
     344            0 :         return Ok(None);
     345            0 :     }
     346              : 
     347              :     // This case happens if we crash during attachment before writing a config into the dir
     348            0 :     let is_empty = tenant_dir_path
     349            0 :         .is_empty_dir()
     350            0 :         .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
     351            0 :     if is_empty {
     352            0 :         info!("removing empty tenant directory {tenant_dir_path:?}");
     353            0 :         if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
     354            0 :             error!(
     355            0 :                 "Failed to remove empty tenant directory '{}': {e:#}",
     356            0 :                 tenant_dir_path
     357            0 :             )
     358            0 :         }
     359            0 :         return Ok(None);
     360            0 :     }
     361              : 
     362            0 :     let tenant_shard_id = match tenant_dir_path
     363            0 :         .file_name()
     364            0 :         .unwrap_or_default()
     365            0 :         .parse::<TenantShardId>()
     366              :     {
     367            0 :         Ok(id) => id,
     368              :         Err(_) => {
     369            0 :             warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
     370            0 :             return Ok(None);
     371              :         }
     372              :     };
     373              : 
     374              :     // Clean up legacy `metadata` files.
     375              :     // Doing it here because every single tenant directory is visited here.
     376              :     // In any later code, there's different treatment of tenant dirs
     377              :     // ... depending on whether the tenant is in re-attach response or not
      378              :     // ... depending on whether the tenant is ignored or not
     379            0 :     assert_eq!(
     380            0 :         &conf.tenant_path(&tenant_shard_id),
     381            0 :         &tenant_dir_path,
     382            0 :         "later use of conf....path() methods would be dubious"
     383              :     );
     384            0 :     let timelines: Vec<TimelineId> = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() {
     385            0 :         Ok(iter) => {
     386            0 :             let mut timelines = Vec::new();
     387            0 :             for res in iter {
     388            0 :                 let p = res?;
     389            0 :                 let Some(timeline_id) = p.file_name().parse::<TimelineId>().ok() else {
     390              :                     // skip any entries that aren't TimelineId, such as
     391              :                     // - *.___temp dirs
     392              :                     // - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart)
     393            0 :                     continue;
     394              :                 };
     395            0 :                 timelines.push(timeline_id);
     396              :             }
     397            0 :             timelines
     398              :         }
     399            0 :         Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![],
     400            0 :         Err(e) => return Err(anyhow::anyhow!(e)),
     401              :     };
     402            0 :     for timeline_id in timelines {
     403            0 :         let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id);
     404            0 :         let metadata_path = timeline_path.join(METADATA_FILE_NAME);
     405            0 :         match std::fs::remove_file(&metadata_path) {
     406              :             Ok(()) => {
     407            0 :                 crashsafe::fsync(timeline_path)
     408            0 :                     .context("fsync timeline dir after removing legacy metadata file")?;
     409            0 :                 info!("removed legacy metadata file at {metadata_path}");
     410              :             }
     411            0 :             Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
     412            0 :                 // something removed the file earlier, or it was never there
     413            0 :                 // We don't care, this software version doesn't write it again, so, we're good.
     414            0 :             }
     415            0 :             Err(e) => {
     416            0 :                 anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}");
     417              :             }
     418              :         }
     419              :     }
     420              : 
     421            0 :     let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
     422            0 :     if tenant_ignore_mark_file.exists() {
     423            0 :         info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
     424            0 :         return Ok(None);
     425            0 :     }
     426            0 : 
     427            0 :     Ok(Some((
     428            0 :         tenant_shard_id,
     429            0 :         Tenant::load_tenant_config(conf, &tenant_shard_id),
     430            0 :     )))
     431            0 : }
     432              : 
     433              : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
     434              : /// and load configurations for the tenants we found.
     435              : ///
     436              : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
     437              : /// seconds even on reasonably fast drives.
     438            0 : async fn init_load_tenant_configs(
     439            0 :     conf: &'static PageServerConf,
     440            0 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
     441            0 :     let tenants_dir = conf.tenants_path();
     442              : 
     443            0 :     let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
     444            0 :         let dir_entries = tenants_dir
     445            0 :             .read_dir_utf8()
     446            0 :             .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
     447              : 
     448            0 :         Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
     449            0 :     })
     450            0 :     .await??;
     451              : 
     452            0 :     let mut configs = HashMap::new();
     453            0 : 
     454            0 :     let mut join_set = JoinSet::new();
     455            0 :     for dentry in dentries {
     456            0 :         join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
     457            0 :     }
     458              : 
     459            0 :     while let Some(r) = join_set.join_next().await {
     460            0 :         if let Some((tenant_id, tenant_config)) = r?? {
     461            0 :             configs.insert(tenant_id, tenant_config);
     462            0 :         }
     463              :     }
     464              : 
     465            0 :     Ok(configs)
     466            0 : }
     467              : 
     468              : /// Initialize repositories with locally available timelines.
     469              : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
     470              : /// are scheduled for download and added to the tenant once download is completed.
     471            0 : #[instrument(skip_all)]
     472              : pub async fn init_tenant_mgr(
     473              :     conf: &'static PageServerConf,
     474              :     resources: TenantSharedResources,
     475              :     init_order: InitializationOrder,
     476              :     cancel: CancellationToken,
     477              : ) -> anyhow::Result<TenantManager> {
     478              :     let mut tenants = BTreeMap::new();
     479              : 
     480              :     let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
     481              : 
     482              :     // Scan local filesystem for attached tenants
     483              :     let tenant_configs = init_load_tenant_configs(conf).await?;
     484              : 
     485              :     // Determine which tenants are to be attached
     486              :     let tenant_generations =
     487              :         init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
     488              : 
     489            0 :     tracing::info!(
     490            0 :         "Attaching {} tenants at startup, warming up {} at a time",
     491            0 :         tenant_configs.len(),
     492            0 :         conf.concurrent_tenant_warmup.initial_permits()
     493            0 :     );
     494              :     TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
     495              : 
     496              :     // Construct `Tenant` objects and start them running
     497              :     for (tenant_shard_id, location_conf) in tenant_configs {
     498              :         let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
     499              : 
     500              :         let mut location_conf = match location_conf {
     501              :             Ok(l) => l,
     502              :             Err(e) => {
     503            0 :                 warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
     504              : 
     505              :                 tenants.insert(
     506              :                     tenant_shard_id,
     507              :                     TenantSlot::Attached(Tenant::create_broken_tenant(
     508              :                         conf,
     509              :                         tenant_shard_id,
     510              :                         format!("{}", e),
     511              :                     )),
     512              :                 );
     513              :                 continue;
     514              :             }
     515              :         };
     516              : 
     517              :         let generation = if let Some(generations) = &tenant_generations {
     518              :             // We have a generation map: treat it as the authority for whether
     519              :             // this tenant is really attached.
     520              :             if let Some(gen) = generations.get(&tenant_shard_id) {
     521              :                 if let LocationMode::Attached(attached) = &location_conf.mode {
     522              :                     if attached.generation > *gen {
     523            0 :                         tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     524            0 :                             "Control plane gave decreasing generation ({gen:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
     525            0 :                             attached.generation
     526            0 :                         );
     527              : 
     528              :                         // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
     529              :                         // local disk content: demote to secondary rather than detaching.
     530              :                         tenants.insert(
     531              :                             tenant_shard_id,
     532              :                             TenantSlot::Secondary(SecondaryTenant::new(
     533              :                                 tenant_shard_id,
     534              :                                 location_conf.shard,
     535              :                                 location_conf.tenant_conf.clone(),
     536              :                                 &SecondaryLocationConfig { warm: false },
     537              :                             )),
     538              :                         );
     539              :                     }
     540              :                 }
     541              :                 *gen
     542              :             } else {
     543              :                 match &location_conf.mode {
     544              :                     LocationMode::Secondary(secondary_config) => {
     545              :                         // We do not require the control plane's permission for secondary mode
     546              :                         // tenants, because they do no remote writes and hence require no
     547              :                         // generation number
     548            0 :                         info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
     549              :                         tenants.insert(
     550              :                             tenant_shard_id,
     551              :                             TenantSlot::Secondary(SecondaryTenant::new(
     552              :                                 tenant_shard_id,
     553              :                                 location_conf.shard,
     554              :                                 location_conf.tenant_conf,
     555              :                                 secondary_config,
     556              :                             )),
     557              :                         );
     558              :                     }
     559              :                     LocationMode::Attached(_) => {
     560              :                         // TODO: augment re-attach API to enable the control plane to
     561              :                         // instruct us about secondary attachments.  That way, instead of throwing
     562              :                         // away local state, we can gracefully fall back to secondary here, if the control
     563              :                         // plane tells us so.
     564              :                         // (https://github.com/neondatabase/neon/issues/5377)
     565            0 :                         info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
     566              :                         if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
     567            0 :                             error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
     568            0 :                                 "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
     569            0 :                             );
     570              :                         }
     571              :                     }
     572              :                 };
     573              : 
     574              :                 continue;
     575              :             }
     576              :         } else {
     577              :             // Legacy mode: no generation information, any tenant present
     578              :             // on local disk may activate
     579            0 :             info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
     580              :             Generation::none()
     581              :         };
     582              : 
     583              :         // Presence of a generation number implies attachment: attach the tenant
     584              :         // if it wasn't already, and apply the generation number.
     585              :         location_conf.attach_in_generation(generation);
     586              :         Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
     587              : 
     588              :         let shard_identity = location_conf.shard;
     589              :         match tenant_spawn(
     590              :             conf,
     591              :             tenant_shard_id,
     592              :             &tenant_dir_path,
     593              :             resources.clone(),
     594              :             AttachedTenantConf::try_from(location_conf)?,
     595              :             shard_identity,
     596              :             Some(init_order.clone()),
     597              :             &TENANTS,
     598              :             SpawnMode::Normal,
     599              :             &ctx,
     600              :         ) {
     601              :             Ok(tenant) => {
     602              :                 tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
     603              :             }
     604              :             Err(e) => {
     605            0 :                 error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
     606              :             }
     607              :         }
     608              :     }
     609              : 
     610            0 :     info!("Processed {} local tenants at startup", tenants.len());
     611              : 
     612              :     let mut tenants_map = TENANTS.write().unwrap();
     613              :     assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
     614              :     METRICS.tenant_slots.set(tenants.len() as u64);
     615              :     *tenants_map = TenantsMap::Open(tenants);
     616              : 
     617              :     Ok(TenantManager {
     618              :         conf,
     619              :         tenants: &TENANTS,
     620              :         resources,
     621              :     })
     622              : }
     623              : 
      624              : /// Wrapper for Tenant::spawn that checks invariants before running, and returns
      625              : /// a broken tenant placeholder if Tenant::spawn fails, which the caller then inserts into the map.
     626              : #[allow(clippy::too_many_arguments)]
     627            0 : pub(crate) fn tenant_spawn(
     628            0 :     conf: &'static PageServerConf,
     629            0 :     tenant_shard_id: TenantShardId,
     630            0 :     tenant_path: &Utf8Path,
     631            0 :     resources: TenantSharedResources,
     632            0 :     location_conf: AttachedTenantConf,
     633            0 :     shard_identity: ShardIdentity,
     634            0 :     init_order: Option<InitializationOrder>,
     635            0 :     tenants: &'static std::sync::RwLock<TenantsMap>,
     636            0 :     mode: SpawnMode,
     637            0 :     ctx: &RequestContext,
     638            0 : ) -> anyhow::Result<Arc<Tenant>> {
     639            0 :     anyhow::ensure!(
     640            0 :         tenant_path.is_dir(),
     641            0 :         "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
     642              :     );
     643            0 :     anyhow::ensure!(
     644            0 :         !crate::is_temporary(tenant_path),
     645            0 :         "Cannot load tenant from temporary path {tenant_path:?}"
     646              :     );
     647              :     anyhow::ensure!(
     648            0 :         !tenant_path.is_empty_dir().with_context(|| {
     649            0 :             format!("Failed to check whether {tenant_path:?} is an empty dir")
     650            0 :         })?,
     651            0 :         "Cannot load tenant from empty directory {tenant_path:?}"
     652              :     );
     653              : 
     654            0 :     let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
     655            0 :     anyhow::ensure!(
     656            0 :         !conf.tenant_ignore_mark_file_path(&tenant_shard_id).exists(),
     657            0 :         "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
     658              :     );
     659              : 
     660            0 :     let tenant = match Tenant::spawn(
     661            0 :         conf,
     662            0 :         tenant_shard_id,
     663            0 :         resources,
     664            0 :         location_conf,
     665            0 :         shard_identity,
     666            0 :         init_order,
     667            0 :         tenants,
     668            0 :         mode,
     669            0 :         ctx,
     670            0 :     ) {
     671            0 :         Ok(tenant) => tenant,
     672            0 :         Err(e) => {
     673            0 :             error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
     674            0 :             Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
     675              :         }
     676              :     };
     677              : 
     678            0 :     Ok(tenant)
     679            0 : }
     680              : 
     681              : ///
     682              : /// Shut down all tenants. This runs as part of pageserver shutdown.
     683              : ///
     684              : /// NB: We leave the tenants in the map, so that they remain accessible through
     685              : /// the management API until we shut it down. If we removed the shut-down tenants
     686              : /// from the tenants map, the management API would return 404 for these tenants,
     687              : /// because TenantsMap::get() now returns `None`.
      688              : /// because TenantsMap::get() would then return `None` for them.
      689              : /// That could easily be misinterpreted by the control plane, the consumer of the
     690              : /// We would then be in split-brain once this pageserver restarts.
     691            0 : #[instrument(skip_all)]
     692              : pub(crate) async fn shutdown_all_tenants() {
     693              :     shutdown_all_tenants0(&TENANTS).await
     694              : }
     695              : 
     696            2 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
     697            2 :     let mut join_set = JoinSet::new();
     698              : 
     699              :     // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
     700            2 :     let (total_in_progress, total_attached) = {
     701            2 :         let mut m = tenants.write().unwrap();
     702            2 :         match &mut *m {
     703              :             TenantsMap::Initializing => {
     704            0 :                 *m = TenantsMap::ShuttingDown(BTreeMap::default());
     705            0 :                 info!("tenants map is empty");
     706            0 :                 return;
     707              :             }
     708            2 :             TenantsMap::Open(tenants) => {
     709            2 :                 let mut shutdown_state = BTreeMap::new();
     710            2 :                 let mut total_in_progress = 0;
     711            2 :                 let mut total_attached = 0;
     712              : 
     713            2 :                 for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
     714            2 :                     match v {
     715            0 :                         TenantSlot::Attached(t) => {
     716            0 :                             shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
     717            0 :                             join_set.spawn(
     718            0 :                                 async move {
     719            0 :                                     let freeze_and_flush = true;
     720              : 
     721            0 :                                     let res = {
     722            0 :                                         let (_guard, shutdown_progress) = completion::channel();
     723            0 :                                         t.shutdown(shutdown_progress, freeze_and_flush).await
     724              :                                     };
     725              : 
     726            0 :                                     if let Err(other_progress) = res {
      727              :                                         // join the other shutdown in progress
     728            0 :                                         other_progress.wait().await;
     729            0 :                                     }
     730              : 
     731              :                                     // we cannot afford per tenant logging here, because if s3 is degraded, we are
     732              :                                     // going to log too many lines
     733            0 :                                     debug!("tenant successfully stopped");
     734            0 :                                 }
     735            0 :                                 .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
     736              :                             );
     737              : 
     738            0 :                             total_attached += 1;
     739              :                         }
     740            0 :                         TenantSlot::Secondary(state) => {
     741            0 :                             // We don't need to wait for this individually per-tenant: the
     742            0 :                             // downloader task will be waited on eventually, this cancel
     743            0 :                             // is just to encourage it to drop out if it is doing work
     744            0 :                             // for this tenant right now.
     745            0 :                             state.cancel.cancel();
     746            0 : 
     747            0 :                             shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
     748            0 :                         }
     749            2 :                         TenantSlot::InProgress(notify) => {
     750            2 :                             // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
     751            2 :                             // wait for their notifications to fire in this function.
     752            2 :                             join_set.spawn(async move {
     753            2 :                                 notify.wait().await;
     754            2 :                             });
     755            2 : 
     756            2 :                             total_in_progress += 1;
     757            2 :                         }
     758              :                     }
     759              :                 }
     760            2 :                 *m = TenantsMap::ShuttingDown(shutdown_state);
     761            2 :                 (total_in_progress, total_attached)
     762              :             }
     763              :             TenantsMap::ShuttingDown(_) => {
     764            0 :                 error!("already shutting down, this function isn't supposed to be called more than once");
     765            0 :                 return;
     766              :             }
     767              :         }
     768              :     };
     769              : 
     770            2 :     let started_at = std::time::Instant::now();
     771              : 
     772            2 :     info!(
     773            2 :         "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
     774            2 :         total_in_progress, total_attached
     775            2 :     );
     776              : 
     777            2 :     let total = join_set.len();
     778            2 :     let mut panicked = 0;
     779            2 :     let mut buffering = true;
     780            2 :     const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
     781            2 :     let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
     782              : 
     783            6 :     while !join_set.is_empty() {
     784            4 :         tokio::select! {
     785            2 :             Some(joined) = join_set.join_next() => {
     786              :                 match joined {
     787              :                     Ok(()) => {},
     788              :                     Err(join_error) if join_error.is_cancelled() => {
     789              :                         unreachable!("we are not cancelling any of the tasks");
     790              :                     }
     791              :                     Err(join_error) if join_error.is_panic() => {
     792              :                         // cannot really do anything, as this panic is likely a bug
     793              :                         panicked += 1;
     794              :                     }
     795              :                     Err(join_error) => {
     796            0 :                         warn!("unknown kind of JoinError: {join_error}");
     797              :                     }
     798              :                 }
     799              :                 if !buffering {
     800              :                     // buffer so that every 500ms since the first update (or starting) we'll log
     801              :                     // how far away we are; this is because we will get SIGKILL'd at 10s, and we
     802              :                     // are not able to log *then*.
     803              :                     buffering = true;
     804              :                     buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
     805              :                 }
     806              :             },
     807              :             _ = &mut buffered, if buffering => {
     808              :                 buffering = false;
     809            2 :                 info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
     810              :             }
     811              :         }
     812              :     }
     813              : 
     814            2 :     if panicked > 0 {
     815            0 :         warn!(
     816            0 :             panicked,
     817            0 :             total, "observed panicks while shutting down tenants"
     818            0 :         );
     819            2 :     }
     820              : 
     821              :     // caller will log how long we took
     822            2 : }
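The buffered logging in the loop above is a reusable trick; here is a hedged, self-contained distillation (names are illustrative): a pinned sleep that is re-armed after each completion rate-limits progress lines to roughly one per 500ms while a JoinSet drains.

    // Sketch of the rate-limited progress reporting used above, stripped of
    // the tenant-specific bookkeeping.
    async fn drain_with_progress(mut join_set: tokio::task::JoinSet<()>) {
        const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
        let total = join_set.len();
        let mut buffering = true;
        let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));

        while !join_set.is_empty() {
            tokio::select! {
                _ = join_set.join_next() => {
                    if !buffering {
                        // Re-arm the timer: the next progress line is due
                        // 500ms after this completion, not immediately.
                        buffering = true;
                        buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
                    }
                }
                _ = &mut buffered, if buffering => {
                    buffering = false;
                    tracing::info!(remaining = join_set.len(), total, "still draining tasks");
                }
            }
        }
    }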
     823              : 
     824            0 : #[derive(Debug, thiserror::Error)]
     825              : pub(crate) enum SetNewTenantConfigError {
     826              :     #[error(transparent)]
     827              :     GetTenant(#[from] GetTenantError),
     828              :     #[error(transparent)]
     829              :     Persist(anyhow::Error),
     830              :     #[error(transparent)]
     831              :     Other(anyhow::Error),
     832              : }
     833              : 
     834            0 : pub(crate) async fn set_new_tenant_config(
     835            0 :     conf: &'static PageServerConf,
     836            0 :     new_tenant_conf: TenantConfOpt,
     837            0 :     tenant_id: TenantId,
     838            0 : ) -> Result<(), SetNewTenantConfigError> {
     839            0 :     // Legacy API: does not support sharding
     840            0 :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
     841              : 
     842            0 :     info!("configuring tenant {tenant_id}");
     843            0 :     let tenant = get_tenant(tenant_shard_id, true)?;
     844              : 
     845            0 :     if !tenant.tenant_shard_id().shard_count.is_unsharded() {
     846              :         // Note that we use ShardParameters::default below.
     847            0 :         return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
     848            0 :             "This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
     849            0 :         )));
     850            0 :     }
     851            0 : 
     852            0 :     // This is a legacy API that only operates on attached tenants: the preferred
     853            0 :     // API to use is the location_config/ endpoint, which lets the caller provide
     854            0 :     // the full LocationConf.
     855            0 :     let location_conf = LocationConf::attached_single(
     856            0 :         new_tenant_conf.clone(),
     857            0 :         tenant.generation,
     858            0 :         &ShardParameters::default(),
     859            0 :     );
     860            0 : 
     861            0 :     Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
     862            0 :         .await
     863            0 :         .map_err(SetNewTenantConfigError::Persist)?;
     864            0 :     tenant.set_new_tenant_config(new_tenant_conf);
     865            0 :     Ok(())
     866            0 : }
     867              : 
     868            0 : #[derive(thiserror::Error, Debug)]
     869              : pub(crate) enum UpsertLocationError {
     870              :     #[error("Bad config request: {0}")]
     871              :     BadRequest(anyhow::Error),
     872              : 
     873              :     #[error("Cannot change config in this state: {0}")]
     874              :     Unavailable(#[from] TenantMapError),
     875              : 
     876              :     #[error("Tenant is already being modified")]
     877              :     InProgress,
     878              : 
     879              :     #[error("Failed to flush: {0}")]
     880              :     Flush(anyhow::Error),
     881              : 
     882              :     #[error("Internal error: {0}")]
     883              :     Other(#[from] anyhow::Error),
     884              : }
     885              : 
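The variants of UpsertLocationError are distinct mainly so that callers can react differently to each failure mode. A hypothetical mapping sketch (the status codes are illustrative assumptions, not the pageserver's actual HTTP layer):

    fn status_code(err: &UpsertLocationError) -> u16 {
        match err {
            UpsertLocationError::BadRequest(_) => 400,  // caller sent an invalid config
            UpsertLocationError::Unavailable(_) => 503, // tenant map initializing/shutting down; retry later
            UpsertLocationError::InProgress => 409,     // another modification is racing with us; retry later
            UpsertLocationError::Flush(_) => 500,       // flush to remote storage failed
            UpsertLocationError::Other(_) => 500,       // internal error
        }
    }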
     886              : impl TenantManager {
     887              :     /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
     888              :     /// having to pass it around everywhere as a separate object.
     889            0 :     pub(crate) fn get_conf(&self) -> &'static PageServerConf {
     890            0 :         self.conf
     891            0 :     }
     892              : 
      893              :     /// Gets the attached tenant from the in-memory data, erroring if it is absent, in secondary mode, or does not match the query.
      894              :     /// `active_only = true` restricts the query to tenants that are ready for operations, erroring on all other kinds of tenants.
     895            0 :     pub(crate) fn get_attached_tenant_shard(
     896            0 :         &self,
     897            0 :         tenant_shard_id: TenantShardId,
     898            0 :         active_only: bool,
     899            0 :     ) -> Result<Arc<Tenant>, GetTenantError> {
     900            0 :         let locked = self.tenants.read().unwrap();
     901              : 
     902            0 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
     903              : 
     904            0 :         match peek_slot {
     905            0 :             Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
     906              :                 TenantState::Broken {
     907            0 :                     reason,
     908            0 :                     backtrace: _,
     909            0 :                 } if active_only => Err(GetTenantError::Broken(reason)),
     910            0 :                 TenantState::Active => Ok(Arc::clone(tenant)),
     911              :                 _ => {
     912            0 :                     if active_only {
     913            0 :                         Err(GetTenantError::NotActive(tenant_shard_id))
     914              :                     } else {
     915            0 :                         Ok(Arc::clone(tenant))
     916              :                     }
     917              :                 }
     918              :             },
     919            0 :             Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
     920              :             None | Some(TenantSlot::Secondary(_)) => {
     921            0 :                 Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
     922              :             }
     923              :         }
     924            0 :     }
     925              : 
     926            0 :     pub(crate) fn get_secondary_tenant_shard(
     927            0 :         &self,
     928            0 :         tenant_shard_id: TenantShardId,
     929            0 :     ) -> Option<Arc<SecondaryTenant>> {
     930            0 :         let locked = self.tenants.read().unwrap();
     931            0 : 
     932            0 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
     933            0 :             .ok()
     934            0 :             .flatten();
     935              : 
     936            0 :         match peek_slot {
     937            0 :             Some(TenantSlot::Secondary(s)) => Some(s.clone()),
     938            0 :             _ => None,
     939              :         }
     940            0 :     }
     941              : 
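Both getters above follow the same pattern: take the tenants map's read lock only long enough to peek at the slot and clone the Arc, so no caller ever does real work while holding the global lock. A standalone sketch of that pattern with simplified types (not part of mgr.rs):

    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    struct Tenant;

    fn get(map: &RwLock<HashMap<u32, Arc<Tenant>>>, id: u32) -> Option<Arc<Tenant>> {
        let guard = map.read().unwrap(); // short critical section
        guard.get(&id).cloned()          // clone the Arc; the guard drops at end of scope
    }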
     942              :     /// Whether the `TenantManager` is responsible for the tenant shard
     943            0 :     pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
     944            0 :         let locked = self.tenants.read().unwrap();
     945            0 : 
     946            0 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
     947            0 :             .ok()
     948            0 :             .flatten();
     949            0 : 
     950            0 :         peek_slot.is_some()
     951            0 :     }
     952              : 
     953            0 :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
     954              :     pub(crate) async fn upsert_location(
     955              :         &self,
     956              :         tenant_shard_id: TenantShardId,
     957              :         new_location_config: LocationConf,
     958              :         flush: Option<Duration>,
     959              :         mut spawn_mode: SpawnMode,
     960              :         ctx: &RequestContext,
     961              :     ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
     962              :         debug_assert_current_span_has_tenant_id();
     963            0 :         info!("configuring tenant location to state {new_location_config:?}");
     964              : 
     965              :         enum FastPathModified {
     966              :             Attached(Arc<Tenant>),
     967              :             Secondary(Arc<SecondaryTenant>),
     968              :         }
     969              : 
     970              :         // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
     971              :         // then we do not need to set the slot to InProgress, we can just call into the
      972              :         // then we do not need to set the slot to InProgress; we can just call into the
      973              :         // existing tenant.
     974              :             let locked = self.tenants.read().unwrap();
     975              :             let peek_slot =
     976              :                 tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
     977              :             match (&new_location_config.mode, peek_slot) {
     978              :                 (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
     979              :                     match attach_conf.generation.cmp(&tenant.generation) {
     980              :                         Ordering::Equal => {
     981              :                             // A transition from Attached to Attached in the same generation, we may
     982              :                             // take our fast path and just provide the updated configuration
     983              :                             // to the tenant.
     984              :                             tenant.set_new_location_config(
     985              :                                 AttachedTenantConf::try_from(new_location_config.clone())
     986              :                                     .map_err(UpsertLocationError::BadRequest)?,
     987              :                             );
     988              : 
     989              :                             Some(FastPathModified::Attached(tenant.clone()))
     990              :                         }
     991              :                         Ordering::Less => {
     992              :                             return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
     993              :                                 "Generation {:?} is less than existing {:?}",
     994              :                                 attach_conf.generation,
     995              :                                 tenant.generation
     996              :                             )));
     997              :                         }
     998              :                         Ordering::Greater => {
     999              :                             // Generation advanced, fall through to general case of replacing `Tenant` object
    1000              :                             None
    1001              :                         }
    1002              :                     }
    1003              :                 }
    1004              :                 (
    1005              :                     LocationMode::Secondary(secondary_conf),
    1006              :                     Some(TenantSlot::Secondary(secondary_tenant)),
    1007              :                 ) => {
    1008              :                     secondary_tenant.set_config(secondary_conf);
    1009              :                     secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
    1010              :                     Some(FastPathModified::Secondary(secondary_tenant.clone()))
    1011              :                 }
    1012              :                 _ => {
    1013              :                     // Not an Attached->Attached transition, fall through to general case
    1014              :                     None
    1015              :                 }
    1016              :             }
    1017              :         };
    1018              : 
    1019              :         // Fast-path continued: having dropped out of the self.tenants lock, do the async
    1020              :         // phase of writing config and/or waiting for flush, before returning.
    1021              :         match fast_path_taken {
    1022              :             Some(FastPathModified::Attached(tenant)) => {
    1023              :                 Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
    1024              :                     .await?;
    1025              : 
    1026              :                 // Transition to AttachedStale means we may well hold a valid generation
    1027              :                 // still, and have been requested to go stale as part of a migration.  If
    1028              :                 // the caller set `flush`, then flush to remote storage.
    1029              :                 if let LocationMode::Attached(AttachedLocationConfig {
    1030              :                     generation: _,
    1031              :                     attach_mode: AttachmentMode::Stale,
    1032              :                 }) = &new_location_config.mode
    1033              :                 {
    1034              :                     if let Some(flush_timeout) = flush {
    1035              :                         match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
    1036              :                             Ok(Err(e)) => {
    1037              :                                 return Err(UpsertLocationError::Flush(e));
    1038              :                             }
    1039              :                             Ok(Ok(_)) => return Ok(Some(tenant)),
    1040              :                             Err(_) => {
    1041            0 :                                 tracing::warn!(
    1042            0 :                                 timeout_ms = flush_timeout.as_millis(),
    1043            0 :                                 "Timed out waiting for flush to remote storage, proceeding anyway."
    1044            0 :                             )
    1045              :                             }
    1046              :                         }
    1047              :                     }
    1048              :                 }
    1049              : 
    1050              :                 return Ok(Some(tenant));
    1051              :             }
    1052              :             Some(FastPathModified::Secondary(_secondary_tenant)) => {
    1053              :                 Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
    1054              :                     .await?;
    1055              : 
    1056              :                 return Ok(None);
    1057              :             }
    1058              :             None => {
     1059              :                 // Proceed with the general-case procedure, where we will shut down & remove any existing
     1060              :                 // slot contents and replace them with a fresh one
    1061              :             }
    1062              :         };
    1063              : 
    1064              :         // General case for upserts to TenantsMap, excluding the case above: we will substitute an
    1065              :         // InProgress value to the slot while we make whatever changes are required.  The state for
    1066              :         // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
    1067              :         // the state is ill-defined while we're in transition.  Transitions are async, but fast: we do
    1068              :         // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
    1069              :         let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
    1070            0 :             .map_err(|e| match e {
    1071              :                 TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => {
    1072            0 :                     unreachable!("Called with mode Any")
    1073              :                 }
    1074            0 :                 TenantSlotError::InProgress => UpsertLocationError::InProgress,
    1075            0 :                 TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
    1076            0 :             })?;
    1077              : 
    1078              :         match slot_guard.get_old_value() {
    1079              :             Some(TenantSlot::Attached(tenant)) => {
    1080              :                 // The case where we keep a Tenant alive was covered above in the special case
    1081              :                 // for Attached->Attached transitions in the same generation.  By this point,
    1082              :                 // if we see an attached tenant we know it will be discarded and should be
    1083              :                 // shut down.
    1084              :                 let (_guard, progress) = utils::completion::channel();
    1085              : 
    1086              :                 match tenant.get_attach_mode() {
    1087              :                     AttachmentMode::Single | AttachmentMode::Multi => {
    1088              :                         // Before we leave our state as the presumed holder of the latest generation,
    1089              :                         // flush any outstanding deletions to reduce the risk of leaking objects.
    1090              :                         self.resources.deletion_queue_client.flush_advisory()
    1091              :                     }
    1092              :                     AttachmentMode::Stale => {
     1093              :                         // If we're stale there's no point trying to flush deletions
    1094              :                     }
    1095              :                 };
    1096              : 
    1097            0 :                 info!("Shutting down attached tenant");
    1098              :                 match tenant.shutdown(progress, false).await {
    1099              :                     Ok(()) => {}
    1100              :                     Err(barrier) => {
    1101            0 :                         info!("Shutdown already in progress, waiting for it to complete");
    1102              :                         barrier.wait().await;
    1103              :                     }
    1104              :                 }
    1105              :                 slot_guard.drop_old_value().expect("We just shut it down");
    1106              : 
     1107              :                 // Edge case: we were called with SpawnMode::Create, but a Tenant already existed: the caller
     1108              :                 // thought they were creating a new tenant.  We must switch to
    1109              :                 // Normal mode so that when starting this Tenant we properly probe remote storage for timelines,
    1110              :                 // rather than assuming it to be empty.
    1111              :                 spawn_mode = SpawnMode::Normal;
    1112              :             }
    1113              :             Some(TenantSlot::Secondary(state)) => {
    1114            0 :                 info!("Shutting down secondary tenant");
    1115              :                 state.shutdown().await;
    1116              :             }
    1117              :             Some(TenantSlot::InProgress(_)) => {
    1118              :                 // This should never happen: acquire_slot should error out
    1119              :                 // if the contents of a slot were InProgress.
    1120              :                 return Err(UpsertLocationError::Other(anyhow::anyhow!(
    1121              :                     "Acquired an InProgress slot, this is a bug."
    1122              :                 )));
    1123              :             }
    1124              :             None => {
    1125              :                 // Slot was vacant, nothing needs shutting down.
    1126              :             }
    1127              :         }
    1128              : 
    1129              :         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    1130              :         let timelines_path = self.conf.timelines_path(&tenant_shard_id);
    1131              : 
    1132              :         // Directory structure is the same for attached and secondary modes:
    1133              :         // create it if it doesn't exist.  Timeline load/creation expects the
    1134              :         // timelines/ subdir to already exist.
    1135              :         //
    1136              :         // Does not need to be fsync'd because local storage is just a cache.
    1137              :         tokio::fs::create_dir_all(&timelines_path)
    1138              :             .await
    1139            0 :             .with_context(|| format!("Creating {timelines_path}"))?;
    1140              : 
    1141              :         // Before activating either secondary or attached mode, persist the
    1142              :         // configuration, so that on restart we will re-attach (or re-start
    1143              :         // secondary) on the tenant.
    1144              :         Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
    1145              : 
    1146              :         let new_slot = match &new_location_config.mode {
    1147              :             LocationMode::Secondary(secondary_config) => {
    1148              :                 let shard_identity = new_location_config.shard;
    1149              :                 TenantSlot::Secondary(SecondaryTenant::new(
    1150              :                     tenant_shard_id,
    1151              :                     shard_identity,
    1152              :                     new_location_config.tenant_conf,
    1153              :                     secondary_config,
    1154              :                 ))
    1155              :             }
    1156              :             LocationMode::Attached(_attach_config) => {
    1157              :                 let shard_identity = new_location_config.shard;
    1158              : 
    1159              :                 // Testing hack: if we are configured with no control plane, then drop the generation
    1160              :                 // from upserts.  This enables creating generation-less tenants even though neon_local
    1161              :                 // always uses generations when calling the location conf API.
    1162              :                 let attached_conf = if cfg!(feature = "testing") {
    1163              :                     let mut conf = AttachedTenantConf::try_from(new_location_config)?;
    1164              :                     if self.conf.control_plane_api.is_none() {
    1165              :                         conf.location.generation = Generation::none();
    1166              :                     }
    1167              :                     conf
    1168              :                 } else {
    1169              :                     AttachedTenantConf::try_from(new_location_config)?
    1170              :                 };
    1171              : 
    1172              :                 let tenant = tenant_spawn(
    1173              :                     self.conf,
    1174              :                     tenant_shard_id,
    1175              :                     &tenant_path,
    1176              :                     self.resources.clone(),
    1177              :                     attached_conf,
    1178              :                     shard_identity,
    1179              :                     None,
    1180              :                     self.tenants,
    1181              :                     spawn_mode,
    1182              :                     ctx,
    1183              :                 )?;
    1184              : 
    1185              :                 TenantSlot::Attached(tenant)
    1186              :             }
    1187              :         };
    1188              : 
    1189              :         let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
    1190              :             Some(tenant.clone())
    1191              :         } else {
    1192              :             None
    1193              :         };
    1194              : 
    1195              :         match slot_guard.upsert(new_slot) {
    1196              :             Err(TenantSlotUpsertError::InternalError(e)) => {
    1197              :                 Err(UpsertLocationError::Other(anyhow::anyhow!(e)))
    1198              :             }
    1199              :             Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
    1200              :             Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
    1201              :                 // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
    1202              :                 // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
     1203              :                 // are shut down.
    1204              :                 //
    1205              :                 // We must shut it down inline here.
    1206              :                 match new_slot {
    1207              :                     TenantSlot::InProgress(_) => {
    1208              :                         // Unreachable because we never insert an InProgress
    1209              :                         unreachable!()
    1210              :                     }
    1211              :                     TenantSlot::Attached(tenant) => {
    1212              :                         let (_guard, progress) = utils::completion::channel();
    1213            0 :                         info!("Shutting down just-spawned tenant, because tenant manager is shut down");
    1214              :                         match tenant.shutdown(progress, false).await {
    1215              :                             Ok(()) => {
    1216            0 :                                 info!("Finished shutting down just-spawned tenant");
    1217              :                             }
    1218              :                             Err(barrier) => {
    1219            0 :                                 info!("Shutdown already in progress, waiting for it to complete");
    1220              :                                 barrier.wait().await;
    1221              :                             }
    1222              :                         }
    1223              :                     }
    1224              :                     TenantSlot::Secondary(secondary_tenant) => {
    1225              :                         secondary_tenant.shutdown().await;
    1226              :                     }
    1227              :                 }
    1228              : 
    1229              :                 Err(UpsertLocationError::Unavailable(
    1230              :                     TenantMapError::ShuttingDown,
    1231              :                 ))
    1232              :             }
    1233              :             Ok(()) => Ok(attached_tenant),
    1234              :         }
    1235              :     }
    1236              : 
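The fast path above hinges on comparing the requested generation with the tenant's current one: equal generations allow an in-place config update, an older generation is rejected, and a newer one forces the full replace path. A standalone sketch of that decision (a simplified, hypothetical Generation wrapper; the real type used above is richer):

    use std::cmp::Ordering;

    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct Generation(u32);

    enum Decision {
        FastPathUpdateConfig, // same generation: just hand the tenant its new config
        Reject,               // older generation: never move backwards
        FullReplace,          // newer generation: shut down and respawn the Tenant
    }

    fn decide(requested: Generation, current: Generation) -> Decision {
        match requested.cmp(&current) {
            Ordering::Equal => Decision::FastPathUpdateConfig,
            Ordering::Less => Decision::Reject,
            Ordering::Greater => Decision::FullReplace,
        }
    }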
    1237              :     /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
    1238              :     /// LocationConf that was last used to attach it.  Optionally, the local file cache may be
    1239              :     /// dropped before re-attaching.
    1240              :     ///
    1241              :     /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
    1242              :     /// where an issue is identified that would go away with a restart of the tenant.
    1243              :     ///
    1244              :     /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
    1245              :     /// to respect the cancellation tokens used in normal shutdown().
    1246            0 :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
    1247              :     pub(crate) async fn reset_tenant(
    1248              :         &self,
    1249              :         tenant_shard_id: TenantShardId,
    1250              :         drop_cache: bool,
    1251              :         ctx: &RequestContext,
    1252              :     ) -> anyhow::Result<()> {
    1253              :         let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
    1254              :         let Some(old_slot) = slot_guard.get_old_value() else {
    1255              :             anyhow::bail!("Tenant not found when trying to reset");
    1256              :         };
    1257              : 
    1258              :         let Some(tenant) = old_slot.get_attached() else {
    1259              :             slot_guard.revert();
    1260              :             anyhow::bail!("Tenant is not in attached state");
    1261              :         };
    1262              : 
    1263              :         let (_guard, progress) = utils::completion::channel();
    1264              :         match tenant.shutdown(progress, false).await {
    1265              :             Ok(()) => {
    1266              :                 slot_guard.drop_old_value()?;
    1267              :             }
    1268              :             Err(_barrier) => {
    1269              :                 slot_guard.revert();
    1270              :                 anyhow::bail!("Cannot reset Tenant, already shutting down");
    1271              :             }
    1272              :         }
    1273              : 
    1274              :         let tenant_path = self.conf.tenant_path(&tenant_shard_id);
    1275              :         let timelines_path = self.conf.timelines_path(&tenant_shard_id);
    1276              :         let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
    1277              : 
    1278              :         if drop_cache {
    1279            0 :             tracing::info!("Dropping local file cache");
    1280              : 
    1281              :             match tokio::fs::read_dir(&timelines_path).await {
    1282              :                 Err(e) => {
    1283            0 :                     tracing::warn!("Failed to list timelines while dropping cache: {}", e);
    1284              :                 }
    1285              :                 Ok(mut entries) => {
    1286              :                     while let Some(entry) = entries.next_entry().await? {
    1287              :                         tokio::fs::remove_dir_all(entry.path()).await?;
    1288              :                     }
    1289              :                 }
    1290              :             }
    1291              :         }
    1292              : 
    1293              :         let shard_identity = config.shard;
    1294              :         let tenant = tenant_spawn(
    1295              :             self.conf,
    1296              :             tenant_shard_id,
    1297              :             &tenant_path,
    1298              :             self.resources.clone(),
    1299              :             AttachedTenantConf::try_from(config)?,
    1300              :             shard_identity,
    1301              :             None,
    1302              :             self.tenants,
    1303              :             SpawnMode::Normal,
    1304              :             ctx,
    1305              :         )?;
    1306              : 
    1307              :         slot_guard.upsert(TenantSlot::Attached(tenant))?;
    1308              : 
    1309              :         Ok(())
    1310              :     }
    1311              : 
    1312            0 :     pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
    1313            0 :         let locked = self.tenants.read().unwrap();
    1314            0 :         match &*locked {
    1315            0 :             TenantsMap::Initializing => Vec::new(),
    1316            0 :             TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
    1317            0 :                 .values()
    1318            0 :                 .filter_map(|slot| {
    1319            0 :                     slot.get_attached()
    1320            0 :                         .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
    1321            0 :                 })
    1322            0 :                 .collect(),
    1323              :         }
    1324            0 :     }
    1325              :     // Do some synchronous work for all tenant slots in Secondary state.  The provided
    1326              :     // callback should be small and fast, as it will be called inside the global
    1327              :     // TenantsMap lock.
    1328            0 :     pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
    1329            0 :     where
    1330            0 :         // TODO: let the callback return a hint to drop out of the loop early
    1331            0 :         F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
    1332            0 :     {
    1333            0 :         let locked = self.tenants.read().unwrap();
    1334              : 
    1335            0 :         let map = match &*locked {
    1336            0 :             TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
    1337            0 :             TenantsMap::Open(m) => m,
    1338              :         };
    1339              : 
    1340            0 :         for (tenant_id, slot) in map {
    1341            0 :             if let TenantSlot::Secondary(state) = slot {
    1342              :                 // Only expose secondary tenants that are not currently shutting down
    1343            0 :                 if !state.cancel.is_cancelled() {
    1344            0 :                     func(tenant_id, state)
    1345            0 :                 }
    1346            0 :             }
    1347              :         }
    1348            0 :     }
    1349              : 
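Because the callback runs while the global TenantsMap lock is held, callers should only copy out what they need and do the heavy lifting afterwards. A hypothetical caller sketch:

    fn list_secondary_ids(mgr: &TenantManager) -> Vec<TenantShardId> {
        let mut ids = Vec::new();
        // Cheap under the lock: just copy the shard ids.
        mgr.foreach_secondary_tenants(|tenant_shard_id, _state| ids.push(*tenant_shard_id));
        // Any per-tenant work (downloads, RPCs, ...) happens here, outside the lock.
        ids
    }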
    1350              :     /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
    1351            0 :     pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
    1352            0 :         let locked = self.tenants.read().unwrap();
    1353            0 :         match &*locked {
    1354            0 :             TenantsMap::Initializing => Vec::new(),
    1355            0 :             TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
    1356            0 :                 map.iter().map(|(k, v)| (*k, v.clone())).collect()
    1357              :             }
    1358              :         }
    1359            0 :     }
    1360              : 
    1361            0 :     pub(crate) async fn delete_tenant(
    1362            0 :         &self,
    1363            0 :         tenant_shard_id: TenantShardId,
    1364            0 :         activation_timeout: Duration,
    1365            0 :     ) -> Result<(), DeleteTenantError> {
    1366            0 :         super::span::debug_assert_current_span_has_tenant_id();
    1367              :         // We acquire a SlotGuard during this function to protect against concurrent
    1368              :         // changes while the ::prepare phase of DeleteTenantFlow executes, but then
    1369              :         // have to return the Tenant to the map while the background deletion runs.
    1370              :         //
    1371              :         // TODO: refactor deletion to happen outside the lifetime of a Tenant.
    1372              :         // Currently, deletion requires a reference to the tenants map in order to
    1373              :         // keep the Tenant in the map until deletion is complete, and then remove
    1374              :         // it at the end.
    1375              :         //
    1376              :         // See https://github.com/neondatabase/neon/issues/5080
    1377              : 
    1378            0 :         let slot_guard =
    1379            0 :             tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
    1380              : 
    1381              :         // unwrap is safe because we used MustExist mode when acquiring
    1382            0 :         let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
    1383            0 :             TenantSlot::Attached(tenant) => tenant.clone(),
    1384              :             _ => {
    1385              :                 // Express "not attached" as equivalent to "not found"
    1386            0 :                 return Err(DeleteTenantError::NotAttached);
    1387              :             }
    1388              :         };
    1389              : 
    1390            0 :         match tenant.current_state() {
    1391            0 :             TenantState::Broken { .. } | TenantState::Stopping { .. } => {
    1392            0 :                 // If a tenant is broken or stopping, DeleteTenantFlow can
    1393            0 :                 // handle it: broken tenants proceed to delete, stopping tenants
    1394            0 :                 // are checked for deletion already in progress.
    1395            0 :             }
    1396              :             _ => {
    1397            0 :                 tenant
    1398            0 :                     .wait_to_become_active(activation_timeout)
    1399            0 :                     .await
    1400            0 :                     .map_err(|e| match e {
    1401              :                         GetActiveTenantError::WillNotBecomeActive(_) => {
    1402            0 :                             DeleteTenantError::InvalidState(tenant.current_state())
    1403              :                         }
    1404            0 :                         GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
    1405            0 :                         GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
    1406              :                         GetActiveTenantError::WaitForActiveTimeout {
    1407            0 :                             latest_state: _latest_state,
    1408            0 :                             wait_time: _wait_time,
    1409            0 :                         } => DeleteTenantError::InvalidState(tenant.current_state()),
    1410            0 :                     })?;
    1411              :             }
    1412              :         }
    1413              : 
    1414            0 :         let result = DeleteTenantFlow::run(
    1415            0 :             self.conf,
    1416            0 :             self.resources.remote_storage.clone(),
    1417            0 :             &TENANTS,
    1418            0 :             tenant,
    1419            0 :         )
    1420            0 :         .await;
    1421              : 
     1422              :         // The Tenant goes back into the map in Stopping state; it will eventually be removed by DeleteTenantFlow.
    1423            0 :         slot_guard.revert();
    1424            0 :         result
    1425            0 :     }
    1426              : 
    1427            0 :     #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.literal()))]
    1428              :     pub(crate) async fn shard_split(
    1429              :         &self,
    1430              :         tenant_shard_id: TenantShardId,
    1431              :         new_shard_count: ShardCount,
    1432              :         ctx: &RequestContext,
    1433              :     ) -> anyhow::Result<Vec<TenantShardId>> {
    1434              :         let tenant = get_tenant(tenant_shard_id, true)?;
    1435              : 
    1436              :         // Plan: identify what the new child shards will be
    1437              :         if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
    1438              :             anyhow::bail!("Requested shard count is not an increase");
    1439              :         }
    1440              :         let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
    1441              :         if !expansion_factor.is_power_of_two() {
    1442              :             anyhow::bail!("Requested split is not a power of two");
    1443              :         }
    1444              : 
    1445              :         let parent_shard_identity = tenant.shard_identity;
    1446              :         let parent_tenant_conf = tenant.get_tenant_conf();
    1447              :         let parent_generation = tenant.generation;
    1448              : 
    1449              :         let child_shards = tenant_shard_id.split(new_shard_count);
    1450            0 :         tracing::info!(
    1451            0 :             "Shard {} splits into: {}",
    1452            0 :             tenant_shard_id.to_index(),
    1453            0 :             child_shards
    1454            0 :                 .iter()
    1455            0 :                 .map(|id| format!("{}", id.to_index()))
    1456            0 :                 .join(",")
    1457            0 :         );
    1458              : 
    1459              :         // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
    1460              :         if let Err(e) = tenant.split_prepare(&child_shards).await {
    1461              :             // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
    1462              :             // have been left in a partially-shut-down state.
    1463            0 :             tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
    1464              :             self.reset_tenant(tenant_shard_id, false, ctx).await?;
    1465              :             return Err(e);
    1466              :         }
    1467              : 
    1468              :         self.resources.deletion_queue_client.flush_advisory();
    1469              : 
    1470              :         // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
    1471              :         drop(tenant);
    1472              :         let mut parent_slot_guard =
    1473              :             tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
    1474              :         let parent = match parent_slot_guard.get_old_value() {
    1475              :             Some(TenantSlot::Attached(t)) => t,
    1476              :             Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
    1477              :             Some(TenantSlot::InProgress(_)) => {
     1478              :                 // tenant_map_acquire_slot never returns InProgress; if a slot was InProgress
    1479              :                 // it would return an error.
    1480              :                 unreachable!()
    1481              :             }
    1482              :             None => {
    1483              :                 // We don't actually need the parent shard to still be attached to do our work, but it's
    1484              :                 // a weird enough situation that the caller probably didn't want us to continue working
    1485              :                 // if they had detached the tenant they requested the split on.
    1486              :                 anyhow::bail!("Detached parent shard in the middle of split!")
    1487              :             }
    1488              :         };
    1489              : 
    1490              :         // Optimization: hardlink layers from the parent into the children, so that they don't have to
    1491              :         // re-download & duplicate the data referenced in their initial IndexPart
    1492              :         self.shard_split_hardlink(parent, child_shards.clone())
    1493              :             .await?;
    1494              : 
    1495              :         // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
    1496              :         // child shards to reach this point.
    1497              :         let mut target_lsns = HashMap::new();
    1498              :         for timeline in parent.timelines.lock().unwrap().clone().values() {
    1499              :             target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
    1500              :         }
    1501              : 
     1502              :         // TODO: we should have the parent shard stop its WAL ingest here; it's a waste of resources
    1503              :         // and could slow down the children trying to catch up.
    1504              : 
    1505              :         // Phase 3: Spawn the child shards
    1506              :         for child_shard in &child_shards {
    1507              :             let mut child_shard_identity = parent_shard_identity;
    1508              :             child_shard_identity.count = child_shard.shard_count;
    1509              :             child_shard_identity.number = child_shard.shard_number;
    1510              : 
    1511              :             let child_location_conf = LocationConf {
    1512              :                 mode: LocationMode::Attached(AttachedLocationConfig {
    1513              :                     generation: parent_generation,
    1514              :                     attach_mode: AttachmentMode::Single,
    1515              :                 }),
    1516              :                 shard: child_shard_identity,
    1517              :                 tenant_conf: parent_tenant_conf.clone(),
    1518              :             };
    1519              : 
    1520              :             self.upsert_location(
    1521              :                 *child_shard,
    1522              :                 child_location_conf,
    1523              :                 None,
    1524              :                 SpawnMode::Normal,
    1525              :                 ctx,
    1526              :             )
    1527              :             .await?;
    1528              :         }
    1529              : 
     1530              :         // Phase 4: wait for the child shards' WAL ingest to catch up to the target LSNs
    1531              :         for child_shard_id in &child_shards {
    1532              :             let child_shard_id = *child_shard_id;
    1533              :             let child_shard = {
    1534              :                 let locked = TENANTS.read().unwrap();
    1535              :                 let peek_slot =
    1536              :                     tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
    1537            0 :                 peek_slot.and_then(|s| s.get_attached()).cloned()
    1538              :             };
    1539              :             if let Some(t) = child_shard {
    1540              :                 // Wait for the child shard to become active: this should be very quick because it only
    1541              :                 // has to download the index_part that we just uploaded when creating it.
    1542              :                 if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
    1543              :                     // This is not fatal: we have durably created the child shard.  It just makes the
     1544              :                     // split operation less seamless for clients, as we may detach the parent
    1545              :                     // shard before the child shards are fully ready to serve requests.
    1546            0 :                     tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
    1547              :                     continue;
    1548              :                 }
    1549              : 
    1550              :                 let timelines = t.timelines.lock().unwrap().clone();
    1551              :                 for timeline in timelines.values() {
    1552              :                     let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
    1553              :                         continue;
    1554              :                     };
    1555              : 
    1556            0 :                     tracing::info!(
    1557            0 :                         "Waiting for child shard {}/{} to reach target lsn {}...",
    1558            0 :                         child_shard_id,
    1559            0 :                         timeline.timeline_id,
    1560            0 :                         target_lsn
    1561            0 :                     );
    1562              :                     if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
     1563              :                         // Failure here might mean shutdown; in any case this part is an optimization
    1564              :                         // and we shouldn't hold up the split operation.
    1565            0 :                         tracing::warn!(
    1566            0 :                             "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
    1567            0 :                             timeline.timeline_id
    1568            0 :                         );
    1569              :                     } else {
    1570            0 :                         tracing::info!(
    1571            0 :                             "Child shard {}/{} reached target lsn {}",
    1572            0 :                             child_shard_id,
    1573            0 :                             timeline.timeline_id,
    1574            0 :                             target_lsn
    1575            0 :                         );
    1576              :                     }
    1577              :                 }
    1578              :             }
    1579              :         }
    1580              : 
    1581              :         // Phase 5: Shut down the parent shard, and erase it from disk
    1582              :         let (_guard, progress) = completion::channel();
    1583              :         match parent.shutdown(progress, false).await {
    1584              :             Ok(()) => {}
    1585              :             Err(other) => {
    1586              :                 other.wait().await;
    1587              :             }
    1588              :         }
    1589              :         let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
    1590              :         let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
    1591              :             .await
    1592            0 :             .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
    1593              :         task_mgr::spawn(
    1594              :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1595              :             TaskKind::MgmtRequest,
    1596              :             None,
    1597              :             None,
    1598              :             "tenant_files_delete",
    1599              :             false,
    1600            0 :             async move {
    1601            0 :                 fs::remove_dir_all(tmp_path.as_path())
    1602            0 :                     .await
    1603            0 :                     .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
    1604            0 :             },
    1605              :         );
    1606              : 
    1607              :         parent_slot_guard.drop_old_value()?;
    1608              : 
    1609              :         // Phase 6: Release the InProgress on the parent shard
    1610              :         drop(parent_slot_guard);
    1611              : 
    1612              :         Ok(child_shards)
    1613              :     }
    1614              : 
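The validation at the top of shard_split requires a strict increase in shard count and a power-of-two expansion factor: for example, going from 2 to 8 shards expands by 4 and is accepted, while 2 to 6 would expand by 3 and is rejected. A standalone sketch of just that check (not part of mgr.rs):

    fn validate_split(old_count: u8, new_count: u8) -> anyhow::Result<u8> {
        if new_count <= old_count {
            anyhow::bail!("Requested shard count is not an increase");
        }
        let expansion_factor = new_count / old_count;
        if !expansion_factor.is_power_of_two() {
            anyhow::bail!("Requested split is not a power of two");
        }
        Ok(expansion_factor)
    }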
    1615              :     /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
    1616              :     /// to avoid the children downloading them again.
    1617              :     ///
    1618              :     /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
    1619            0 :     async fn shard_split_hardlink(
    1620            0 :         &self,
    1621            0 :         parent_shard: &Tenant,
    1622            0 :         child_shards: Vec<TenantShardId>,
    1623            0 :     ) -> anyhow::Result<()> {
    1624            0 :         debug_assert_current_span_has_tenant_id();
    1625            0 : 
    1626            0 :         let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
    1627            0 :         let (parent_timelines, parent_layers) = {
    1628            0 :             let mut parent_layers = Vec::new();
    1629            0 :             let timelines = parent_shard.timelines.lock().unwrap().clone();
    1630            0 :             let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
    1631            0 :             for timeline in timelines.values() {
    1632            0 :                 let timeline_layers = timeline
    1633            0 :                     .layers
    1634            0 :                     .read()
    1635            0 :                     .await
    1636            0 :                     .resident_layers()
    1637            0 :                     .collect::<Vec<_>>()
    1638            0 :                     .await;
    1639            0 :                 for layer in timeline_layers {
    1640            0 :                     let relative_path = layer
    1641            0 :                         .local_path()
    1642            0 :                         .strip_prefix(&parent_path)
    1643            0 :                         .context("Removing prefix from parent layer path")?;
    1644            0 :                     parent_layers.push(relative_path.to_owned());
    1645              :                 }
    1646              :             }
    1647              :             debug_assert!(
    1648            0 :                 !parent_layers.is_empty(),
    1649            0 :                 "shutdown cannot empty the layermap"
    1650              :             );
    1651            0 :             (parent_timelines, parent_layers)
    1652            0 :         };
    1653            0 : 
    1654            0 :         let mut child_prefixes = Vec::new();
    1655            0 :         let mut create_dirs = Vec::new();
    1656              : 
    1657            0 :         for child in child_shards {
    1658            0 :             let child_prefix = self.conf.tenant_path(&child);
    1659            0 :             create_dirs.push(child_prefix.clone());
    1660            0 :             create_dirs.extend(
    1661            0 :                 parent_timelines
    1662            0 :                     .iter()
    1663            0 :                     .map(|t| self.conf.timeline_path(&child, t)),
    1664            0 :             );
    1665            0 : 
    1666            0 :             child_prefixes.push(child_prefix);
    1667            0 :         }
    1668              : 
    1669              :         // Since we will do a large number of small filesystem metadata operations, batch them into
    1670              :         // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
    1671            0 :         let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
    1672            0 :             for dir in &create_dirs {
    1673            0 :                 if let Err(e) = std::fs::create_dir_all(dir) {
    1674              :                     // Ignore AlreadyExists errors, drop out on all other errors
    1675            0 :                     match e.kind() {
    1676            0 :                         std::io::ErrorKind::AlreadyExists => {}
    1677              :                         _ => {
    1678            0 :                             return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
    1679              :                         }
    1680              :                     }
    1681            0 :                 }
    1682              :             }
    1683              : 
    1684            0 :             for child_prefix in child_prefixes {
    1685            0 :                 for relative_layer in &parent_layers {
    1686            0 :                     let parent_path = parent_path.join(relative_layer);
    1687            0 :                     let child_path = child_prefix.join(relative_layer);
    1688            0 :                     if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
    1689            0 :                         match e.kind() {
    1690            0 :                             std::io::ErrorKind::AlreadyExists => {}
    1691              :                             std::io::ErrorKind::NotFound => {
    1692            0 :                                 tracing::info!(
    1693            0 :                                     "Layer {} not found during hard-linking, evicted during split?",
    1694            0 :                                     relative_layer
    1695            0 :                                 );
    1696              :                             }
    1697              :                             _ => {
    1698            0 :                                 return Err(anyhow::anyhow!(e).context(format!(
    1699            0 :                                     "Hard linking {relative_layer} into {child_prefix}"
    1700            0 :                                 )))
    1701              :                             }
    1702              :                         }
    1703            0 :                     }
    1704              :                 }
    1705              :             }
    1706              : 
    1707              :             // Durability is not required for correctness, but if we crashed during split and
     1708              :             // then restarted with empty timeline dirs, it would be very inefficient to
    1709              :             // re-populate from remote storage.
    1710            0 :             for dir in create_dirs {
    1711            0 :                 if let Err(e) = crashsafe::fsync(&dir) {
    1712              :                     // Something removed a newly created timeline dir out from underneath us?  Extremely
    1713              :                     // unexpected, but not worth panic'ing over as this whole function is just an
    1714              :                     // optimization.
    1715            0 :                     tracing::warn!("Failed to fsync directory {dir}: {e}")
    1716            0 :                 }
    1717              :             }
    1718              : 
    1719            0 :             Ok(parent_layers.len())
    1720            0 :         });
    1721            0 : 
    1722            0 :         match jh.await {
    1723            0 :             Ok(Ok(layer_count)) => {
    1724            0 :                 tracing::info!(count = layer_count, "Hard linked layers into child shards");
    1725              :             }
    1726            0 :             Ok(Err(e)) => {
    1727              :                 // This is an optimization, so we tolerate failure.
    1728            0 :                 tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
    1729              :             }
    1730            0 :             Err(e) => {
    1731            0 :                 // This is something totally unexpected like a panic, so bail out.
    1732            0 :                 anyhow::bail!("Error joining hard linking task: {e}");
    1733              :             }
    1734              :         }
    1735              : 
    1736            0 :         Ok(())
    1737            0 :     }
    1738              : }
    1739              : 
    1740            0 : #[derive(Debug, thiserror::Error)]
    1741              : pub(crate) enum GetTenantError {
    1742              :     /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
    1743              :     /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
    1744              :     #[error("Tenant {0} not found")]
    1745              :     NotFound(TenantId),
    1746              : 
    1747              :     #[error("Tenant {0} is not active")]
    1748              :     NotActive(TenantShardId),
    1749              :     /// Broken is logically a subset of NotActive, but a distinct error is useful as
    1750              :     /// NotActive is usually a retryable state for API purposes, whereas Broken
    1751              :     /// is a stuck error state.
    1752              :     #[error("Tenant is broken: {0}")]
    1753              :     Broken(String),
    1754              : 
    1755              :     // Initializing or shutting down: cannot authoritatively say whether we have this tenant
    1756              :     #[error("Tenant map is not available: {0}")]
    1757              :     MapState(#[from] TenantMapError),
    1758              : }
    1759              : 
    1760              : /// Gets the tenant from the in-memory data, erroring if it is absent or does not fit the query.
    1761              : /// `active_only = true` restricts the query to tenants that are ready for operations, erroring on any other kind of tenant.
    1762              : ///
    1763              : /// This method is cancel-safe.
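                      : ///
                      : /// Illustrative usage (a sketch, not taken verbatim from a caller; `tenant_shard_id`
                      : /// is assumed to be resolved already by the caller):
                      : ///
                      : /// ```ignore
                      : /// // Require the tenant to be Active; any other state maps to a GetTenantError.
                      : /// let tenant = get_tenant(tenant_shard_id, true)?;
                      : /// ```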
    1764            0 : pub(crate) fn get_tenant(
    1765            0 :     tenant_shard_id: TenantShardId,
    1766            0 :     active_only: bool,
    1767            0 : ) -> Result<Arc<Tenant>, GetTenantError> {
    1768            0 :     let locked = TENANTS.read().unwrap();
    1769              : 
    1770            0 :     let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
    1771              : 
    1772            0 :     match peek_slot {
    1773            0 :         Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
    1774              :             TenantState::Broken {
    1775            0 :                 reason,
    1776            0 :                 backtrace: _,
    1777            0 :             } if active_only => Err(GetTenantError::Broken(reason)),
    1778            0 :             TenantState::Active => Ok(Arc::clone(tenant)),
    1779              :             _ => {
    1780            0 :                 if active_only {
    1781            0 :                     Err(GetTenantError::NotActive(tenant_shard_id))
    1782              :                 } else {
    1783            0 :                     Ok(Arc::clone(tenant))
    1784              :                 }
    1785              :             }
    1786              :         },
    1787            0 :         Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
    1788              :         None | Some(TenantSlot::Secondary(_)) => {
    1789            0 :             Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
    1790              :         }
    1791              :     }
    1792            0 : }
    1793              : 
    1794            0 : #[derive(thiserror::Error, Debug)]
    1795              : pub(crate) enum GetActiveTenantError {
    1796              :     /// We may time out either while TenantSlot is InProgress, or while the Tenant
    1797              :     /// is in a non-Active state
    1798              :     #[error(
    1799              :         "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
    1800              :     )]
    1801              :     WaitForActiveTimeout {
    1802              :         latest_state: Option<TenantState>,
    1803              :         wait_time: Duration,
    1804              :     },
    1805              : 
    1806              :     /// The TenantSlot is absent, or in secondary mode
    1807              :     #[error(transparent)]
    1808              :     NotFound(#[from] GetTenantError),
    1809              : 
    1810              :     /// Cancellation token fired while we were waiting
    1811              :     #[error("cancelled")]
    1812              :     Cancelled,
    1813              : 
    1814              :     /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
    1815              :     #[error("will not become active.  Current state: {0}")]
    1816              :     WillNotBecomeActive(TenantState),
    1817              : }
    1818              : 
    1819              : /// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
    1820              : /// state, then wait for up to `timeout`.  If the [`Tenant`] is not currently in [`TenantState::Active`],
    1821              : /// then wait for up to `timeout` (minus however long we waited for the slot).
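                      : ///
                      : /// Illustrative usage (a sketch; `shard_selector` stands in for whatever `ShardSelector`
                      : /// value the caller has, and `cancel` for the caller's `CancellationToken`):
                      : ///
                      : /// ```ignore
                      : /// let tenant = get_active_tenant_with_timeout(
                      : ///     tenant_id,
                      : ///     shard_selector,
                      : ///     ACTIVE_TENANT_TIMEOUT,
                      : ///     &cancel,
                      : /// )
                      : /// .await?;
                      : /// ```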
    1822            0 : pub(crate) async fn get_active_tenant_with_timeout(
    1823            0 :     tenant_id: TenantId,
    1824            0 :     shard_selector: ShardSelector,
    1825            0 :     timeout: Duration,
    1826            0 :     cancel: &CancellationToken,
    1827            0 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
    1828            0 :     enum WaitFor {
    1829            0 :         Barrier(utils::completion::Barrier),
    1830            0 :         Tenant(Arc<Tenant>),
    1831            0 :     }
    1832            0 : 
    1833            0 :     let wait_start = Instant::now();
    1834            0 :     let deadline = wait_start + timeout;
    1835              : 
    1836            0 :     let (wait_for, tenant_shard_id) = {
    1837            0 :         let locked = TENANTS.read().unwrap();
    1838              : 
    1839              :         // Resolve TenantId to TenantShardId
    1840            0 :         let tenant_shard_id = locked
    1841            0 :             .resolve_attached_shard(&tenant_id, shard_selector)
    1842            0 :             .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
    1843            0 :                 tenant_id,
    1844            0 :             )))?;
    1845              : 
    1846            0 :         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
    1847            0 :             .map_err(GetTenantError::MapState)?;
    1848            0 :         match peek_slot {
    1849            0 :             Some(TenantSlot::Attached(tenant)) => {
    1850            0 :                 match tenant.current_state() {
    1851              :                     TenantState::Active => {
    1852              :                         // Fast path: we don't need to do any async waiting.
    1853            0 :                         return Ok(tenant.clone());
    1854              :                     }
    1855              :                     _ => {
    1856            0 :                         tenant.activate_now();
    1857            0 :                         (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
    1858              :                     }
    1859              :                 }
    1860              :             }
    1861              :             Some(TenantSlot::Secondary(_)) => {
    1862            0 :                 return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
    1863            0 :                     tenant_shard_id,
    1864            0 :                 )))
    1865              :             }
    1866            0 :             Some(TenantSlot::InProgress(barrier)) => {
    1867            0 :                 (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
    1868              :             }
    1869              :             None => {
    1870            0 :                 return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
    1871            0 :                     tenant_id,
    1872            0 :                 )))
    1873              :             }
    1874              :         }
    1875              :     };
    1876              : 
    1877            0 :     let tenant = match wait_for {
    1878            0 :         WaitFor::Barrier(barrier) => {
    1879            0 :             tracing::debug!("Waiting for tenant InProgress state to pass...");
    1880            0 :             timeout_cancellable(
    1881            0 :                 deadline.duration_since(Instant::now()),
    1882            0 :                 cancel,
    1883            0 :                 barrier.wait(),
    1884            0 :             )
    1885            0 :             .await
    1886            0 :             .map_err(|e| match e {
    1887            0 :                 TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
    1888            0 :                     latest_state: None,
    1889            0 :                     wait_time: wait_start.elapsed(),
    1890            0 :                 },
    1891            0 :                 TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
    1892            0 :             })?;
    1893              :             {
    1894            0 :                 let locked = TENANTS.read().unwrap();
    1895            0 :                 let peek_slot =
    1896            0 :                     tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
    1897            0 :                         .map_err(GetTenantError::MapState)?;
    1898            0 :                 match peek_slot {
    1899            0 :                     Some(TenantSlot::Attached(tenant)) => tenant.clone(),
    1900              :                     _ => {
    1901            0 :                         return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
    1902            0 :                             tenant_shard_id,
    1903            0 :                         )))
    1904              :                     }
    1905              :                 }
    1906              :             }
    1907              :         }
    1908            0 :         WaitFor::Tenant(tenant) => tenant,
    1909              :     };
    1910              : 
    1911            0 :     tracing::debug!("Waiting for tenant to enter active state...");
    1912            0 :     tenant
    1913            0 :         .wait_to_become_active(deadline.duration_since(Instant::now()))
    1914            0 :         .await?;
    1915            0 :     Ok(tenant)
    1916            0 : }
    1917              : 
    1918            0 : #[derive(Debug, thiserror::Error)]
    1919              : pub(crate) enum DeleteTimelineError {
    1920              :     #[error("Tenant {0}")]
    1921              :     Tenant(#[from] GetTenantError),
    1922              : 
    1923              :     #[error("Timeline {0}")]
    1924              :     Timeline(#[from] crate::tenant::DeleteTimelineError),
    1925              : }
    1926              : 
    1927            0 : #[derive(Debug, thiserror::Error)]
    1928              : pub(crate) enum TenantStateError {
    1929              :     #[error("Tenant {0} is stopping")]
    1930              :     IsStopping(TenantShardId),
    1931              :     #[error(transparent)]
    1932              :     SlotError(#[from] TenantSlotError),
    1933              :     #[error(transparent)]
    1934              :     SlotUpsertError(#[from] TenantSlotUpsertError),
    1935              :     #[error(transparent)]
    1936              :     Other(#[from] anyhow::Error),
    1937              : }
    1938              : 
    1939            0 : pub(crate) async fn detach_tenant(
    1940            0 :     conf: &'static PageServerConf,
    1941            0 :     tenant_shard_id: TenantShardId,
    1942            0 :     detach_ignored: bool,
    1943            0 :     deletion_queue_client: &DeletionQueueClient,
    1944            0 : ) -> Result<(), TenantStateError> {
    1945            0 :     let tmp_path = detach_tenant0(
    1946            0 :         conf,
    1947            0 :         &TENANTS,
    1948            0 :         tenant_shard_id,
    1949            0 :         detach_ignored,
    1950            0 :         deletion_queue_client,
    1951            0 :     )
    1952            0 :     .await?;
    1953              :     // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
    1954              :     // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
    1955            0 :     let task_tenant_id = None;
    1956            0 :     task_mgr::spawn(
    1957            0 :         task_mgr::BACKGROUND_RUNTIME.handle(),
    1958            0 :         TaskKind::MgmtRequest,
    1959            0 :         task_tenant_id,
    1960            0 :         None,
    1961            0 :         "tenant_files_delete",
    1962            0 :         false,
    1963            0 :         async move {
    1964            0 :             fs::remove_dir_all(tmp_path.as_path())
    1965            0 :                 .await
    1966            0 :                 .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
    1967            0 :         },
    1968            0 :     );
    1969            0 :     Ok(())
    1970            0 : }
    1971              : 
    1972            0 : async fn detach_tenant0(
    1973            0 :     conf: &'static PageServerConf,
    1974            0 :     tenants: &std::sync::RwLock<TenantsMap>,
    1975            0 :     tenant_shard_id: TenantShardId,
    1976            0 :     detach_ignored: bool,
    1977            0 :     deletion_queue_client: &DeletionQueueClient,
    1978            0 : ) -> Result<Utf8PathBuf, TenantStateError> {
    1979            0 :     let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
    1980            0 :         let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
    1981            0 :         safe_rename_tenant_dir(&local_tenant_directory)
    1982            0 :             .await
    1983            0 :             .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
    1984            0 :     };
    1985              : 
    1986            0 :     let removal_result = remove_tenant_from_memory(
    1987            0 :         tenants,
    1988            0 :         tenant_shard_id,
    1989            0 :         tenant_dir_rename_operation(tenant_shard_id),
    1990            0 :     )
    1991            0 :     .await;
    1992              : 
    1993              :     // Flush pending deletions, so that they have a good chance of passing validation
    1994              :     // before this tenant is potentially re-attached elsewhere.
    1995            0 :     deletion_queue_client.flush_advisory();
    1996            0 : 
    1997            0 :     // Ignored tenants are not present in memory, so the removal-from-memory operation above will bail.
    1998            0 :     // Before returning that error, check for the ignored-tenant case: then we only need to clean up its local files.
    1999            0 :     if detach_ignored
    2000              :         && matches!(
    2001            0 :             removal_result,
    2002              :             Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
    2003              :         )
    2004              :     {
    2005            0 :         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    2006            0 :         if tenant_ignore_mark.exists() {
    2007            0 :             info!("Detaching an ignored tenant");
    2008            0 :             let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
    2009            0 :                 .await
    2010            0 :                 .with_context(|| {
    2011            0 :                     format!("Ignored tenant {tenant_shard_id} local directory rename")
    2012            0 :                 })?;
    2013            0 :             return Ok(tmp_path);
    2014            0 :         }
    2015            0 :     }
    2016              : 
    2017            0 :     removal_result
    2018            0 : }
    2019              : 
    2020            0 : pub(crate) async fn load_tenant(
    2021            0 :     conf: &'static PageServerConf,
    2022            0 :     tenant_id: TenantId,
    2023            0 :     generation: Generation,
    2024            0 :     broker_client: storage_broker::BrokerClientChannel,
    2025            0 :     remote_storage: Option<GenericRemoteStorage>,
    2026            0 :     deletion_queue_client: DeletionQueueClient,
    2027            0 :     ctx: &RequestContext,
    2028            0 : ) -> Result<(), TenantMapInsertError> {
    2029            0 :     // This is a legacy API (replaced by `/location_conf`).  It does not support sharding
    2030            0 :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    2031              : 
    2032            0 :     let slot_guard =
    2033            0 :         tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
    2034            0 :     let tenant_path = conf.tenant_path(&tenant_shard_id);
    2035            0 : 
    2036            0 :     let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    2037            0 :     if tenant_ignore_mark.exists() {
    2038            0 :         std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
    2039            0 :             format!(
    2040            0 :                 "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
    2041            0 :             )
    2042            0 :         })?;
    2043            0 :     }
    2044              : 
    2045            0 :     let resources = TenantSharedResources {
    2046            0 :         broker_client,
    2047            0 :         remote_storage,
    2048            0 :         deletion_queue_client,
    2049            0 :     };
    2050              : 
    2051            0 :     let mut location_conf =
    2052            0 :         Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
    2053            0 :     location_conf.attach_in_generation(generation);
    2054            0 : 
    2055            0 :     Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
    2056              : 
    2057            0 :     let shard_identity = location_conf.shard;
    2058            0 :     let new_tenant = tenant_spawn(
    2059            0 :         conf,
    2060            0 :         tenant_shard_id,
    2061            0 :         &tenant_path,
    2062            0 :         resources,
    2063            0 :         AttachedTenantConf::try_from(location_conf)?,
    2064            0 :         shard_identity,
    2065            0 :         None,
    2066            0 :         &TENANTS,
    2067            0 :         SpawnMode::Normal,
    2068            0 :         ctx,
    2069            0 :     )
    2070            0 :     .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
    2071              : 
    2072            0 :     slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
    2073            0 :     Ok(())
    2074            0 : }
    2075              : 
    2076            0 : pub(crate) async fn ignore_tenant(
    2077            0 :     conf: &'static PageServerConf,
    2078            0 :     tenant_id: TenantId,
    2079            0 : ) -> Result<(), TenantStateError> {
    2080            0 :     ignore_tenant0(conf, &TENANTS, tenant_id).await
    2081            0 : }
    2082              : 
    2083            0 : #[instrument(skip_all, fields(shard_id))]
    2084              : async fn ignore_tenant0(
    2085              :     conf: &'static PageServerConf,
    2086              :     tenants: &std::sync::RwLock<TenantsMap>,
    2087              :     tenant_id: TenantId,
    2088              : ) -> Result<(), TenantStateError> {
    2089              :     // This is a legacy API (replaced by `/location_conf`).  It does not support sharding
    2090              :     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    2091              :     tracing::Span::current().record(
    2092              :         "shard_id",
    2093              :         tracing::field::display(tenant_shard_id.shard_slug()),
    2094              :     );
    2095              : 
    2096            0 :     remove_tenant_from_memory(tenants, tenant_shard_id, async {
    2097            0 :         let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
    2098            0 :         fs::File::create(&ignore_mark_file)
    2099            0 :             .await
    2100            0 :             .context("Failed to create ignore mark file")
    2101            0 :             .and_then(|_| {
    2102            0 :                 crashsafe::fsync_file_and_parent(&ignore_mark_file)
    2103            0 :                     .context("Failed to fsync ignore mark file")
    2104            0 :             })
    2105            0 :             .with_context(|| format!("Failed to create ignore mark for tenant {tenant_shard_id}"))?;
    2106            0 :         Ok(())
    2107            0 :     })
    2108              :     .await
    2109              : }
    2110              : 
    2111            0 : #[derive(Debug, thiserror::Error)]
    2112              : pub(crate) enum TenantMapListError {
    2113              :     #[error("tenant map is still initializing")]
    2114              :     Initializing,
    2115              : }
    2116              : 
    2117              : ///
    2118              : /// Get list of tenants, for the mgmt API
    2119              : ///
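                      : /// Illustrative usage (a sketch):
                      : ///
                      : /// ```ignore
                      : /// for (tenant_shard_id, state, generation) in list_tenants().await? {
                      : ///     tracing::info!("{tenant_shard_id}: {state:?} ({generation:?})");
                      : /// }
                      : /// ```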
    2120            0 : pub(crate) async fn list_tenants(
    2121            0 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
    2122            0 :     let tenants = TENANTS.read().unwrap();
    2123            0 :     let m = match &*tenants {
    2124            0 :         TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
    2125            0 :         TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
    2126            0 :     };
    2127            0 :     Ok(m.iter()
    2128            0 :         .filter_map(|(id, tenant)| match tenant {
    2129            0 :             TenantSlot::Attached(tenant) => {
    2130            0 :                 Some((*id, tenant.current_state(), tenant.generation()))
    2131              :             }
    2132            0 :             TenantSlot::Secondary(_) => None,
    2133            0 :             TenantSlot::InProgress(_) => None,
    2134            0 :         })
    2135            0 :         .collect())
    2136            0 : }
    2137              : 
    2138            0 : #[derive(Debug, thiserror::Error)]
    2139              : pub(crate) enum TenantMapInsertError {
    2140              :     #[error(transparent)]
    2141              :     SlotError(#[from] TenantSlotError),
    2142              :     #[error(transparent)]
    2143              :     SlotUpsertError(#[from] TenantSlotUpsertError),
    2144              :     #[error(transparent)]
    2145              :     Other(#[from] anyhow::Error),
    2146              : }
    2147              : 
    2148              : /// Superset of TenantMapError: issues that can occur when acquiring a slot
    2149              : /// for a particular tenant ID.
    2150            0 : #[derive(Debug, thiserror::Error)]
    2151              : pub(crate) enum TenantSlotError {
    2152              :     /// When acquiring a slot with the expectation that the tenant already exists.
    2153              :     #[error("Tenant {0} not found")]
    2154              :     NotFound(TenantShardId),
    2155              : 
    2156              :     /// When acquiring a slot with the expectation that the tenant does not already exist.
    2157              :     #[error("tenant {0} already exists, state: {1:?}")]
    2158              :     AlreadyExists(TenantShardId, TenantState),
    2159              : 
    2160              :     // Tried to read a slot that is currently being mutated by another administrative
    2161              :     // operation.
    2162              :     #[error("tenant has a state change in progress, try again later")]
    2163              :     InProgress,
    2164              : 
    2165              :     #[error(transparent)]
    2166              :     MapState(#[from] TenantMapError),
    2167              : }
    2168              : 
    2169              : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
    2170              : /// to insert a new value.
    2171            0 : #[derive(thiserror::Error)]
    2172              : pub(crate) enum TenantSlotUpsertError {
    2173              :     /// An error where the slot is in an unexpected state, indicating a code bug
    2174              :     #[error("Internal error updating Tenant")]
    2175              :     InternalError(Cow<'static, str>),
    2176              : 
    2177              :     #[error(transparent)]
    2178              :     MapState(TenantMapError),
    2179              : 
    2180              :     // If we encounter TenantManager shutdown during upsert, we must carry the Completion
    2181              :     // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
    2182              :     // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
    2183              :     // was protected by the SlotGuard.
    2184              :     #[error("Shutting down")]
    2185              :     ShuttingDown((TenantSlot, utils::completion::Completion)),
    2186              : }
    2187              : 
    2188              : impl std::fmt::Debug for TenantSlotUpsertError {
    2189            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
    2190            0 :         match self {
    2191            0 :             Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
    2192            0 :             Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
    2193            0 :             Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
    2194              :         }
    2195            0 :     }
    2196              : }
    2197              : 
    2198            0 : #[derive(Debug, thiserror::Error)]
    2199              : enum TenantSlotDropError {
    2200              :     /// It is only legal to drop a TenantSlot if its contents are fully shut down
    2201              :     #[error("Tenant was not shut down")]
    2202              :     NotShutdown,
    2203              : }
    2204              : 
    2205              : /// Errors that can happen any time we are walking the tenant map to try and acquire
    2206              : /// the TenantSlot for a particular tenant.
    2207            0 : #[derive(Debug, thiserror::Error)]
    2208              : pub enum TenantMapError {
    2209              :     // Tried to read while initializing
    2210              :     #[error("tenant map is still initializing")]
    2211              :     StillInitializing,
    2212              : 
    2213              :     // Tried to read while shutting down
    2214              :     #[error("tenant map is shutting down")]
    2215              :     ShuttingDown,
    2216              : }
    2217              : 
    2218              : /// Guards a particular tenant_id's content in the TenantsMap.  While this
    2219              : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
    2220              : /// for this tenant, which acts as a marker for any operations targeting
    2221              : /// this tenant to retry later, or wait for the InProgress state to end.
    2222              : ///
    2223              : /// This structure enforces the important invariant that we do not have overlapping
    2224              : /// tasks that will try to use local storage for the same tenant ID: we enforce that
    2225              : /// the previous contents of a slot have been shut down before the slot can be
    2226              : /// left empty or used for something else.
    2227              : ///
    2228              : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
    2229              : /// to provide a new value, or `revert` to put the slot back into its initial
    2230              : /// state.  If the SlotGuard is dropped without calling either of these, then
    2231              : /// we will leave the slot empty if our `old_value` is already shut down, else
    2232              : /// we will replace the slot with `old_value` (equivalent to doing a revert).
    2233              : ///
    2234              : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
    2235              : /// `drop_old_value`.  It is an error to call this without shutting down
    2236              : /// the contents of `old_value`.
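                      : ///
                      : /// Illustrative lifecycle (a sketch; real callers such as `load_tenant` above follow
                      : /// this shape):
                      : ///
                      : /// ```ignore
                      : /// // Acquire: the map now holds TenantSlot::InProgress for this tenant.
                      : /// let slot_guard =
                      : ///     tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
                      : ///
                      : /// // ... build or shut down tenant state while holding the guard ...
                      : ///
                      : /// // Dispose: either install the new value, or call `revert()` to restore the old one.
                      : /// slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
                      : /// ```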
    2237              : pub struct SlotGuard {
    2238              :     tenant_shard_id: TenantShardId,
    2239              :     old_value: Option<TenantSlot>,
    2240              :     upserted: bool,
    2241              : 
    2242              :     /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
    2243              :     /// release any waiters as soon as this SlotGuard is dropped.
    2244              :     completion: utils::completion::Completion,
    2245              : }
    2246              : 
    2247              : impl SlotGuard {
    2248            2 :     fn new(
    2249            2 :         tenant_shard_id: TenantShardId,
    2250            2 :         old_value: Option<TenantSlot>,
    2251            2 :         completion: utils::completion::Completion,
    2252            2 :     ) -> Self {
    2253            2 :         Self {
    2254            2 :             tenant_shard_id,
    2255            2 :             old_value,
    2256            2 :             upserted: false,
    2257            2 :             completion,
    2258            2 :         }
    2259            2 :     }
    2260              : 
    2261              :     /// Get any value that was present in the slot before we acquired ownership
    2262              :     /// of it: in state transitions, this will be the old state.
    2263            2 :     fn get_old_value(&self) -> &Option<TenantSlot> {
    2264            2 :         &self.old_value
    2265            2 :     }
    2266              : 
    2267              :     /// Emplace a new value in the slot.  This consumes the guard, and after
    2268              :     /// returning, the slot is no longer protected from concurrent changes.
    2269            0 :     fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
    2270            0 :         if !self.old_value_is_shutdown() {
    2271              :             // This is a bug: callers should never try to drop an old value without
    2272              :             // shutting it down
    2273            0 :             return Err(TenantSlotUpsertError::InternalError(
    2274            0 :                 "Old TenantSlot value not shut down".into(),
    2275            0 :             ));
    2276            0 :         }
    2277              : 
    2278            0 :         let replaced = {
    2279            0 :             let mut locked = TENANTS.write().unwrap();
    2280            0 : 
    2281            0 :             if let TenantSlot::InProgress(_) = new_value {
    2282              :                 // It is never expected to try and upsert InProgress via this path: it should
    2283              :                 // only be written via the tenant_map_acquire_slot path.  If we hit this it's a bug.
    2284            0 :                 return Err(TenantSlotUpsertError::InternalError(
    2285            0 :                     "Attempt to upsert an InProgress state".into(),
    2286            0 :                 ));
    2287            0 :             }
    2288              : 
    2289            0 :             let m = match &mut *locked {
    2290              :                 TenantsMap::Initializing => {
    2291            0 :                     return Err(TenantSlotUpsertError::MapState(
    2292            0 :                         TenantMapError::StillInitializing,
    2293            0 :                     ))
    2294              :                 }
    2295              :                 TenantsMap::ShuttingDown(_) => {
    2296            0 :                     return Err(TenantSlotUpsertError::ShuttingDown((
    2297            0 :                         new_value,
    2298            0 :                         self.completion.clone(),
    2299            0 :                     )));
    2300              :                 }
    2301            0 :                 TenantsMap::Open(m) => m,
    2302            0 :             };
    2303            0 : 
    2304            0 :             let replaced = m.insert(self.tenant_shard_id, new_value);
    2305            0 :             self.upserted = true;
    2306            0 : 
    2307            0 :             METRICS.tenant_slots.set(m.len() as u64);
    2308            0 : 
    2309            0 :             replaced
    2310              :         };
    2311              : 
    2312              :         // Sanity check: on an upsert we should always be replacing an InProgress marker
    2313            0 :         match replaced {
    2314              :             Some(TenantSlot::InProgress(_)) => {
    2315              :                 // Expected case: we find our InProgress in the map: nothing should have
    2316              :                 // replaced it because the code that acquires slots will not grant another
    2317              :                 // one for the same TenantId.
    2318            0 :                 Ok(())
    2319              :             }
    2320              :             None => {
    2321            0 :                 METRICS.unexpected_errors.inc();
    2322            0 :                 error!(
    2323            0 :                     tenant_shard_id = %self.tenant_shard_id,
    2324            0 :                     "Missing InProgress marker during tenant upsert, this is a bug."
    2325            0 :                 );
    2326            0 :                 Err(TenantSlotUpsertError::InternalError(
    2327            0 :                     "Missing InProgress marker during tenant upsert".into(),
    2328            0 :                 ))
    2329              :             }
    2330            0 :             Some(slot) => {
    2331            0 :                 METRICS.unexpected_errors.inc();
    2332            0 :                 error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug.  Contents: {:?}", slot);
    2333            0 :                 Err(TenantSlotUpsertError::InternalError(
    2334            0 :                     "Unexpected contents of TenantSlot".into(),
    2335            0 :                 ))
    2336              :             }
    2337              :         }
    2338            0 :     }
    2339              : 
    2340              :     /// Replace the InProgress slot with whatever was in the guard when we started
    2341            0 :     fn revert(mut self) {
    2342            0 :         if let Some(value) = self.old_value.take() {
    2343            0 :             match self.upsert(value) {
    2344            0 :                 Err(TenantSlotUpsertError::InternalError(_)) => {
    2345            0 :                     // We already logged the error, nothing else we can do.
    2346            0 :                 }
    2347              :                 Err(
    2348              :                     TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
    2349            0 :                 ) => {
    2350            0 :                     // If the map is shutting down, we need not replace anything
    2351            0 :                 }
    2352            0 :                 Ok(()) => {}
    2353              :             }
    2354            0 :         }
    2355            0 :     }
    2356              : 
    2357              :     /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
    2358              :     /// rogue background tasks that would write to the local tenant directory that this guard
    2359              :     /// is responsible for protecting.
    2360            2 :     fn old_value_is_shutdown(&self) -> bool {
    2361            2 :         match self.old_value.as_ref() {
    2362            2 :             Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
    2363            0 :             Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
    2364              :             Some(TenantSlot::InProgress(_)) => {
    2365              :                 // A SlotGuard cannot be constructed for a slot that was already InProgress
    2366            0 :                 unreachable!()
    2367              :             }
    2368            0 :             None => true,
    2369              :         }
    2370            2 :     }
    2371              : 
    2372              :     /// The guard holder is done with the old value of the slot: they are obliged to already
    2373              :     /// shut it down before we reach this point.
    2374            2 :     fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
    2375            2 :         if !self.old_value_is_shutdown() {
    2376            0 :             Err(TenantSlotDropError::NotShutdown)
    2377              :         } else {
    2378            2 :             self.old_value.take();
    2379            2 :             Ok(())
    2380              :         }
    2381            2 :     }
    2382              : }
    2383              : 
    2384              : impl Drop for SlotGuard {
    2385            2 :     fn drop(&mut self) {
    2386            2 :         if self.upserted {
    2387            0 :             return;
    2388            2 :         }
    2389            2 :         // Our old value is already shutdown, or it never existed: it is safe
    2390            2 :         // for us to fully release the TenantSlot back into an empty state
    2391            2 : 
    2392            2 :         let mut locked = TENANTS.write().unwrap();
    2393              : 
    2394            2 :         let m = match &mut *locked {
    2395              :             TenantsMap::Initializing => {
    2396              :                 // There is no map, this should never happen.
    2397            2 :                 return;
    2398              :             }
    2399              :             TenantsMap::ShuttingDown(_) => {
    2400              :                 // When we transition to shutdown, InProgress elements are removed
    2401              :                 // from the map, so we do not need to clean up our InProgress marker.
    2402              :                 // See [`shutdown_all_tenants0`]
    2403            0 :                 return;
    2404              :             }
    2405            0 :             TenantsMap::Open(m) => m,
    2406            0 :         };
    2407            0 : 
    2408            0 :         use std::collections::btree_map::Entry;
    2409            0 :         match m.entry(self.tenant_shard_id) {
    2410            0 :             Entry::Occupied(mut entry) => {
    2411            0 :                 if !matches!(entry.get(), TenantSlot::InProgress(_)) {
    2412            0 :                     METRICS.unexpected_errors.inc();
    2413            0 :                     error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug.  Contents: {:?}", entry.get());
    2414            0 :                 }
    2415              : 
    2416            0 :                 if self.old_value_is_shutdown() {
    2417            0 :                     entry.remove();
    2418            0 :                 } else {
    2419            0 :                     entry.insert(self.old_value.take().unwrap());
    2420            0 :                 }
    2421              :             }
    2422              :             Entry::Vacant(_) => {
    2423            0 :                 METRICS.unexpected_errors.inc();
    2424            0 :                 error!(
    2425            0 :                     tenant_shard_id = %self.tenant_shard_id,
    2426            0 :                     "Missing InProgress marker during SlotGuard drop, this is a bug."
    2427            0 :                 );
    2428              :             }
    2429              :         }
    2430              : 
    2431            0 :         METRICS.tenant_slots.set(m.len() as u64);
    2432            2 :     }
    2433              : }
    2434              : 
    2435              : enum TenantSlotPeekMode {
    2436              :     /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
    2437              :     Read,
    2438              :     /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
    2439              :     Write,
    2440              : }
    2441              : 
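                      : /// Illustrative read-mode usage (a sketch; this mirrors the call in `get_tenant` above):
                      : ///
                      : /// ```ignore
                      : /// let locked = TENANTS.read().unwrap();
                      : /// let peek_slot =
                      : ///     tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
                      : /// ```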
    2442            0 : fn tenant_map_peek_slot<'a>(
    2443            0 :     tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
    2444            0 :     tenant_shard_id: &TenantShardId,
    2445            0 :     mode: TenantSlotPeekMode,
    2446            0 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
    2447            0 :     match tenants.deref() {
    2448            0 :         TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
    2449            0 :         TenantsMap::ShuttingDown(m) => match mode {
    2450              :             TenantSlotPeekMode::Read => Ok(Some(
    2451              :                 // When reading in ShuttingDown state, we must translate None results
    2452              :                 // into a ShuttingDown error, because absence of a tenant shard ID in the map
    2453              :                 // isn't a reliable indicator of the tenant being gone: it might have been
    2454              :                 // InProgress when shutdown started, and cleaned up from that state such
    2455              :                 // that it's now no longer in the map.  Callers will have to wait until
    2456              :                 // we next start up to get a proper answer.  This avoids incorrect 404 API responses.
    2457            0 :                 m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
    2458              :             )),
    2459            0 :             TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
    2460              :         },
    2461            0 :         TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
    2462              :     }
    2463            0 : }
    2464              : 
    2465              : enum TenantSlotAcquireMode {
    2466              :     /// Acquire the slot irrespective of current state, or whether it already exists
    2467              :     Any,
    2468              :     /// Return an error if trying to acquire a slot and it doesn't already exist
    2469              :     MustExist,
    2470              :     /// Return an error if trying to acquire a slot and it already exists
    2471              :     MustNotExist,
    2472              : }
    2473              : 
    2474            0 : fn tenant_map_acquire_slot(
    2475            0 :     tenant_shard_id: &TenantShardId,
    2476            0 :     mode: TenantSlotAcquireMode,
    2477            0 : ) -> Result<SlotGuard, TenantSlotError> {
    2478            0 :     tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
    2479            0 : }
    2480              : 
    2481            2 : fn tenant_map_acquire_slot_impl(
    2482            2 :     tenant_shard_id: &TenantShardId,
    2483            2 :     tenants: &std::sync::RwLock<TenantsMap>,
    2484            2 :     mode: TenantSlotAcquireMode,
    2485            2 : ) -> Result<SlotGuard, TenantSlotError> {
    2486            2 :     use TenantSlotAcquireMode::*;
    2487            2 :     METRICS.tenant_slot_writes.inc();
    2488            2 : 
    2489            2 :     let mut locked = tenants.write().unwrap();
    2490            2 :     let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
    2491            2 :     let _guard = span.enter();
    2492              : 
    2493            2 :     let m = match &mut *locked {
    2494            0 :         TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
    2495            0 :         TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
    2496            2 :         TenantsMap::Open(m) => m,
    2497            2 :     };
    2498            2 : 
    2499            2 :     use std::collections::btree_map::Entry;
    2500            2 : 
    2501            2 :     let entry = m.entry(*tenant_shard_id);
    2502            2 : 
    2503            2 :     match entry {
    2504            0 :         Entry::Vacant(v) => match mode {
    2505              :             MustExist => {
    2506            0 :                 tracing::debug!("Vacant && MustExist: return NotFound");
    2507            0 :                 Err(TenantSlotError::NotFound(*tenant_shard_id))
    2508              :             }
    2509              :             _ => {
    2510            0 :                 let (completion, barrier) = utils::completion::channel();
    2511            0 :                 v.insert(TenantSlot::InProgress(barrier));
    2512            0 :                 tracing::debug!("Vacant, inserted InProgress");
    2513            0 :                 Ok(SlotGuard::new(*tenant_shard_id, None, completion))
    2514              :             }
    2515              :         },
    2516            2 :         Entry::Occupied(mut o) => {
    2517            2 :             // Apply mode-driven checks
    2518            2 :             match (o.get(), mode) {
    2519              :                 (TenantSlot::InProgress(_), _) => {
    2520            0 :                     tracing::debug!("Occupied, failing for InProgress");
    2521            0 :                     Err(TenantSlotError::InProgress)
    2522              :                 }
    2523            0 :                 (slot, MustNotExist) => match slot {
    2524            0 :                     TenantSlot::Attached(tenant) => {
    2525            0 :                         tracing::debug!("Attached && MustNotExist, return AlreadyExists");
    2526            0 :                         Err(TenantSlotError::AlreadyExists(
    2527            0 :                             *tenant_shard_id,
    2528            0 :                             tenant.current_state(),
    2529            0 :                         ))
    2530              :                     }
    2531              :                     _ => {
    2532              :                         // FIXME: the AlreadyExists error assumes that we have a Tenant
    2533              :                         // to get the state from
    2534            0 :                         tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
    2535            0 :                         Err(TenantSlotError::AlreadyExists(
    2536            0 :                             *tenant_shard_id,
    2537            0 :                             TenantState::Broken {
    2538            0 :                                 reason: "Present but not attached".to_string(),
    2539            0 :                                 backtrace: "".to_string(),
    2540            0 :                             },
    2541            0 :                         ))
    2542              :                     }
    2543              :                 },
    2544              :                 _ => {
    2545              :                     // Happy case: the slot was not in any state that violated our mode
    2546            2 :                     let (completion, barrier) = utils::completion::channel();
    2547            2 :                     let old_value = o.insert(TenantSlot::InProgress(barrier));
    2548            2 :                     tracing::debug!("Occupied, replaced with InProgress");
    2549            2 :                     Ok(SlotGuard::new(
    2550            2 :                         *tenant_shard_id,
    2551            2 :                         Some(old_value),
    2552            2 :                         completion,
    2553            2 :                     ))
    2554              :                 }
    2555              :             }
    2556              :         }
    2557              :     }
    2558            2 : }
    2559              : 
    2560              : /// Stops and removes the tenant from memory, if it is not already [`TenantState::Stopping`]; bails otherwise.
    2561              : /// Allows removing other tenant resources manually, via `tenant_cleanup`.
    2562              : /// If the cleanup fails, the tenant stays in memory in [`TenantState::Broken`] state, and another removal
    2563              : /// operation is needed to remove it.
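                      : ///
                      : /// Illustrative call shape (a sketch; `detach_tenant0` above is a real caller, and
                      : /// `local_tenant_directory` stands in for the tenant's on-disk path):
                      : ///
                      : /// ```ignore
                      : /// let tmp_path = remove_tenant_from_memory(&TENANTS, tenant_shard_id, async {
                      : ///     safe_rename_tenant_dir(&local_tenant_directory).await
                      : /// })
                      : /// .await?;
                      : /// ```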
    2564            2 : async fn remove_tenant_from_memory<V, F>(
    2565            2 :     tenants: &std::sync::RwLock<TenantsMap>,
    2566            2 :     tenant_shard_id: TenantShardId,
    2567            2 :     tenant_cleanup: F,
    2568            2 : ) -> Result<V, TenantStateError>
    2569            2 : where
    2570            2 :     F: std::future::Future<Output = anyhow::Result<V>>,
    2571            2 : {
    2572            2 :     let mut slot_guard =
    2573            2 :         tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
    2574              : 
    2575              :     // allow pageserver shutdown to await for our completion
    2576            2 :     let (_guard, progress) = completion::channel();
    2577              : 
    2578              :     // The SlotGuard allows us to manipulate the Tenant object without fear of some
    2579              :     // concurrent API request doing something else for the same tenant ID.
    2580            2 :     let attached_tenant = match slot_guard.get_old_value() {
    2581            2 :         Some(TenantSlot::Attached(tenant)) => {
    2582            2 :             // whenever we remove a tenant from memory, we don't want to flush and wait for upload
    2583            2 :             let freeze_and_flush = false;
    2584            2 : 
    2585            2 :             // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
    2586            2 :             // that we can continue safely to cleanup.
    2587            2 :             match tenant.shutdown(progress, freeze_and_flush).await {
    2588            2 :                 Ok(()) => {}
    2589            0 :                 Err(_other) => {
    2590            0 :                     // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
    2591            0 :                     // wait for it but return an error right away because these are distinct requests.
    2592            0 :                     slot_guard.revert();
    2593            0 :                     return Err(TenantStateError::IsStopping(tenant_shard_id));
    2594              :                 }
    2595              :             }
    2596            2 :             Some(tenant)
    2597              :         }
    2598            0 :         Some(TenantSlot::Secondary(secondary_state)) => {
    2599            0 :             tracing::info!("Shutting down in secondary mode");
    2600            0 :             secondary_state.shutdown().await;
    2601            0 :             None
    2602              :         }
    2603              :         Some(TenantSlot::InProgress(_)) => {
    2604              :             // Acquiring a slot guarantees its old value was not InProgress
    2605            0 :             unreachable!();
    2606              :         }
    2607            0 :         None => None,
    2608              :     };
    2609              : 
    2610            2 :     match tenant_cleanup
    2611            2 :         .await
    2612            2 :         .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
    2613              :     {
    2614            2 :         Ok(hook_value) => {
    2615            2 :             // Success: drop the old TenantSlot::Attached.
    2616            2 :             slot_guard
    2617            2 :                 .drop_old_value()
    2618            2 :                 .expect("We just called shutdown");
    2619            2 : 
    2620            2 :             Ok(hook_value)
    2621              :         }
    2622            0 :         Err(e) => {
    2623              :             // If we had a Tenant, set it to Broken and put it back in the TenantsMap
    2624            0 :             if let Some(attached_tenant) = attached_tenant {
    2625            0 :                 attached_tenant.set_broken(e.to_string()).await;
    2626            0 :             }
    2627              :             // Leave the broken tenant in the map
    2628            0 :             slot_guard.revert();
    2629            0 : 
    2630            0 :             Err(TenantStateError::Other(e))
    2631              :         }
    2632              :     }
    2633            2 : }
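
// Illustrative sketch, not part of the measured source: a hypothetical caller of
// remove_tenant_from_memory, mirroring the call in the test module below. The
// parameter types are inferred from that call and may differ from the real
// signature. The cleanup future runs only after the tenant has been shut down and
// its slot is held as InProgress; if the future fails, an attached tenant is marked
// Broken and left in the map.
#[allow(dead_code)]
async fn example_remove_with_cleanup(
    tenants: &std::sync::RwLock<TenantsMap>,
    tenant_shard_id: TenantShardId,
) -> Result<(), TenantStateError> {
    remove_tenant_from_memory(tenants, tenant_shard_id, async move {
        // A real caller would delete local files or notify remote storage here.
        anyhow::Ok(())
    })
    .await
}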
    2634              : 
    2635              : use {
    2636              :     crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
    2637              :     utils::http::error::ApiError,
    2638              : };
    2639              : 
    2640            0 : pub(crate) async fn immediate_gc(
    2641            0 :     tenant_shard_id: TenantShardId,
    2642            0 :     timeline_id: TimelineId,
    2643            0 :     gc_req: TimelineGcRequest,
    2644            0 :     cancel: CancellationToken,
    2645            0 :     ctx: &RequestContext,
    2646            0 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
    2647            0 :     let guard = TENANTS.read().unwrap();
    2648              : 
    2649            0 :     let tenant = guard
    2650            0 :         .get(&tenant_shard_id)
    2651            0 :         .map(Arc::clone)
    2652            0 :         .with_context(|| format!("tenant {tenant_shard_id}"))
    2653            0 :         .map_err(|e| ApiError::NotFound(e.into()))?;
    2654              : 
    2655            0 :     let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
    2656            0 :     // Use tenant's pitr setting
    2657            0 :     let pitr = tenant.get_pitr_interval();
    2658            0 : 
    2659            0 :     // Run in task_mgr to avoid race with tenant_detach operation
    2660            0 :     let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
    2661            0 :     let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
    2662            0 :     // TODO: spawning is redundant now, need to hold the gate
    2663            0 :     task_mgr::spawn(
    2664            0 :         &tokio::runtime::Handle::current(),
    2665            0 :         TaskKind::GarbageCollector,
    2666            0 :         Some(tenant_shard_id),
    2667            0 :         Some(timeline_id),
    2668            0 :         &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
    2669            0 :         false,
    2670            0 :         async move {
    2671            0 :             fail::fail_point!("immediate_gc_task_pre");
    2672              : 
    2673              :             #[allow(unused_mut)]
    2674            0 :             let mut result = tenant
    2675            0 :                 .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
    2676            0 :                 .instrument(info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
    2677            0 :                 .await;
    2678              :             // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
    2679              :             // better once the types support it.
    2680              : 
    2681              :             #[cfg(feature = "testing")]
    2682              :             {
    2683            0 :                 if let Ok(result) = result.as_mut() {
    2684              :                     // Why not FuturesUnordered? It would need much the same task structure,
    2685              :                     // but everything would be polled on a single task rather than separate tasks.
    2686            0 :                     let mut js = tokio::task::JoinSet::new();
    2687            0 :                     for layer in std::mem::take(&mut result.doomed_layers) {
    2688            0 :                         js.spawn(layer.wait_drop());
    2689            0 :                     }
    2690            0 :                     tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
    2691            0 :                     while let Some(res) = js.join_next().await {
    2692            0 :                         res.expect("wait_drop should not panic");
    2693            0 :                     }
    2694            0 :                 }
    2695              : 
    2696            0 :                 let timeline = tenant.get_timeline(timeline_id, false).ok();
    2697            0 :                 let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
    2698              : 
    2699            0 :                 if let Some(rtc) = rtc {
    2700              :                     // layer drops schedule actions on the remote timeline client to actually do the
    2701              :                     // deletions; we don't care about a shutdown error here, just exit quickly
    2702            0 :                     drop(rtc.wait_completion().await);
    2703            0 :                 }
    2704              :             }
    2705              : 
    2706            0 :             match task_done.send(result) {
    2707            0 :                 Ok(_) => (),
    2708            0 :                 Err(result) => error!("failed to send gc result: {result:?}"),
    2709              :             }
    2710            0 :             Ok(())
    2711            0 :         }
    2712            0 :     );
    2713            0 : 
    2714            0 :     // only drop the guard after we've spawned the task, so that timeline shutdown will wait for the task
    2715            0 :     drop(guard);
    2716            0 : 
    2717            0 :     Ok(wait_task_done)
    2718            0 : }
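
// Illustrative sketch, not part of the measured source: a hypothetical caller showing
// how the oneshot receiver returned by immediate_gc might be consumed. Error handling
// is deliberately simplified to anyhow for the sketch.
#[allow(dead_code)]
async fn example_run_immediate_gc(
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    gc_req: TimelineGcRequest,
    cancel: CancellationToken,
    ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
    let wait_task_done = match immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, ctx).await
    {
        Ok(receiver) => receiver,
        Err(_api_error) => anyhow::bail!("immediate_gc refused the request"),
    };
    // The spawned GC task reports back through the oneshot channel; a recv error means
    // the task went away (e.g. at shutdown) before it could send a result.
    wait_task_done
        .await
        .context("gc task exited without sending a result")?
}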
    2719              : 
    2720              : #[cfg(test)]
    2721              : mod tests {
    2722              :     use std::collections::BTreeMap;
    2723              :     use std::sync::Arc;
    2724              :     use tracing::Instrument;
    2725              : 
    2726              :     use crate::tenant::mgr::TenantSlot;
    2727              : 
    2728              :     use super::{super::harness::TenantHarness, TenantsMap};
    2729              : 
    2730            2 :     #[tokio::test(start_paused = true)]
    2731            2 :     async fn shutdown_awaits_in_progress_tenant() {
    2732            2 :         // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
    2733            2 :         // wait for it to complete before proceeding.
    2734            2 : 
    2735            2 :         let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
    2736            2 :         let (t, _ctx) = h.load().await;
    2737            2 : 
    2738            2 :         // the harness force-loads the tenant to Active; nothing is running on the tenant
    2739            2 : 
    2740            2 :         let id = t.tenant_shard_id();
    2741            2 : 
    2742            2 :         // the tenant harness configures the logging, and we cannot escape it
    2743            2 :         let span = h.span();
    2744            2 :         let _e = span.enter();
    2745            2 : 
    2746            2 :         let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
    2747            2 :         let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
    2748            2 : 
    2749            2 :         // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
    2750            2 :         // permit it to proceed: that will stick the tenant in InProgress
    2751            2 : 
    2752            2 :         let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
    2753            2 :         let (until_cleanup_started, cleanup_started) = utils::completion::channel();
    2754            2 :         let mut remove_tenant_from_memory_task = {
    2755            2 :             let jh = tokio::spawn({
    2756            2 :                 let tenants = tenants.clone();
    2757            2 :                 async move {
    2758            2 :                     let cleanup = async move {
    2759            2 :                         drop(until_cleanup_started);
    2760            2 :                         can_complete_cleanup.wait().await;
    2761            2 :                         anyhow::Ok(())
    2762            2 :                     };
    2763            2 :                     super::remove_tenant_from_memory(&tenants, id, cleanup).await
    2764            2 :                 }
    2765            2 :                 .instrument(h.span())
    2766            2 :             });
    2767            2 : 
    2768            2 :             // now the long-running cleanup should be in place, with the tenant in the Stopping state
    2769            2 :             cleanup_started.wait().await;
    2770            2 :             jh
    2771            2 :         };
    2772            2 : 
    2773            2 :         let mut shutdown_task = {
    2774            2 :             let (until_shutdown_started, shutdown_started) = utils::completion::channel();
    2775            2 : 
    2776            2 :             let shutdown_task = tokio::spawn(async move {
    2777            2 :                 drop(until_shutdown_started);
    2778            4 :                 super::shutdown_all_tenants0(&tenants).await;
    2779            2 :             });
    2780            2 : 
    2781            2 :             shutdown_started.wait().await;
    2782            2 :             shutdown_task
    2783            2 :         };
    2784            2 : 
    2785            2 :         let long_time = std::time::Duration::from_secs(15);
    2786            4 :         tokio::select! {
    2787            4 :             _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
    2788            4 :             _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
    2789            4 :             _ = tokio::time::sleep(long_time) => {},
    2790            4 :         }
    2791            2 : 
    2792            2 :         drop(until_cleanup_completed);
    2793            2 : 
    2794            2 :         // Now that we allow it to proceed, shutdown should complete immediately
    2795            2 :         remove_tenant_from_memory_task.await.unwrap().unwrap();
    2796            2 :         shutdown_task.await.unwrap();
    2797            2 :     }
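
    // Illustrative sketch, not part of the measured source: the utils::completion
    // handshake that the test above builds on. channel() returns a guard and a waiter;
    // wait() on the waiter resolves once the guard has been dropped.
    #[tokio::test]
    async fn example_completion_handshake() {
        let (guard, waiter_half) = utils::completion::channel();

        let waiter = tokio::spawn(async move {
            // Resolves only once `guard` has been dropped on the other side.
            waiter_half.wait().await;
        });

        // Dropping the guard signals completion and unblocks the spawned task.
        drop(guard);
        waiter.await.unwrap();
    }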
    2798              : }
        

Generated by: LCOV version 2.1-beta