LCOV - code coverage report
Current view: top level - pageserver/src/tenant - mgr.rs (source / functions)
Test:         8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date:    2023-09-06 10:18:01
Coverage:     Lines: 84.8 % (464 of 547)    Functions: 63.0 % (85 of 135)

            Line data    Source code
       1              : //! This module acts as a switchboard to access different repositories managed by this
       2              : //! page server.
       3              : 
       4              : use std::collections::{hash_map, HashMap};
       5              : use std::ffi::OsStr;
       6              : use std::path::Path;
       7              : use std::sync::Arc;
       8              : use tokio::fs;
       9              : 
      10              : use anyhow::Context;
      11              : use once_cell::sync::Lazy;
      12              : use tokio::sync::RwLock;
      13              : use tokio::task::JoinSet;
      14              : use tracing::*;
      15              : 
      16              : use remote_storage::GenericRemoteStorage;
      17              : use utils::crashsafe;
      18              : 
      19              : use crate::config::PageServerConf;
      20              : use crate::context::{DownloadBehavior, RequestContext};
      21              : use crate::task_mgr::{self, TaskKind};
      22              : use crate::tenant::config::TenantConfOpt;
      23              : use crate::tenant::delete::DeleteTenantFlow;
      24              : use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
      25              : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
      26              : 
      27              : use utils::crashsafe::path_with_suffix_extension;
      28              : use utils::fs_ext::PathExt;
      29              : use utils::generation::Generation;
      30              : use utils::id::{TenantId, TimelineId};
      31              : 
      32              : use super::delete::DeleteTenantError;
      33              : use super::timeline::delete::DeleteTimelineFlow;
      34              : use super::TenantSharedResources;
      35              : 
      36              : /// The tenants known to the pageserver.
      37              : /// The enum variants are used to distinguish the different states that the pageserver can be in.
      38              : pub(crate) enum TenantsMap {
      39              :     /// [`init_tenant_mgr`] is not done yet.
      40              :     Initializing,
      41              :     /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
      42              :     /// New tenants can be added using [`tenant_map_insert`].
      43              :     Open(HashMap<TenantId, Arc<Tenant>>),
      44              :     /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
      45              :     /// Existing tenants are still accessible, but no new tenants can be created.
      46              :     ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
      47              : }
      48              : 
      49              : impl TenantsMap {
      50        10222 :     pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
      51        10222 :         match self {
      52            0 :             TenantsMap::Initializing => None,
      53        10222 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
      54              :         }
      55        10222 :     }
      56          125 :     pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
      57          125 :         match self {
      58            0 :             TenantsMap::Initializing => None,
      59          125 :             TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
      60              :         }
      61          125 :     }
      62              : }
      63              : 
      64              : /// This is "safe" in that it won't leave behind a partially deleted directory
      65              : /// at the original path, because we rename with TEMP_FILE_SUFFIX before we start deleting
      66              : /// the contents.
      67              : ///
      68              : /// This is pageserver-specific, as it relies on future processes after a crash to check
      69              : /// for TEMP_FILE_SUFFIX when loading things.
      70           37 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Path>) -> std::io::Result<()> {
      71           37 :     let parent = path
      72           37 :         .as_ref()
      73           37 :         .parent()
      74           37 :         // It is invalid to call this function with a relative path.  Tenant directories
      75           37 :         // should always have a parent.
      76           37 :         .ok_or(std::io::Error::new(
      77           37 :             std::io::ErrorKind::InvalidInput,
      78           37 :             "Path must be absolute",
      79           37 :         ))?;
      80              : 
      81           37 :     let tmp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
      82           37 :     fs::rename(&path, &tmp_path).await?;
      83           37 :     fs::File::open(parent).await?.sync_all().await?;
      84           37 :     fs::remove_dir_all(tmp_path).await
      85           37 : }
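
// For illustration: a minimal synchronous sketch of the same rename-before-delete
// pattern, using only std. The ".___temp" suffix and the helper name are
// illustrative stand-ins for TEMP_FILE_SUFFIX and path_with_suffix_extension,
// i.e. assumptions for this example, not the crate's actual constants.
fn remove_dir_crash_safe(path: &std::path::Path) -> std::io::Result<()> {
    use std::io::{Error, ErrorKind};

    // A path without a parent cannot be handled crash-safely here.
    let parent = path
        .parent()
        .ok_or_else(|| Error::new(ErrorKind::InvalidInput, "path must be absolute"))?;

    // Rename first, so a crash mid-delete leaves only a temp-suffixed directory
    // that a later startup scan can recognize and remove.
    let mut tmp = path.as_os_str().to_owned();
    tmp.push(".___temp");
    let tmp = std::path::PathBuf::from(tmp);

    std::fs::rename(path, &tmp)?;
    // Make the rename durable before we start deleting.
    std::fs::File::open(parent)?.sync_all()?;
    std::fs::remove_dir_all(&tmp)
}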
      86              : 
      87          575 : static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::Initializing));
      88              : 
      89              : /// Initialize repositories with locally available timelines.
      90              : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
      91              : /// are scheduled for download and added to the tenant once download is completed.
      92         2875 : #[instrument(skip_all)]
      93              : pub async fn init_tenant_mgr(
      94              :     conf: &'static PageServerConf,
      95              :     resources: TenantSharedResources,
      96              :     init_order: InitializationOrder,
      97              : ) -> anyhow::Result<()> {
      98              :     // Scan local filesystem for attached tenants
      99              :     let tenants_dir = conf.tenants_path();
     100              : 
     101              :     let mut tenants = HashMap::new();
     102              : 
     103              :     let mut dir_entries = fs::read_dir(&tenants_dir)
     104              :         .await
     105            0 :         .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
     106              : 
     107              :     let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
     108              : 
     109              :     loop {
     110              :         match dir_entries.next_entry().await {
     111              :             Ok(None) => break,
     112              :             Ok(Some(dir_entry)) => {
     113              :                 let tenant_dir_path = dir_entry.path();
     114              :                 if crate::is_temporary(&tenant_dir_path) {
     115            0 :                     info!(
     116            0 :                         "Found temporary tenant directory, removing: {}",
     117            0 :                         tenant_dir_path.display()
     118            0 :                     );
     119              :                     // No need to use safe_remove_tenant_dir_all because this is already
     120              :                     // a temporary path
     121              :                     if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
     122            0 :                         error!(
     123            0 :                             "Failed to remove temporary directory '{}': {:?}",
     124            0 :                             tenant_dir_path.display(),
     125            0 :                             e
     126            0 :                         );
     127              :                     }
     128              :                 } else {
     129              :                     // This case happens if we:
     130              :                     // * crash during attach before creating the attach marker file
     131              :                     // * crash during tenant delete before removing tenant directory
     132            0 :                     let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
     133            0 :                         format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
     134            0 :                     })?;
     135              :                     if is_empty {
     136            3 :                         info!("removing empty tenant directory {tenant_dir_path:?}");
     137              :                         if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
     138            0 :                             error!(
     139            0 :                                 "Failed to remove empty tenant directory '{}': {e:#}",
     140            0 :                                 tenant_dir_path.display()
     141            0 :                             )
     142              :                         }
     143              :                         continue;
     144              :                     }
     145              : 
     146              :                     let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
     147              :                     if tenant_ignore_mark_file.exists() {
     148            2 :                         info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
     149              :                         continue;
     150              :                     }
     151              : 
     152              :                     match schedule_local_tenant_processing(
     153              :                         conf,
     154              :                         &tenant_dir_path,
     155              :                         resources.clone(),
     156              :                         Some(init_order.clone()),
     157              :                         &TENANTS,
     158              :                         &ctx,
     159              :                     ) {
     160              :                         Ok(tenant) => {
     161              :                             tenants.insert(tenant.tenant_id(), tenant);
     162              :                         }
     163              :                         Err(e) => {
     164            0 :                             error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
     165              :                         }
     166              :                     }
     167              :                 }
     168              :             }
     169              :             Err(e) => {
     170              :                 // On error, print it, but continue with the other tenants. If we error out
     171              :                 // here, the pageserver startup fails altogether, causing outage for *all*
     172              :                 // tenants. That seems worse.
     173            0 :                 error!(
     174            0 :                     "Failed to list tenants dir entry in directory {tenants_dir:?}, reason: {e:?}"
     175            0 :                 );
     176              :             }
     177              :         }
     178              :     }
     179              : 
     180          575 :     info!("Processed {} local tenants at startup", tenants.len());
     181              : 
     182              :     let mut tenants_map = TENANTS.write().await;
     183              :     assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
     184              :     *tenants_map = TenantsMap::Open(tenants);
     185              :     Ok(())
     186              : }
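
// For illustration: the startup scan above deliberately tolerates failures on
// individual directory entries so that one bad entry cannot fail startup for
// all tenants. A stand-alone sketch of that policy; the function name and the
// logging are assumptions for the example, not the crate's API.
async fn scan_dir_tolerantly(dir: &std::path::Path) -> anyhow::Result<Vec<std::path::PathBuf>> {
    use anyhow::Context;

    let mut entries = tokio::fs::read_dir(dir)
        .await
        .with_context(|| format!("Failed to list dir {dir:?}"))?;
    let mut found = Vec::new();
    loop {
        match entries.next_entry().await {
            Ok(None) => break,
            Ok(Some(entry)) => found.push(entry.path()),
            // Log and keep going instead of aborting the whole scan.
            Err(e) => tracing::error!("failed to read an entry in {dir:?}: {e:#}"),
        }
    }
    Ok(found)
}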
     187              : 
     188          739 : pub(crate) fn schedule_local_tenant_processing(
     189          739 :     conf: &'static PageServerConf,
     190          739 :     tenant_path: &Path,
     191          739 :     resources: TenantSharedResources,
     192          739 :     init_order: Option<InitializationOrder>,
     193          739 :     tenants: &'static tokio::sync::RwLock<TenantsMap>,
     194          739 :     ctx: &RequestContext,
     195          739 : ) -> anyhow::Result<Arc<Tenant>> {
     196          739 :     anyhow::ensure!(
     197          739 :         tenant_path.is_dir(),
     198            0 :         "Cannot load tenant from path {tenant_path:?}, it either does not exist or is not a directory"
     199              :     );
     200          739 :     anyhow::ensure!(
     201          739 :         !crate::is_temporary(tenant_path),
     202            0 :         "Cannot load tenant from temporary path {tenant_path:?}"
     203              :     );
     204              :     anyhow::ensure!(
     205          739 :         !tenant_path.is_empty_dir().with_context(|| {
     206            0 :             format!("Failed to check whether {tenant_path:?} is an empty dir")
     207          739 :         })?,
     208            0 :         "Cannot load tenant from empty directory {tenant_path:?}"
     209              :     );
     210              : 
     211          739 :     let tenant_id = tenant_path
     212          739 :         .file_name()
     213          739 :         .and_then(OsStr::to_str)
     214          739 :         .unwrap_or_default()
     215          739 :         .parse::<TenantId>()
     216          739 :         .with_context(|| {
     217            0 :             format!("Could not parse tenant id out of the tenant dir name in path {tenant_path:?}")
     218          739 :         })?;
     219              : 
     220          739 :     let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
     221          739 :     anyhow::ensure!(
     222          739 :         !conf.tenant_ignore_mark_file_path(&tenant_id).exists(),
     223            0 :         "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
     224              :     );
     225              : 
     226          739 :     let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
     227           42 :         info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
     228           42 :         if let Some(remote_storage) = resources.remote_storage {
     229           42 :             match Tenant::spawn_attach(
     230           42 :                 conf,
     231           42 :                 tenant_id,
     232           42 :                 Generation::none(),
     233           42 :                 resources.broker_client,
     234           42 :                 tenants,
     235           42 :                 remote_storage,
     236           42 :                 ctx,
     237           42 :             ) {
     238           42 :                 Ok(tenant) => tenant,
     239            0 :                 Err(e) => {
     240            0 :                     error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
     241            0 :                     Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
     242              :                 }
     243              :             }
     244              :         } else {
     245            0 :             warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
     246            0 :             Tenant::create_broken_tenant(
     247            0 :                 conf,
     248            0 :                 tenant_id,
     249            0 :                 "attaching mark file present but no remote storage configured".to_string(),
     250            0 :             )
     251              :         }
     252              :     } else {
     253          697 :         info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
     254              :         // Start loading the tenant into memory. It will initially be in Loading state.
     255          697 :         Tenant::spawn_load(
     256          697 :             conf,
     257          697 :             tenant_id,
     258          697 :             Generation::none(),
     259          697 :             resources,
     260          697 :             init_order,
     261          697 :             tenants,
     262          697 :             ctx,
     263          697 :         )
     264              :     };
     265          739 :     Ok(tenant)
     266          739 : }
     267              : 
     268              : ///
     269              : /// Shut down all tenants. This runs as part of pageserver shutdown.
     270              : ///
     271              : /// NB: We leave the tenants in the map, so that they remain accessible through
     272              : /// the management API until we shut it down. If we removed the shut-down tenants
     273              : /// from the tenants map, the management API would return 404 for these tenants,
     274              : /// because TenantsMap::get() would then return `None`.
     275              : /// That could be easily misinterpreted by control plane, the consumer of the
     276              : /// management API. For example, it could attach the tenant on a different pageserver.
     277              : /// We would then be in split-brain once this pageserver restarts.
     278          444 : #[instrument(skip_all)]
     279              : pub async fn shutdown_all_tenants() {
     280              :     shutdown_all_tenants0(&TENANTS).await
     281              : }
     282              : 
     283          149 : async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
     284              :     use utils::completion;
     285              : 
     286              :     // Prevent new tenants from being created.
     287          149 :     let tenants_to_shut_down = {
     288          149 :         let mut m = tenants.write().await;
     289          149 :         match &mut *m {
     290              :             TenantsMap::Initializing => {
     291            0 :                 *m = TenantsMap::ShuttingDown(HashMap::default());
     292            0 :                 info!("tenants map is empty");
     293            0 :                 return;
     294              :             }
     295          149 :             TenantsMap::Open(tenants) => {
     296          149 :                 let tenants_clone = tenants.clone();
     297          149 :                 *m = TenantsMap::ShuttingDown(std::mem::take(tenants));
     298          149 :                 tenants_clone
     299              :             }
     300              :             TenantsMap::ShuttingDown(_) => {
     301              :                 // TODO: it is possible that detach and shutdown happen at the same time. as a
     302              :                 // result, during shutdown we do not wait for detach.
     303            0 :                 error!("already shutting down, this function isn't supposed to be called more than once");
     304            0 :                 return;
     305              :             }
     306              :         }
     307              :     };
     308              : 
     309          149 :     let started_at = std::time::Instant::now();
     310          149 :     let mut join_set = JoinSet::new();
     311          313 :     for (tenant_id, tenant) in tenants_to_shut_down {
     312          164 :         join_set.spawn(
     313          164 :             async move {
     314          164 :                 let freeze_and_flush = true;
     315              : 
     316          164 :                 let res = {
     317          164 :                     let (_guard, shutdown_progress) = completion::channel();
     318          471 :                     tenant.shutdown(shutdown_progress, freeze_and_flush).await
     319              :                 };
     320              : 
     321          164 :                 if let Err(other_progress) = res {
     322              :                     // join the other shutdown that is already in progress
     323            1 :                     other_progress.wait().await;
     324          163 :                 }
     325              : 
     326              :                 // we cannot afford per tenant logging here, because if s3 is degraded, we are
     327              :                 // going to log too many lines
     328              : 
     329          164 :                 debug!("tenant successfully stopped");
     330          164 :             }
     331          164 :             .instrument(info_span!("shutdown", %tenant_id)),
     332              :         );
     333              :     }
     334              : 
     335          149 :     let total = join_set.len();
     336          149 :     let mut panicked = 0;
     337          149 :     let mut buffering = true;
     338          149 :     const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
     339          149 :     let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
     340              : 
     341          318 :     while !join_set.is_empty() {
     342          169 :         tokio::select! {
     343          164 :             Some(joined) = join_set.join_next() => {
     344              :                 match joined {
     345              :                     Ok(()) => {}
     346              :                     Err(join_error) if join_error.is_cancelled() => {
     347              :                         unreachable!("we are not cancelling any of the futures");
     348              :                     }
     349              :                     Err(join_error) if join_error.is_panic() => {
     350              :                         // cannot really do anything, as this panic is likely a bug
     351              :                         panicked += 1;
     352              :                     }
     353              :                     Err(join_error) => {
     354            0 :                         warn!("unknown kind of JoinError: {join_error}");
     355              :                     }
     356              :                 }
     357              :                 if !buffering {
     358              :                     // buffer so that every 500ms since the first update (or starting) we'll log
     359              :                     // how far away we are; this is because we will get SIGKILL'd at 10s, and we
     360              :                     // are not able to log *then*.
     361              :                     buffering = true;
     362              :                     buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
     363              :                 }
     364              :             },
     365              :             _ = &mut buffered, if buffering => {
     366              :                 buffering = false;
     367            5 :                 info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
     368              :             }
     369              :         }
     370              :     }
     371              : 
     372          149 :     if panicked > 0 {
     373            0 :         warn!(
     374            0 :             panicked,
     375            0 :             total, "observed panics while shutting down tenants"
     376            0 :         );
     377          149 :     }
     378              : 
     379              :     // caller will log how long we took
     380          149 : }
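
// For illustration: a self-contained sketch of the buffered progress logging
// used above. It drains a JoinSet while emitting at most one "still waiting"
// line per 500ms, so a slow shutdown produces bounded log output. The function
// name and the exact log lines are assumptions for the example.
async fn drain_with_progress(mut join_set: tokio::task::JoinSet<()>) {
    const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
    let started_at = std::time::Instant::now();
    let total = join_set.len();
    let mut buffering = true;
    let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));

    while !join_set.is_empty() {
        tokio::select! {
            Some(joined) = join_set.join_next() => {
                if let Err(e) = joined {
                    tracing::warn!("task failed during shutdown: {e}");
                }
                if !buffering {
                    // Re-arm the timer so the next progress line comes out no
                    // sooner than BUFFER_FOR from now.
                    buffering = true;
                    buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
                }
            },
            _ = &mut buffered, if buffering => {
                buffering = false;
                tracing::info!(
                    remaining = join_set.len(),
                    total,
                    elapsed_ms = started_at.elapsed().as_millis(),
                    "waiting for tasks to shut down"
                );
            }
        }
    }
}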
     381              : 
     382          480 : pub async fn create_tenant(
     383          480 :     conf: &'static PageServerConf,
     384          480 :     tenant_conf: TenantConfOpt,
     385          480 :     tenant_id: TenantId,
     386          480 :     broker_client: storage_broker::BrokerClientChannel,
     387          480 :     remote_storage: Option<GenericRemoteStorage>,
     388          480 :     ctx: &RequestContext,
     389          480 : ) -> Result<Arc<Tenant>, TenantMapInsertError> {
     390          480 :     tenant_map_insert(tenant_id, || async {
     391              :         // We're holding the tenants lock in write mode while doing local IO.
     392              :         // If this section ever becomes contentious, introduce a new `TenantState::Creating`
     393              :         // and do the work in that state.
     394          480 :         let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
     395              :         // TODO: tenant directory remains on disk if we bail out from here on.
     396              :         //       See https://github.com/neondatabase/neon/issues/4233
     397              : 
     398          479 :         let tenant_resources = TenantSharedResources {
     399          479 :             broker_client,
     400          479 :             remote_storage,
     401          479 :         };
     402          479 :         let created_tenant =
     403          479 :             schedule_local_tenant_processing(conf, &tenant_directory, tenant_resources, None, &TENANTS, ctx)?;
     404              :         // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
     405              :         //      See https://github.com/neondatabase/neon/issues/4233
     406              : 
     407          479 :         let created_tenant_id = created_tenant.tenant_id();
     408          479 :         anyhow::ensure!(
     409          479 :                 tenant_id == created_tenant_id,
     410            0 :                 "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})",
     411              :             );
     412          479 :         Ok(created_tenant)
     413          480 :     }).await
     414          480 : }
     415              : 
     416            0 : #[derive(Debug, thiserror::Error)]
     417              : pub enum SetNewTenantConfigError {
     418              :     #[error(transparent)]
     419              :     GetTenant(#[from] GetTenantError),
     420              :     #[error(transparent)]
     421              :     Persist(anyhow::Error),
     422              : }
     423              : 
     424           27 : pub async fn set_new_tenant_config(
     425           27 :     conf: &'static PageServerConf,
     426           27 :     new_tenant_conf: TenantConfOpt,
     427           27 :     tenant_id: TenantId,
     428           27 : ) -> Result<(), SetNewTenantConfigError> {
     429           27 :     info!("configuring tenant {tenant_id}");
     430           27 :     let tenant = get_tenant(tenant_id, true).await?;
     431              : 
     432           27 :     let tenant_config_path = conf.tenant_config_path(&tenant_id);
     433           27 :     Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf)
     434            0 :         .await
     435           27 :         .map_err(SetNewTenantConfigError::Persist)?;
     436           27 :     tenant.set_new_tenant_config(new_tenant_conf);
     437           27 :     Ok(())
     438           27 : }
     439              : 
     440           16 : #[derive(Debug, thiserror::Error)]
     441              : pub enum GetTenantError {
     442              :     #[error("Tenant {0} not found")]
     443              :     NotFound(TenantId),
     444              :     #[error("Tenant {0} is not active")]
     445              :     NotActive(TenantId),
     446              : }
     447              : 
     448              : /// Gets the tenant from the in-memory data, erroring if it is absent or does not match the query.
     449              : /// `active_only = true` restricts the lookup to tenants that are ready for operations, returning an error for tenants in any other state.
     450         9536 : pub async fn get_tenant(
     451         9536 :     tenant_id: TenantId,
     452         9536 :     active_only: bool,
     453         9536 : ) -> Result<Arc<Tenant>, GetTenantError> {
     454         9536 :     let m = TENANTS.read().await;
     455         9536 :     let tenant = m
     456         9536 :         .get(&tenant_id)
     457         9536 :         .ok_or(GetTenantError::NotFound(tenant_id))?;
     458         9443 :     if active_only && !tenant.is_active() {
     459            4 :         Err(GetTenantError::NotActive(tenant_id))
     460              :     } else {
     461         9439 :         Ok(Arc::clone(tenant))
     462              :     }
     463         9536 : }
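
// For illustration: how a caller might consume get_tenant(). The handler shape
// and the decision to treat NotActive as a retryable condition are assumptions
// for the example, not the crate's actual management-API code.
async fn use_active_tenant(tenant_id: TenantId) -> anyhow::Result<()> {
    match get_tenant(tenant_id, /* active_only */ true).await {
        Ok(tenant) => {
            // `tenant` is an Arc<Tenant>; the TENANTS lock is already released here,
            // so the clone can be used for the rest of the request.
            info!("tenant {tenant_id} is active, state: {:?}", tenant.current_state());
            Ok(())
        }
        // The tenant exists but is not in the Active state (e.g. still loading
        // or attaching); a caller would typically retry or report "unavailable".
        Err(GetTenantError::NotActive(_)) => anyhow::bail!("tenant {tenant_id} is not active yet"),
        Err(GetTenantError::NotFound(_)) => anyhow::bail!("tenant {tenant_id} is not attached here"),
    }
}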
     464              : 
     465          132 : pub async fn delete_tenant(
     466          132 :     conf: &'static PageServerConf,
     467          132 :     remote_storage: Option<GenericRemoteStorage>,
     468          132 :     tenant_id: TenantId,
     469          132 : ) -> Result<(), DeleteTenantError> {
     470          675 :     DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await
     471          132 : }
     472              : 
     473            0 : #[derive(Debug, thiserror::Error)]
     474              : pub enum DeleteTimelineError {
     475              :     #[error("Tenant {0}")]
     476              :     Tenant(#[from] GetTenantError),
     477              : 
     478              :     #[error("Timeline {0}")]
     479              :     Timeline(#[from] crate::tenant::DeleteTimelineError),
     480              : }
     481              : 
     482          121 : pub async fn delete_timeline(
     483          121 :     tenant_id: TenantId,
     484          121 :     timeline_id: TimelineId,
     485          121 :     _ctx: &RequestContext,
     486          121 : ) -> Result<(), DeleteTimelineError> {
     487          121 :     let tenant = get_tenant(tenant_id, true).await?;
     488          543 :     DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
     489          103 :     Ok(())
     490          121 : }
     491              : 
     492            0 : #[derive(Debug, thiserror::Error)]
     493              : pub enum TenantStateError {
     494              :     #[error("Tenant {0} not found")]
     495              :     NotFound(TenantId),
     496              :     #[error("Tenant {0} is stopping")]
     497              :     IsStopping(TenantId),
     498              :     #[error("Tenant {0} is not active")]
     499              :     NotActive(TenantId),
     500              :     #[error(transparent)]
     501              :     Other(#[from] anyhow::Error),
     502              : }
     503              : 
     504           40 : pub async fn detach_tenant(
     505           40 :     conf: &'static PageServerConf,
     506           40 :     tenant_id: TenantId,
     507           40 :     detach_ignored: bool,
     508           40 : ) -> Result<(), TenantStateError> {
     509          270 :     detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await
     510           40 : }
     511              : 
     512           40 : async fn detach_tenant0(
     513           40 :     conf: &'static PageServerConf,
     514           40 :     tenants: &tokio::sync::RwLock<TenantsMap>,
     515           40 :     tenant_id: TenantId,
     516           40 :     detach_ignored: bool,
     517           40 : ) -> Result<(), TenantStateError> {
     518           40 :     let local_files_cleanup_operation = |tenant_id_to_clean| async move {
     519           37 :         let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
     520           37 :         safe_remove_tenant_dir_all(&local_tenant_directory)
     521          148 :             .await
     522           37 :             .with_context(|| {
     523            0 :                 format!("local tenant directory {local_tenant_directory:?} removal")
     524           37 :             })?;
     525           37 :         Ok(())
     526           40 :     };
     527              : 
     528           40 :     let removal_result =
     529           40 :         remove_tenant_from_memory(tenants, tenant_id, local_files_cleanup_operation(tenant_id))
     530          266 :             .await;
     531              : 
     532              :     // Ignored tenants are not present in memory, so the removal-from-memory operation above bails with NotFound.
     533              :     // Before returning that error, check for the ignored-tenant case: then we only need to clean up its local files.
     534           40 :     if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
     535            1 :         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
     536            1 :         if tenant_ignore_mark.exists() {
     537            1 :             info!("Detaching an ignored tenant");
     538            1 :             local_files_cleanup_operation(tenant_id)
     539            4 :                 .await
     540            1 :                 .with_context(|| format!("Ignored tenant {tenant_id} local files cleanup"))?;
     541            1 :             return Ok(());
     542            0 :         }
     543           39 :     }
     544              : 
     545           39 :     removal_result
     546           40 : }
     547              : 
     548            7 : pub async fn load_tenant(
     549            7 :     conf: &'static PageServerConf,
     550            7 :     tenant_id: TenantId,
     551            7 :     broker_client: storage_broker::BrokerClientChannel,
     552            7 :     remote_storage: Option<GenericRemoteStorage>,
     553            7 :     ctx: &RequestContext,
     554            7 : ) -> Result<(), TenantMapInsertError> {
     555            7 :     tenant_map_insert(tenant_id, || async {
     556            6 :         let tenant_path = conf.tenant_path(&tenant_id);
     557            6 :         let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
     558            6 :         if tenant_ignore_mark.exists() {
     559            6 :             std::fs::remove_file(&tenant_ignore_mark)
     560            6 :                 .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
     561            0 :         }
     562              : 
     563            6 :         let resources = TenantSharedResources {
     564            6 :             broker_client,
     565            6 :             remote_storage,
     566            6 :         };
     567            6 :         let new_tenant = schedule_local_tenant_processing(conf, &tenant_path,  resources, None,  &TENANTS, ctx)
     568            6 :             .with_context(|| {
     569            0 :                 format!("Failed to schedule tenant processing in path {tenant_path:?}")
     570            6 :             })?;
     571              : 
     572            6 :         Ok(new_tenant)
     573            7 :     }).await?;
     574            6 :     Ok(())
     575            7 : }
     576              : 
     577            8 : pub async fn ignore_tenant(
     578            8 :     conf: &'static PageServerConf,
     579            8 :     tenant_id: TenantId,
     580            8 : ) -> Result<(), TenantStateError> {
     581           34 :     ignore_tenant0(conf, &TENANTS, tenant_id).await
     582            8 : }
     583              : 
     584            8 : async fn ignore_tenant0(
     585            8 :     conf: &'static PageServerConf,
     586            8 :     tenants: &tokio::sync::RwLock<TenantsMap>,
     587            8 :     tenant_id: TenantId,
     588            8 : ) -> Result<(), TenantStateError> {
     589            8 :     remove_tenant_from_memory(tenants, tenant_id, async {
     590            8 :         let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
     591            8 :         fs::File::create(&ignore_mark_file)
     592            8 :             .await
     593            8 :             .context("Failed to create ignore mark file")
     594            8 :             .and_then(|_| {
     595            8 :                 crashsafe::fsync_file_and_parent(&ignore_mark_file)
     596            8 :                     .context("Failed to fsync ignore mark file")
     597            8 :             })
     598            8 :             .with_context(|| format!("Failed to create ignore mark for tenant {tenant_id}"))?;
     599            8 :         Ok(())
     600            8 :     })
     601           34 :     .await
     602            8 : }
     603              : 
     604            0 : #[derive(Debug, thiserror::Error)]
     605              : pub enum TenantMapListError {
     606              :     #[error("tenant map is still initializing")]
     607              :     Initializing,
     608              : }
     609              : 
     610              : ///
     611              : /// Get list of tenants, for the mgmt API
     612              : ///
     613          144 : pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
     614          144 :     let tenants = TENANTS.read().await;
     615          144 :     let m = match &*tenants {
     616            0 :         TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
     617          144 :         TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
     618          144 :     };
     619          144 :     Ok(m.iter()
     620          200 :         .map(|(id, tenant)| (*id, tenant.current_state()))
     621          144 :         .collect())
     622          144 : }
     623              : 
     624              : /// Execute Attach mgmt API command.
     625              : ///
     626              : /// Downloading all the tenant data is performed in the background, this merely
     627              : /// spawns the background task and returns quickly.
     628           48 : pub async fn attach_tenant(
     629           48 :     conf: &'static PageServerConf,
     630           48 :     tenant_id: TenantId,
     631           48 :     tenant_conf: TenantConfOpt,
     632           48 :     broker_client: storage_broker::BrokerClientChannel,
     633           48 :     remote_storage: GenericRemoteStorage,
     634           48 :     ctx: &RequestContext,
     635           48 : ) -> Result<(), TenantMapInsertError> {
     636           48 :     tenant_map_insert(tenant_id, || async {
     637           41 :         let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
     638              :         // TODO: tenant directory remains on disk if we bail out from here on.
     639              :         //       See https://github.com/neondatabase/neon/issues/4233
     640              : 
     641              :         // Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached
     642           39 :         let marker_file_exists = conf
     643           39 :             .tenant_attaching_mark_file_path(&tenant_id)
     644           39 :             .try_exists()
     645           39 :             .context("check for attach marker file existence")?;
     646           39 :         anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
     647              : 
     648           39 :         let resources = TenantSharedResources {
     649           39 :             broker_client,
     650           39 :             remote_storage: Some(remote_storage),
     651           39 :         };
     652           39 :         let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, resources, None, &TENANTS, ctx)?;
     653              :         // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
     654              :         //      See https://github.com/neondatabase/neon/issues/4233
     655              : 
     656           39 :         let attached_tenant_id = attached_tenant.tenant_id();
     657           39 :         anyhow::ensure!(
     658           39 :             tenant_id == attached_tenant_id,
     659            0 :             "attached tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
     660              :         );
     661           39 :         Ok(attached_tenant)
     662           48 :     })
     663            9 :     .await?;
     664           39 :     Ok(())
     665           48 : }
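
// For illustration: attach_tenant() returns as soon as the background attach
// task is spawned, so the tenant is visible in the map but usually not yet
// Active. A hedged sketch of a caller observing that; the function name and
// the log line are assumptions for the example.
async fn attach_and_report(
    conf: &'static PageServerConf,
    tenant_id: TenantId,
    tenant_conf: TenantConfOpt,
    broker_client: storage_broker::BrokerClientChannel,
    remote_storage: GenericRemoteStorage,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    attach_tenant(conf, tenant_id, tenant_conf, broker_client, remote_storage, ctx).await?;
    // The download of tenant data is still running in the task spawned by
    // Tenant::spawn_attach; at this point we can only report the current state.
    let tenant = get_tenant(tenant_id, /* active_only */ false).await?;
    info!(
        "tenant {tenant_id} attach started, current state: {:?}",
        tenant.current_state()
    );
    Ok(())
}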
     666              : 
     667            0 : #[derive(Debug, thiserror::Error)]
     668              : pub enum TenantMapInsertError {
     669              :     #[error("tenant map is still initializing")]
     670              :     StillInitializing,
     671              :     #[error("tenant map is shutting down")]
     672              :     ShuttingDown,
     673              :     #[error("tenant {0} already exists, state: {1:?}")]
     674              :     TenantAlreadyExists(TenantId, TenantState),
     675              :     #[error(transparent)]
     676              :     Closure(#[from] anyhow::Error),
     677              : }
     678              : 
     679              : /// Run the given closure iff the tenants map entry for the given `tenant_id` is vacant.
     680              : /// The closure is responsible for creating the tenant object; on success, this function
     681              : /// inserts the returned tenant into the vacant entry.
     682              : ///
     683              : /// NB: the closure should return quickly because the current implementation of tenants map
     684              : /// serializes access through an `RwLock`.
     685          535 : async fn tenant_map_insert<F, R>(
     686          535 :     tenant_id: TenantId,
     687          535 :     insert_fn: F,
     688          535 : ) -> Result<Arc<Tenant>, TenantMapInsertError>
     689          535 : where
     690          535 :     F: FnOnce() -> R,
     691          535 :     R: std::future::Future<Output = anyhow::Result<Arc<Tenant>>>,
     692          535 : {
     693          535 :     let mut guard = TENANTS.write().await;
     694          535 :     let m = match &mut *guard {
     695            0 :         TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
     696            0 :         TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
     697          535 :         TenantsMap::Open(m) => m,
     698          535 :     };
     699          535 :     match m.entry(tenant_id) {
     700            8 :         hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
     701            8 :             tenant_id,
     702            8 :             e.get().current_state(),
     703            8 :         )),
     704          527 :         hash_map::Entry::Vacant(v) => match insert_fn().await {
     705          524 :             Ok(tenant) => {
     706          524 :                 v.insert(tenant.clone());
     707          524 :                 Ok(tenant)
     708              :             }
     709            3 :             Err(e) => Err(TenantMapInsertError::Closure(e)),
     710              :         },
     711              :     }
     712          535 : }
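
// For illustration: the shape of a tenant_map_insert() call. The closure only
// runs if the `tenant_id` slot is vacant, must return the tenant to insert,
// and should stay short because the map's write lock is held around it. Using
// create_broken_tenant as a stand-in tenant here is a contrived assumption
// purely for the example; real callers do filesystem work and spawn the tenant
// (see create_tenant / load_tenant / attach_tenant above).
async fn insert_placeholder_tenant(
    conf: &'static PageServerConf,
    tenant_id: TenantId,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
    tenant_map_insert(tenant_id, || async {
        Ok(Tenant::create_broken_tenant(
            conf,
            tenant_id,
            "placeholder tenant for illustration".to_string(),
        ))
    })
    .await
}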
     713              : 
     714              : /// Stops the tenant and removes it from memory, unless it is already [`TenantState::Stopping`], in which case it bails.
     715              : /// Other tenant resources can be removed manually via the `tenant_cleanup` future.
     716              : /// If the cleanup fails, the tenant stays in memory in the [`TenantState::Broken`] state, and another removal
     717              : /// operation would be needed to remove it.
     718           49 : async fn remove_tenant_from_memory<V, F>(
     719           49 :     tenants: &tokio::sync::RwLock<TenantsMap>,
     720           49 :     tenant_id: TenantId,
     721           49 :     tenant_cleanup: F,
     722           49 : ) -> Result<V, TenantStateError>
     723           49 : where
     724           49 :     F: std::future::Future<Output = anyhow::Result<V>>,
     725           49 : {
     726              :     use utils::completion;
     727              : 
     728              :     // It's important to keep the tenant in memory until the final cleanup is done, to avoid cleanup races.
     729              :     // The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
     730              :     // Tenant-wide cleanup operations may take some time (removing the entire tenant directory), so we want to
     731              :     // avoid holding the lock for the entire process.
     732           45 :     let tenant = {
     733           49 :         tenants
     734           49 :             .write()
     735            0 :             .await
     736           49 :             .get(&tenant_id)
     737           49 :             .cloned()
     738           49 :             .ok_or(TenantStateError::NotFound(tenant_id))?
     739              :     };
     740              : 
     741              :     // allow pageserver shutdown to await for our completion
     742           45 :     let (_guard, progress) = completion::channel();
     743           45 : 
     744           45 :     // whenever we remove a tenant from memory, we don't want to flush and wait for upload
     745           45 :     let freeze_and_flush = false;
     746           45 : 
     747           45 :     // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
     748           45 :     // that we can continue safely to cleanup.
     749          148 :     match tenant.shutdown(progress, freeze_and_flush).await {
     750           45 :         Ok(()) => {}
     751            0 :         Err(_other) => {
     752            0 :             // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
     753            0 :             // wait for it but return an error right away because these are distinct requests.
     754            0 :             return Err(TenantStateError::IsStopping(tenant_id));
     755              :         }
     756              :     }
     757              : 
     758           45 :     match tenant_cleanup
     759          153 :         .await
     760           45 :         .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
     761              :     {
     762           45 :         Ok(hook_value) => {
     763           45 :             let mut tenants_accessor = tenants.write().await;
     764           45 :             if tenants_accessor.remove(&tenant_id).is_none() {
     765            0 :                 warn!("Tenant {tenant_id} got removed from memory before operation finished");
     766           45 :             }
     767           45 :             Ok(hook_value)
     768              :         }
     769            0 :         Err(e) => {
     770            0 :             let tenants_accessor = tenants.read().await;
     771            0 :             match tenants_accessor.get(&tenant_id) {
     772            0 :                 Some(tenant) => {
     773            0 :                     tenant.set_broken(e.to_string()).await;
     774              :                 }
     775              :                 None => {
     776            0 :                     warn!("Tenant {tenant_id} got removed from memory");
     777            0 :                     return Err(TenantStateError::NotFound(tenant_id));
     778              :                 }
     779              :             }
     780            0 :             Err(TenantStateError::Other(e))
     781              :         }
     782              :     }
     783           49 : }
     784              : 
     785              : use {
     786              :     crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
     787              :     utils::http::error::ApiError,
     788              : };
     789              : 
     790          505 : pub async fn immediate_gc(
     791          505 :     tenant_id: TenantId,
     792          505 :     timeline_id: TimelineId,
     793          505 :     gc_req: TimelineGcRequest,
     794          505 :     ctx: &RequestContext,
     795          505 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
     796          505 :     let guard = TENANTS.read().await;
     797          505 :     let tenant = guard
     798          505 :         .get(&tenant_id)
     799          505 :         .map(Arc::clone)
     800          505 :         .with_context(|| format!("tenant {tenant_id}"))
     801          505 :         .map_err(|e| ApiError::NotFound(e.into()))?;
     802              : 
     803          504 :     let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
     804          504 :     // Use tenant's pitr setting
     805          504 :     let pitr = tenant.get_pitr_interval();
     806          504 : 
     807          504 :     // Run in task_mgr to avoid race with tenant_detach operation
     808          504 :     let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
     809          504 :     let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
     810          504 :     task_mgr::spawn(
     811          504 :         &tokio::runtime::Handle::current(),
     812          504 :         TaskKind::GarbageCollector,
     813          504 :         Some(tenant_id),
     814          504 :         Some(timeline_id),
     815          504 :         &format!("timeline_gc_handler garbage collection run for tenant {tenant_id} timeline {timeline_id}"),
     816          504 :         false,
     817          504 :         async move {
     818            0 :             fail::fail_point!("immediate_gc_task_pre");
     819          504 :             let result = tenant
     820          504 :                 .gc_iteration(Some(timeline_id), gc_horizon, pitr, &ctx)
     821          504 :                 .instrument(info_span!("manual_gc", %tenant_id, %timeline_id))
     822          165 :                 .await;
     823              :                 // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
     824              :                 // better once the types support it.
     825          504 :             match task_done.send(result) {
     826          504 :                 Ok(_) => (),
     827            0 :                 Err(result) => error!("failed to send gc result: {result:?}"),
     828              :             }
     829          504 :             Ok(())
     830          504 :         }
     831          504 :     );
     832          504 : 
     833          504 :     // we held the guard until after spawning the task so that timeline shutdown will wait for the task
     834          504 :     drop(guard);
     835          504 : 
     836          504 :     Ok(wait_task_done)
     837          505 : }
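
// For illustration: immediate_gc() only spawns the GC task; the returned
// oneshot receiver resolves once that task finishes. A hedged caller sketch;
// the function name and the way the two error layers are flattened are
// assumptions for the example.
async fn run_gc_and_wait(
    tenant_id: TenantId,
    timeline_id: TimelineId,
    gc_req: TimelineGcRequest,
    ctx: &RequestContext,
) -> anyhow::Result<GcResult> {
    let wait_task_done = immediate_gc(tenant_id, timeline_id, gc_req, ctx)
        .await
        .map_err(|e| anyhow::anyhow!("failed to start immediate gc: {e:?}"))?;
    // Outer error: the GC task was cancelled or panicked and dropped the sender.
    // Inner error: gc_iteration itself failed.
    let gc_result = wait_task_done
        .await
        .context("gc task dropped its result channel")??;
    Ok(gc_result)
}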
     838              : 
     839              : #[cfg(test)]
     840              : mod tests {
     841              :     use std::collections::HashMap;
     842              :     use std::sync::Arc;
     843              :     use tracing::{info_span, Instrument};
     844              : 
     845              :     use super::{super::harness::TenantHarness, TenantsMap};
     846              : 
     847            1 :     #[tokio::test(start_paused = true)]
     848            1 :     async fn shutdown_joins_remove_tenant_from_memory() {
     849              :         // the test is a bit ugly because of the lockstep coordination with the spawned tasks. the aim is to make
     850              :         // sure `shutdown_all_tenants0` per-tenant processing joins in any active
     851              :         // remove_tenant_from_memory calls, which is enforced by making the operation last until
     852              :         // we've run `shutdown_all_tenants0` for a long time.
     853              : 
     854            1 :         let (t, _ctx) = TenantHarness::create("shutdown_joins_detach")
     855            1 :             .unwrap()
     856            1 :             .load()
     857            1 :             .await;
     858              : 
     859              :         // the harness forces the tenant into the Active state, and nothing is running on the tenant
     860              : 
     861            1 :         let id = t.tenant_id();
     862              : 
     863              :         // tenant harness configures the logging and we cannot escape it
     864            1 :         let _e = info_span!("testing", tenant_id = %id).entered();
     865            1 : 
     866            1 :         let tenants = HashMap::from([(id, t.clone())]);
     867            1 :         let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants)));
     868            1 : 
     869            1 :         let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
     870            1 :         let (until_cleanup_started, cleanup_started) = utils::completion::channel();
     871              : 
     872              :         // start a "detaching operation", which will take a while, until can_complete_cleanup
     873            1 :         let cleanup_task = {
     874            1 :             let jh = tokio::spawn({
     875            1 :                 let tenants = tenants.clone();
     876            1 :                 async move {
     877            1 :                     let cleanup = async move {
     878            1 :                         drop(until_cleanup_started);
     879            1 :                         can_complete_cleanup.wait().await;
     880            1 :                         anyhow::Ok(())
     881            1 :                     };
     882            1 :                     super::remove_tenant_from_memory(&tenants, id, cleanup).await
     883            1 :                 }
     884            1 :                 .instrument(info_span!("foobar", tenant_id = %id))
     885              :             });
     886              : 
     887              :             // now the long cleanup should be in place, with the stopping state
     888            1 :             cleanup_started.wait().await;
     889            1 :             jh
     890              :         };
     891              : 
     892            1 :         let mut cleanup_progress = std::pin::pin!(t
     893            1 :             .shutdown(utils::completion::Barrier::default(), false)
     894            0 :             .await
     895            1 :             .unwrap_err()
     896            1 :             .wait());
     897              : 
     898            1 :         let mut shutdown_task = {
     899            1 :             let (until_shutdown_started, shutdown_started) = utils::completion::channel();
     900            1 : 
     901            1 :             let shutdown_task = tokio::spawn(async move {
     902            1 :                 drop(until_shutdown_started);
     903            2 :                 super::shutdown_all_tenants0(&tenants).await;
     904            1 :             });
     905            1 : 
     906            1 :             shutdown_started.wait().await;
     907            1 :             shutdown_task
     908            1 :         };
     909            1 : 
     910            1 :         // if the joining in is removed from shutdown_all_tenants0, the shutdown_task should always
     911            1 :         // get to complete within timeout and fail the test. it is expected to continue awaiting
     912            1 :         // until completion or SIGKILL during normal shutdown.
     913            1 :         //
     914            1 :         // the timeout is long to cover anything that shutdown_task could be doing, but it is
     915            1 :         // handled instantly because we use tokio's time pausing in this test. 100s is much more than
     916            1 :         // what we get from systemd on shutdown (10s).
     917            1 :         let long_time = std::time::Duration::from_secs(100);
     918            2 :         tokio::select! {
     919            2 :             _ = &mut shutdown_task => unreachable!("shutdown must continue, until_cleanup_completed is not dropped"),
     920            2 :             _ = &mut cleanup_progress => unreachable!("cleanup progress must continue, until_cleanup_completed is not dropped"),
     921            2 :             _ = tokio::time::sleep(long_time) => {},
     922            2 :         }
     923              : 
     924              :         // allow the remove_tenant_from_memory and thus eventually the shutdown to continue
     925            1 :         drop(until_cleanup_completed);
     926              : 
     927            1 :         let (je, ()) = tokio::join!(shutdown_task, cleanup_progress);
     928            1 :         je.expect("Tenant::shutdown should not have panicked");
     929            1 :         cleanup_task
     930            0 :             .await
     931            1 :             .expect("no panicking")
     932            1 :             .expect("remove_tenant_from_memory failed");
     933            1 : 
     934            1 :         futures::future::poll_immediate(
     935            1 :             t.shutdown(utils::completion::Barrier::default(), false)
     936            0 :                 .await
     937            1 :                 .unwrap_err()
     938            1 :                 .wait(),
     939              :         )
     940            0 :         .await
     941            1 :         .expect("the stopping progress must still be complete");
     942              :     }
     943              : }
        

Generated by: LCOV version 2.1-beta