Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use std::borrow::Cow;
5 : use std::cmp::Ordering;
6 : use std::collections::{BTreeMap, HashMap, HashSet};
7 : use std::ops::Deref;
8 : use std::sync::Arc;
9 : use std::time::Duration;
10 :
11 : use anyhow::Context;
12 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
13 : use futures::StreamExt;
14 : use itertools::Itertools;
15 : use once_cell::sync::Lazy;
16 : use pageserver_api::key::Key;
17 : use pageserver_api::models::LocationConfigMode;
18 : use pageserver_api::shard::{
19 : ShardCount, ShardIdentity, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId,
20 : };
21 : use pageserver_api::upcall_api::ReAttachResponseTenant;
22 : use rand::Rng;
23 : use rand::distributions::Alphanumeric;
24 : use remote_storage::TimeoutOrCancel;
25 : use sysinfo::SystemExt;
26 : use tokio::fs;
27 : use tokio::task::JoinSet;
28 : use tokio_util::sync::CancellationToken;
29 : use tracing::*;
30 : use utils::crashsafe::path_with_suffix_extension;
31 : use utils::fs_ext::PathExt;
32 : use utils::generation::Generation;
33 : use utils::id::{TenantId, TimelineId};
34 : use utils::{backoff, completion, crashsafe};
35 :
36 : use super::remote_timeline_client::remote_tenant_path;
37 : use super::secondary::SecondaryTenant;
38 : use super::timeline::detach_ancestor::{self, PreparedTimelineDetach};
39 : use super::{GlobalShutDown, TenantSharedResources};
40 : use crate::config::PageServerConf;
41 : use crate::context::{DownloadBehavior, RequestContext};
42 : use crate::controller_upcall_client::{
43 : ControlPlaneGenerationsApi, ControllerUpcallClient, RetryForeverError,
44 : };
45 : use crate::deletion_queue::DeletionQueueClient;
46 : use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
47 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
48 : use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind};
49 : use crate::tenant::config::{
50 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
51 : };
52 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
53 : use crate::tenant::storage_layer::inmemory_layer;
54 : use crate::tenant::timeline::ShutdownMode;
55 : use crate::tenant::{AttachedTenantConf, GcError, LoadConfigError, SpawnMode, Tenant, TenantState};
56 : use crate::virtual_file::MaybeFatalIo;
57 : use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
58 :
59 : /// For a tenant that appears in TenantsMap, it may either be
60 : /// - `Attached`: has a full Tenant object, is elegible to service
61 : /// reads and ingest WAL.
62 : /// - `Secondary`: is only keeping a local cache warm.
63 : ///
64 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
65 : /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
66 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
67 : /// having a properly acquired generation (Secondary doesn't need a generation)
68 : #[derive(Clone)]
69 : pub(crate) enum TenantSlot {
70 : Attached(Arc<Tenant>),
71 : Secondary(Arc<SecondaryTenant>),
72 : /// In this state, other administrative operations acting on the TenantId should
73 : /// block, or return a retry indicator equivalent to HTTP 503.
74 : InProgress(utils::completion::Barrier),
75 : }
76 :
77 : impl std::fmt::Debug for TenantSlot {
78 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 0 : match self {
80 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
81 0 : Self::Secondary(_) => write!(f, "Secondary"),
82 0 : Self::InProgress(_) => write!(f, "InProgress"),
83 : }
84 0 : }
85 : }
86 :
87 : impl TenantSlot {
88 : /// Return the `Tenant` in this slot if attached, else None
89 0 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
90 0 : match self {
91 0 : Self::Attached(t) => Some(t),
92 0 : Self::Secondary(_) => None,
93 0 : Self::InProgress(_) => None,
94 : }
95 0 : }
96 : }
97 :
98 : /// The tenants known to the pageserver.
99 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
100 : pub(crate) enum TenantsMap {
101 : /// [`init_tenant_mgr`] is not done yet.
102 : Initializing,
103 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
104 : /// New tenants can be added using [`tenant_map_acquire_slot`].
105 : Open(BTreeMap<TenantShardId, TenantSlot>),
106 : /// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
107 : /// Existing tenants are still accessible, but no new tenants can be created.
108 : ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
109 : }
110 :
111 : /// When resolving a TenantId to a shard, we may be looking for the 0th
112 : /// shard, or we might be looking for whichever shard holds a particular page.
113 : #[derive(Copy, Clone)]
114 : pub(crate) enum ShardSelector {
115 : /// Only return the 0th shard, if it is present. If a non-0th shard is present,
116 : /// ignore it.
117 : Zero,
118 : /// Pick the shard that holds this key
119 : Page(Key),
120 : /// The shard ID is known: pick the given shard
121 : Known(ShardIndex),
122 : }
123 :
124 : /// A convenience for use with the re_attach ControllerUpcallClient function: rather
125 : /// than the serializable struct, we build this enum that encapsulates
126 : /// the invariant that attached tenants always have generations.
127 : ///
128 : /// This represents the subset of a LocationConfig that we receive during re-attach.
129 : pub(crate) enum TenantStartupMode {
130 : Attached((AttachmentMode, Generation)),
131 : Secondary,
132 : }
133 :
134 : impl TenantStartupMode {
135 : /// Return the generation & mode that should be used when starting
136 : /// this tenant.
137 : ///
138 : /// If this returns None, the re-attach struct is in an invalid state and
139 : /// should be ignored in the response.
140 0 : fn from_reattach_tenant(rart: ReAttachResponseTenant) -> Option<Self> {
141 0 : match (rart.mode, rart.r#gen) {
142 0 : (LocationConfigMode::Detached, _) => None,
143 0 : (LocationConfigMode::Secondary, _) => Some(Self::Secondary),
144 0 : (LocationConfigMode::AttachedMulti, Some(g)) => {
145 0 : Some(Self::Attached((AttachmentMode::Multi, Generation::new(g))))
146 : }
147 0 : (LocationConfigMode::AttachedSingle, Some(g)) => {
148 0 : Some(Self::Attached((AttachmentMode::Single, Generation::new(g))))
149 : }
150 0 : (LocationConfigMode::AttachedStale, Some(g)) => {
151 0 : Some(Self::Attached((AttachmentMode::Stale, Generation::new(g))))
152 : }
153 : _ => {
154 0 : tracing::warn!(
155 0 : "Received invalid re-attach state for tenant {}: {rart:?}",
156 : rart.id
157 : );
158 0 : None
159 : }
160 : }
161 0 : }
162 : }
163 :
164 : /// Result type for looking up a TenantId to a specific shard
165 : pub(crate) enum ShardResolveResult {
166 : NotFound,
167 : Found(Arc<Tenant>),
168 : // Wait for this barrrier, then query again
169 : InProgress(utils::completion::Barrier),
170 : }
171 :
172 : impl TenantsMap {
173 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
174 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
175 : /// None is returned.
176 0 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
177 0 : match self {
178 0 : TenantsMap::Initializing => None,
179 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
180 0 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
181 : }
182 : }
183 0 : }
184 :
185 : #[cfg(all(debug_assertions, not(test)))]
186 0 : pub(crate) fn len(&self) -> usize {
187 0 : match self {
188 0 : TenantsMap::Initializing => 0,
189 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
190 : }
191 0 : }
192 : }
193 :
194 : /// Precursor to deletion of a tenant dir: we do a fast rename to a tmp path, and then
195 : /// the slower actual deletion in the background.
196 : ///
197 : /// This is "safe" in that that it won't leave behind a partially deleted directory
198 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting
199 : /// the contents.
200 : ///
201 : /// This is pageserver-specific, as it relies on future processes after a crash to check
202 : /// for TEMP_FILE_SUFFIX when loading things.
203 0 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
204 0 : let parent = path
205 0 : .as_ref()
206 0 : .parent()
207 0 : // It is invalid to call this function with a relative path. Tenant directories
208 0 : // should always have a parent.
209 0 : .ok_or(std::io::Error::new(
210 0 : std::io::ErrorKind::InvalidInput,
211 0 : "Path must be absolute",
212 0 : ))?;
213 0 : let rand_suffix = rand::thread_rng()
214 0 : .sample_iter(&Alphanumeric)
215 0 : .take(8)
216 0 : .map(char::from)
217 0 : .collect::<String>()
218 0 : + TEMP_FILE_SUFFIX;
219 0 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
220 0 : fs::rename(path.as_ref(), &tmp_path).await?;
221 0 : fs::File::open(parent)
222 0 : .await?
223 0 : .sync_all()
224 0 : .await
225 0 : .maybe_fatal_err("safe_rename_tenant_dir")?;
226 0 : Ok(tmp_path)
227 0 : }
228 :
229 : /// See [`Self::spawn`].
230 : #[derive(Clone, Default)]
231 : pub struct BackgroundPurges(tokio_util::task::TaskTracker);
232 :
233 : impl BackgroundPurges {
234 : /// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
235 : /// the background, and thereby avoid blocking any API requests on this deletion completing.
236 : ///
237 : /// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
238 : /// Thus the [`BackgroundPurges`] type to keep track of these tasks.
239 0 : pub fn spawn(&self, tmp_path: Utf8PathBuf) {
240 0 : // because on shutdown we close and wait, we are misusing TaskTracker a bit.
241 0 : //
242 0 : // so first acquire a token, then check if the tracker has been closed. the tracker might get closed
243 0 : // right after, but at least the shutdown will wait for what we are spawning next.
244 0 : let token = self.0.token();
245 0 :
246 0 : if self.0.is_closed() {
247 0 : warn!(
248 : %tmp_path,
249 0 : "trying to spawn background purge during shutdown, ignoring"
250 : );
251 0 : return;
252 0 : }
253 :
254 0 : let span = info_span!(parent: None, "background_purge", %tmp_path);
255 :
256 0 : let task = move || {
257 0 : let _token = token;
258 0 : let _entered = span.entered();
259 0 : if let Err(error) = std::fs::remove_dir_all(tmp_path.as_path()) {
260 : // should we fatal_io_error here?
261 0 : warn!(%error, "failed to purge tenant directory");
262 0 : }
263 0 : };
264 :
265 0 : BACKGROUND_RUNTIME.spawn_blocking(task);
266 0 : }
267 :
268 : /// When this future completes, all background purges have completed.
269 : /// The first poll of the future will already lock out new background purges spawned via [`Self::spawn`].
270 : ///
271 : /// Concurrent calls will coalesce.
272 : ///
273 : /// # Cancellation-Safety
274 : ///
275 : /// If this future is dropped before polled to completion, concurrent and subsequent
276 : /// instances of this future will continue to be correct.
277 : #[instrument(skip_all)]
278 : pub async fn shutdown(&self) {
279 : // forbid new tasks (can be called many times)
280 : self.0.close();
281 : self.0.wait().await;
282 : }
283 : }
284 :
285 : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
286 4 : Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
287 :
288 : /// Responsible for storing and mutating the collection of all tenants
289 : /// that this pageserver has state for.
290 : ///
291 : /// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
292 : ///
293 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
294 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
295 : /// and attached modes concurrently.
296 : pub struct TenantManager {
297 : conf: &'static PageServerConf,
298 : // TODO: currently this is a &'static pointing to TENANTs. When we finish refactoring
299 : // out of that static variable, the TenantManager can own this.
300 : // See https://github.com/neondatabase/neon/issues/5796
301 : tenants: &'static std::sync::RwLock<TenantsMap>,
302 : resources: TenantSharedResources,
303 :
304 : // Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
305 : // This is for edge cases like tenant deletion. In normal cases (within a Tenant lifetime),
306 : // tenants have their own cancellation tokens, which we fire individually in [`Self::shutdown`], or
307 : // when the tenant detaches.
308 : cancel: CancellationToken,
309 :
310 : background_purges: BackgroundPurges,
311 : }
312 :
313 0 : fn emergency_generations(
314 0 : tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
315 0 : ) -> HashMap<TenantShardId, TenantStartupMode> {
316 0 : tenant_confs
317 0 : .iter()
318 0 : .filter_map(|(tid, lc)| {
319 0 : let lc = match lc {
320 0 : Ok(lc) => lc,
321 0 : Err(_) => return None,
322 : };
323 : Some((
324 0 : *tid,
325 0 : match &lc.mode {
326 0 : LocationMode::Attached(alc) => {
327 0 : TenantStartupMode::Attached((alc.attach_mode, alc.generation))
328 : }
329 0 : LocationMode::Secondary(_) => TenantStartupMode::Secondary,
330 : },
331 : ))
332 0 : })
333 0 : .collect()
334 0 : }
335 :
336 0 : async fn init_load_generations(
337 0 : conf: &'static PageServerConf,
338 0 : tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
339 0 : resources: &TenantSharedResources,
340 0 : cancel: &CancellationToken,
341 0 : ) -> anyhow::Result<Option<HashMap<TenantShardId, TenantStartupMode>>> {
342 0 : let generations = if conf.control_plane_emergency_mode {
343 0 : error!(
344 0 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
345 : );
346 0 : emergency_generations(tenant_confs)
347 0 : } else if let Some(client) = ControllerUpcallClient::new(conf, cancel) {
348 0 : info!("Calling {} API to re-attach tenants", client.base_url());
349 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
350 0 : match client.re_attach(conf).await {
351 0 : Ok(tenants) => tenants
352 0 : .into_iter()
353 0 : .flat_map(|(id, rart)| {
354 0 : TenantStartupMode::from_reattach_tenant(rart).map(|tsm| (id, tsm))
355 0 : })
356 0 : .collect(),
357 : Err(RetryForeverError::ShuttingDown) => {
358 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
359 : }
360 : }
361 : } else {
362 0 : info!("Control plane API not configured, tenant generations are disabled");
363 0 : return Ok(None);
364 : };
365 :
366 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
367 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
368 : // the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
369 : // are processed, even though we don't block on recovery completing here.
370 0 : let attached_tenants = generations
371 0 : .iter()
372 0 : .flat_map(|(id, start_mode)| {
373 0 : match start_mode {
374 0 : TenantStartupMode::Attached((_mode, generation)) => Some(generation),
375 0 : TenantStartupMode::Secondary => None,
376 : }
377 0 : .map(|gen_| (*id, *gen_))
378 0 : })
379 0 : .collect();
380 0 : resources.deletion_queue_client.recover(attached_tenants)?;
381 :
382 0 : Ok(Some(generations))
383 0 : }
384 :
385 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
386 : /// to load a tenant config from it.
387 : ///
388 : /// If we cleaned up something expected (like an empty dir or a temp dir), return None.
389 0 : fn load_tenant_config(
390 0 : conf: &'static PageServerConf,
391 0 : tenant_shard_id: TenantShardId,
392 0 : dentry: Utf8DirEntry,
393 0 : ) -> Option<Result<LocationConf, LoadConfigError>> {
394 0 : let tenant_dir_path = dentry.path().to_path_buf();
395 0 : if crate::is_temporary(&tenant_dir_path) {
396 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
397 : // No need to use safe_remove_tenant_dir_all because this is already
398 : // a temporary path
399 0 : std::fs::remove_dir_all(&tenant_dir_path).fatal_err("delete temporary tenant dir");
400 0 : return None;
401 0 : }
402 0 :
403 0 : // This case happens if we crash during attachment before writing a config into the dir
404 0 : let is_empty = tenant_dir_path
405 0 : .is_empty_dir()
406 0 : .fatal_err("Checking for empty tenant dir");
407 0 : if is_empty {
408 0 : info!("removing empty tenant directory {tenant_dir_path:?}");
409 0 : std::fs::remove_dir(&tenant_dir_path).fatal_err("delete empty tenant dir");
410 0 : return None;
411 0 : }
412 0 :
413 0 : Some(Tenant::load_tenant_config(conf, &tenant_shard_id))
414 0 : }
415 :
416 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
417 : /// and load configurations for the tenants we found.
418 : ///
419 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
420 : /// seconds even on reasonably fast drives.
421 0 : async fn init_load_tenant_configs(
422 0 : conf: &'static PageServerConf,
423 0 : ) -> HashMap<TenantShardId, Result<LocationConf, LoadConfigError>> {
424 0 : let tenants_dir = conf.tenants_path();
425 :
426 0 : let dentries = tokio::task::spawn_blocking(move || -> Vec<Utf8DirEntry> {
427 0 : let context = format!("read tenants dir {tenants_dir}");
428 0 : let dir_entries = tenants_dir.read_dir_utf8().fatal_err(&context);
429 0 :
430 0 : dir_entries
431 0 : .collect::<Result<Vec<_>, std::io::Error>>()
432 0 : .fatal_err(&context)
433 0 : })
434 0 : .await
435 0 : .expect("Config load task panicked");
436 0 :
437 0 : let mut configs = HashMap::new();
438 0 :
439 0 : let mut join_set = JoinSet::new();
440 0 : for dentry in dentries {
441 0 : let tenant_shard_id = match dentry.file_name().parse::<TenantShardId>() {
442 0 : Ok(id) => id,
443 : Err(_) => {
444 0 : warn!(
445 0 : "Invalid tenant path (garbage in our repo directory?): '{}'",
446 0 : dentry.file_name()
447 : );
448 0 : continue;
449 : }
450 : };
451 :
452 0 : join_set.spawn_blocking(move || {
453 0 : (
454 0 : tenant_shard_id,
455 0 : load_tenant_config(conf, tenant_shard_id, dentry),
456 0 : )
457 0 : });
458 0 : }
459 :
460 0 : while let Some(r) = join_set.join_next().await {
461 0 : let (tenant_shard_id, tenant_config) = r.expect("Panic in config load task");
462 0 : if let Some(tenant_config) = tenant_config {
463 0 : configs.insert(tenant_shard_id, tenant_config);
464 0 : }
465 : }
466 :
467 0 : configs
468 0 : }
469 :
470 : #[derive(Debug, thiserror::Error)]
471 : pub(crate) enum DeleteTenantError {
472 : #[error("Tenant map slot error {0}")]
473 : SlotError(#[from] TenantSlotError),
474 :
475 : #[error("Cancelled")]
476 : Cancelled,
477 :
478 : #[error(transparent)]
479 : Other(#[from] anyhow::Error),
480 : }
481 :
482 : /// Initialize repositories with locally available timelines.
483 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
484 : /// are scheduled for download and added to the tenant once download is completed.
485 : #[instrument(skip_all)]
486 : pub async fn init_tenant_mgr(
487 : conf: &'static PageServerConf,
488 : background_purges: BackgroundPurges,
489 : resources: TenantSharedResources,
490 : init_order: InitializationOrder,
491 : cancel: CancellationToken,
492 : ) -> anyhow::Result<TenantManager> {
493 : let mut tenants = BTreeMap::new();
494 :
495 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
496 :
497 : // Initialize dynamic limits that depend on system resources
498 : let system_memory =
499 : sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
500 : .total_memory();
501 : let max_ephemeral_layer_bytes =
502 : conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
503 : tracing::info!(
504 : "Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory"
505 : );
506 : inmemory_layer::GLOBAL_RESOURCES.max_dirty_bytes.store(
507 : max_ephemeral_layer_bytes,
508 : std::sync::atomic::Ordering::Relaxed,
509 : );
510 :
511 : // Scan local filesystem for attached tenants
512 : let tenant_configs = init_load_tenant_configs(conf).await;
513 :
514 : // Determine which tenants are to be secondary or attached, and in which generation
515 : let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
516 :
517 : tracing::info!(
518 : "Attaching {} tenants at startup, warming up {} at a time",
519 : tenant_configs.len(),
520 : conf.concurrent_tenant_warmup.initial_permits()
521 : );
522 : TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
523 :
524 : // Accumulate futures for writing tenant configs, so that we can execute in parallel
525 : let mut config_write_futs = Vec::new();
526 :
527 : // Update the location configs according to the re-attach response and persist them to disk
528 : tracing::info!("Updating {} location configs", tenant_configs.len());
529 : for (tenant_shard_id, location_conf) in tenant_configs {
530 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
531 :
532 : let mut location_conf = match location_conf {
533 : Ok(l) => l,
534 : Err(e) => {
535 : // This should only happen in the case of a serialization bug or critical local I/O error: we cannot load this tenant
536 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to load tenant config, failed to {e:#}");
537 : continue;
538 : }
539 : };
540 :
541 : // FIXME: if we were attached, and get demoted to secondary on re-attach, we
542 : // don't have a place to get a config.
543 : // (https://github.com/neondatabase/neon/issues/5377)
544 : const DEFAULT_SECONDARY_CONF: SecondaryLocationConfig =
545 : SecondaryLocationConfig { warm: true };
546 :
547 : if let Some(tenant_modes) = &tenant_modes {
548 : // We have a generation map: treat it as the authority for whether
549 : // this tenant is really attached.
550 : match tenant_modes.get(&tenant_shard_id) {
551 : None => {
552 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
553 :
554 : match safe_rename_tenant_dir(&tenant_dir_path).await {
555 : Ok(tmp_path) => {
556 : background_purges.spawn(tmp_path);
557 : }
558 : Err(e) => {
559 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
560 : "Failed to move detached tenant directory '{tenant_dir_path}': {e:?}");
561 : }
562 : };
563 :
564 : // We deleted local content: move on to next tenant, don't try and spawn this one.
565 : continue;
566 : }
567 : Some(TenantStartupMode::Secondary) => {
568 : if !matches!(location_conf.mode, LocationMode::Secondary(_)) {
569 : location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
570 : }
571 : }
572 : Some(TenantStartupMode::Attached((attach_mode, generation))) => {
573 : let old_gen_higher = match &location_conf.mode {
574 : LocationMode::Attached(AttachedLocationConfig {
575 : generation: old_generation,
576 : attach_mode: _attach_mode,
577 : }) => {
578 : if old_generation > generation {
579 : Some(old_generation)
580 : } else {
581 : None
582 : }
583 : }
584 : _ => None,
585 : };
586 : if let Some(old_generation) = old_gen_higher {
587 : tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
588 : "Control plane gave decreasing generation ({generation:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
589 : old_generation
590 : );
591 :
592 : // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
593 : // local disk content: demote to secondary rather than detaching.
594 : location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
595 : } else {
596 : location_conf.attach_in_generation(*attach_mode, *generation);
597 : }
598 : }
599 : }
600 : } else {
601 : // Legacy mode: no generation information, any tenant present
602 : // on local disk may activate
603 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
604 : };
605 :
606 : // Presence of a generation number implies attachment: attach the tenant
607 : // if it wasn't already, and apply the generation number.
608 0 : config_write_futs.push(async move {
609 0 : let r = Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
610 0 : (tenant_shard_id, location_conf, r)
611 0 : });
612 : }
613 :
614 : // Execute config writes with concurrency, to avoid bottlenecking on local FS write latency
615 : tracing::info!(
616 : "Writing {} location config files...",
617 : config_write_futs.len()
618 : );
619 : let config_write_results = futures::stream::iter(config_write_futs)
620 : .buffer_unordered(16)
621 : .collect::<Vec<_>>()
622 : .await;
623 :
624 : tracing::info!(
625 : "Spawning {} tenant shard locations...",
626 : config_write_results.len()
627 : );
628 : // For those shards that have live configurations, construct `Tenant` or `SecondaryTenant` objects and start them running
629 : for (tenant_shard_id, location_conf, config_write_result) in config_write_results {
630 : // Writing a config to local disk is foundational to startup up tenants: panic if we can't.
631 : config_write_result.fatal_err("write tenant shard config file");
632 :
633 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
634 : let shard_identity = location_conf.shard;
635 : let slot = match location_conf.mode {
636 : LocationMode::Attached(attached_conf) => TenantSlot::Attached(
637 : tenant_spawn(
638 : conf,
639 : tenant_shard_id,
640 : &tenant_dir_path,
641 : resources.clone(),
642 : AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
643 : shard_identity,
644 : Some(init_order.clone()),
645 : SpawnMode::Lazy,
646 : &ctx,
647 : )
648 : .expect("global shutdown during init_tenant_mgr cannot happen"),
649 : ),
650 : LocationMode::Secondary(secondary_conf) => {
651 : info!(
652 : tenant_id = %tenant_shard_id.tenant_id,
653 : shard_id = %tenant_shard_id.shard_slug(),
654 : "Starting secondary tenant"
655 : );
656 : TenantSlot::Secondary(SecondaryTenant::new(
657 : tenant_shard_id,
658 : shard_identity,
659 : location_conf.tenant_conf,
660 : &secondary_conf,
661 : ))
662 : }
663 : };
664 :
665 : METRICS.slot_inserted(&slot);
666 : tenants.insert(tenant_shard_id, slot);
667 : }
668 :
669 : info!("Processed {} local tenants at startup", tenants.len());
670 :
671 : let mut tenants_map = TENANTS.write().unwrap();
672 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
673 :
674 : *tenants_map = TenantsMap::Open(tenants);
675 :
676 : Ok(TenantManager {
677 : conf,
678 : tenants: &TENANTS,
679 : resources,
680 : cancel: CancellationToken::new(),
681 : background_purges,
682 : })
683 : }
684 :
685 : /// Wrapper for Tenant::spawn that checks invariants before running
686 : #[allow(clippy::too_many_arguments)]
687 0 : fn tenant_spawn(
688 0 : conf: &'static PageServerConf,
689 0 : tenant_shard_id: TenantShardId,
690 0 : tenant_path: &Utf8Path,
691 0 : resources: TenantSharedResources,
692 0 : location_conf: AttachedTenantConf,
693 0 : shard_identity: ShardIdentity,
694 0 : init_order: Option<InitializationOrder>,
695 0 : mode: SpawnMode,
696 0 : ctx: &RequestContext,
697 0 : ) -> Result<Arc<Tenant>, GlobalShutDown> {
698 0 : // All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
699 0 : // path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
700 0 : // to avoid impacting prod runtime performance.
701 0 : assert!(!crate::is_temporary(tenant_path));
702 0 : debug_assert!(tenant_path.is_dir());
703 0 : debug_assert!(
704 0 : conf.tenant_location_config_path(&tenant_shard_id)
705 0 : .try_exists()
706 0 : .unwrap()
707 : );
708 :
709 0 : Tenant::spawn(
710 0 : conf,
711 0 : tenant_shard_id,
712 0 : resources,
713 0 : location_conf,
714 0 : shard_identity,
715 0 : init_order,
716 0 : mode,
717 0 : ctx,
718 0 : )
719 0 : }
720 :
721 4 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
722 4 : let mut join_set = JoinSet::new();
723 0 :
724 0 : #[cfg(all(debug_assertions, not(test)))]
725 0 : {
726 0 : // Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
727 0 : // as it happens implicitly at the end of tests etc.
728 0 : let m = tenants.read().unwrap();
729 0 : debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
730 : }
731 :
732 : // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
733 4 : let (total_in_progress, total_attached) = {
734 4 : let mut m = tenants.write().unwrap();
735 4 : match &mut *m {
736 : TenantsMap::Initializing => {
737 0 : *m = TenantsMap::ShuttingDown(BTreeMap::default());
738 0 : info!("tenants map is empty");
739 0 : return;
740 : }
741 4 : TenantsMap::Open(tenants) => {
742 4 : let mut shutdown_state = BTreeMap::new();
743 4 : let mut total_in_progress = 0;
744 4 : let mut total_attached = 0;
745 :
746 4 : for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
747 4 : match v {
748 0 : TenantSlot::Attached(t) => {
749 0 : shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
750 0 : join_set.spawn(
751 0 : async move {
752 0 : let res = {
753 0 : let (_guard, shutdown_progress) = completion::channel();
754 0 : t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
755 : };
756 :
757 0 : if let Err(other_progress) = res {
758 : // join the another shutdown in progress
759 0 : other_progress.wait().await;
760 0 : }
761 :
762 : // we cannot afford per tenant logging here, because if s3 is degraded, we are
763 : // going to log too many lines
764 0 : debug!("tenant successfully stopped");
765 0 : }
766 0 : .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
767 : );
768 :
769 0 : total_attached += 1;
770 : }
771 0 : TenantSlot::Secondary(state) => {
772 0 : // We don't need to wait for this individually per-tenant: the
773 0 : // downloader task will be waited on eventually, this cancel
774 0 : // is just to encourage it to drop out if it is doing work
775 0 : // for this tenant right now.
776 0 : state.cancel.cancel();
777 0 :
778 0 : shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
779 0 : }
780 4 : TenantSlot::InProgress(notify) => {
781 4 : // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
782 4 : // wait for their notifications to fire in this function.
783 4 : join_set.spawn(async move {
784 4 : notify.wait().await;
785 4 : });
786 4 :
787 4 : total_in_progress += 1;
788 4 : }
789 : }
790 : }
791 4 : *m = TenantsMap::ShuttingDown(shutdown_state);
792 4 : (total_in_progress, total_attached)
793 : }
794 : TenantsMap::ShuttingDown(_) => {
795 0 : error!(
796 0 : "already shutting down, this function isn't supposed to be called more than once"
797 : );
798 0 : return;
799 : }
800 : }
801 : };
802 :
803 4 : let started_at = std::time::Instant::now();
804 4 :
805 4 : info!(
806 0 : "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
807 : total_in_progress, total_attached
808 : );
809 :
810 4 : let total = join_set.len();
811 4 : let mut panicked = 0;
812 4 : let mut buffering = true;
813 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
814 4 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
815 :
816 12 : while !join_set.is_empty() {
817 8 : tokio::select! {
818 8 : Some(joined) = join_set.join_next() => {
819 0 : match joined {
820 4 : Ok(()) => {},
821 0 : Err(join_error) if join_error.is_cancelled() => {
822 0 : unreachable!("we are not cancelling any of the tasks");
823 : }
824 0 : Err(join_error) if join_error.is_panic() => {
825 0 : // cannot really do anything, as this panic is likely a bug
826 0 : panicked += 1;
827 0 : }
828 0 : Err(join_error) => {
829 0 : warn!("unknown kind of JoinError: {join_error}");
830 : }
831 : }
832 4 : if !buffering {
833 4 : // buffer so that every 500ms since the first update (or starting) we'll log
834 4 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
835 4 : // are not able to log *then*.
836 4 : buffering = true;
837 4 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
838 4 : }
839 : },
840 8 : _ = &mut buffered, if buffering => {
841 4 : buffering = false;
842 4 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
843 : }
844 : }
845 : }
846 :
847 4 : if panicked > 0 {
848 0 : warn!(
849 : panicked,
850 0 : total, "observed panicks while shutting down tenants"
851 : );
852 4 : }
853 :
854 : // caller will log how long we took
855 4 : }
856 :
857 : #[derive(thiserror::Error, Debug)]
858 : pub(crate) enum UpsertLocationError {
859 : #[error("Bad config request: {0}")]
860 : BadRequest(anyhow::Error),
861 :
862 : #[error("Cannot change config in this state: {0}")]
863 : Unavailable(#[from] TenantMapError),
864 :
865 : #[error("Tenant is already being modified")]
866 : InProgress,
867 :
868 : #[error("Failed to flush: {0}")]
869 : Flush(anyhow::Error),
870 :
871 : /// This error variant is for unexpected situations (soft assertions) where the system is in an unexpected state.
872 : #[error("Internal error: {0}")]
873 : InternalError(anyhow::Error),
874 : }
875 :
876 : impl TenantManager {
877 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
878 : /// having to pass it around everywhere as a separate object.
879 0 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
880 0 : self.conf
881 0 : }
882 :
883 : /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or currently
884 : /// undergoing a state change (i.e. slot is InProgress).
885 : ///
886 : /// The return Tenant is not guaranteed to be active: check its status after obtaing it, or
887 : /// use [`Tenant::wait_to_become_active`] before using it if you will do I/O on it.
888 0 : pub(crate) fn get_attached_tenant_shard(
889 0 : &self,
890 0 : tenant_shard_id: TenantShardId,
891 0 : ) -> Result<Arc<Tenant>, GetTenantError> {
892 0 : let locked = self.tenants.read().unwrap();
893 :
894 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
895 :
896 0 : match peek_slot {
897 0 : Some(TenantSlot::Attached(tenant)) => Ok(Arc::clone(tenant)),
898 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
899 : None | Some(TenantSlot::Secondary(_)) => {
900 0 : Err(GetTenantError::ShardNotFound(tenant_shard_id))
901 : }
902 : }
903 0 : }
904 :
905 0 : pub(crate) fn get_secondary_tenant_shard(
906 0 : &self,
907 0 : tenant_shard_id: TenantShardId,
908 0 : ) -> Option<Arc<SecondaryTenant>> {
909 0 : let locked = self.tenants.read().unwrap();
910 0 :
911 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
912 0 : .ok()
913 0 : .flatten();
914 :
915 0 : match peek_slot {
916 0 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
917 0 : _ => None,
918 : }
919 0 : }
920 :
921 : /// Whether the `TenantManager` is responsible for the tenant shard
922 0 : pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
923 0 : let locked = self.tenants.read().unwrap();
924 0 :
925 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
926 0 : .ok()
927 0 : .flatten();
928 0 :
929 0 : peek_slot.is_some()
930 0 : }
931 :
932 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
933 : pub(crate) async fn upsert_location(
934 : &self,
935 : tenant_shard_id: TenantShardId,
936 : new_location_config: LocationConf,
937 : flush: Option<Duration>,
938 : mut spawn_mode: SpawnMode,
939 : ctx: &RequestContext,
940 : ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
941 : debug_assert_current_span_has_tenant_id();
942 : info!("configuring tenant location to state {new_location_config:?}");
943 :
944 : enum FastPathModified {
945 : Attached(Arc<Tenant>),
946 : Secondary(Arc<SecondaryTenant>),
947 : }
948 :
949 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
950 : // then we do not need to set the slot to InProgress, we can just call into the
951 : // existng tenant.
952 : let fast_path_taken = {
953 : let locked = self.tenants.read().unwrap();
954 : let peek_slot =
955 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
956 : match (&new_location_config.mode, peek_slot) {
957 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
958 : match attach_conf.generation.cmp(&tenant.generation) {
959 : Ordering::Equal => {
960 : // A transition from Attached to Attached in the same generation, we may
961 : // take our fast path and just provide the updated configuration
962 : // to the tenant.
963 : tenant.set_new_location_config(
964 : AttachedTenantConf::try_from(new_location_config.clone())
965 : .map_err(UpsertLocationError::BadRequest)?,
966 : );
967 :
968 : Some(FastPathModified::Attached(tenant.clone()))
969 : }
970 : Ordering::Less => {
971 : return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
972 : "Generation {:?} is less than existing {:?}",
973 : attach_conf.generation,
974 : tenant.generation
975 : )));
976 : }
977 : Ordering::Greater => {
978 : // Generation advanced, fall through to general case of replacing `Tenant` object
979 : None
980 : }
981 : }
982 : }
983 : (
984 : LocationMode::Secondary(secondary_conf),
985 : Some(TenantSlot::Secondary(secondary_tenant)),
986 : ) => {
987 : secondary_tenant.set_config(secondary_conf);
988 : secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
989 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
990 : }
991 : _ => {
992 : // Not an Attached->Attached transition, fall through to general case
993 : None
994 : }
995 : }
996 : };
997 :
998 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
999 : // phase of writing config and/or waiting for flush, before returning.
1000 : match fast_path_taken {
1001 : Some(FastPathModified::Attached(tenant)) => {
1002 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1003 : .await
1004 : .fatal_err("write tenant shard config");
1005 :
1006 : // Transition to AttachedStale means we may well hold a valid generation
1007 : // still, and have been requested to go stale as part of a migration. If
1008 : // the caller set `flush`, then flush to remote storage.
1009 : if let LocationMode::Attached(AttachedLocationConfig {
1010 : generation: _,
1011 : attach_mode: AttachmentMode::Stale,
1012 : }) = &new_location_config.mode
1013 : {
1014 : if let Some(flush_timeout) = flush {
1015 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
1016 : Ok(Err(e)) => {
1017 : return Err(UpsertLocationError::Flush(e));
1018 : }
1019 : Ok(Ok(_)) => return Ok(Some(tenant)),
1020 : Err(_) => {
1021 : tracing::warn!(
1022 : timeout_ms = flush_timeout.as_millis(),
1023 : "Timed out waiting for flush to remote storage, proceeding anyway."
1024 : )
1025 : }
1026 : }
1027 : }
1028 : }
1029 :
1030 : return Ok(Some(tenant));
1031 : }
1032 : Some(FastPathModified::Secondary(_secondary_tenant)) => {
1033 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1034 : .await
1035 : .fatal_err("write tenant shard config");
1036 :
1037 : return Ok(None);
1038 : }
1039 : None => {
1040 : // Proceed with the general case procedure, where we will shutdown & remove any existing
1041 : // slot contents and replace with a fresh one
1042 : }
1043 : };
1044 :
1045 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
1046 : // InProgress value to the slot while we make whatever changes are required. The state for
1047 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
1048 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
1049 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
1050 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
1051 0 : .map_err(|e| match e {
1052 : TenantSlotError::NotFound(_) => {
1053 0 : unreachable!("Called with mode Any")
1054 : }
1055 0 : TenantSlotError::InProgress => UpsertLocationError::InProgress,
1056 0 : TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
1057 0 : })?;
1058 :
1059 : match slot_guard.get_old_value() {
1060 : Some(TenantSlot::Attached(tenant)) => {
1061 : // The case where we keep a Tenant alive was covered above in the special case
1062 : // for Attached->Attached transitions in the same generation. By this point,
1063 : // if we see an attached tenant we know it will be discarded and should be
1064 : // shut down.
1065 : let (_guard, progress) = utils::completion::channel();
1066 :
1067 : match tenant.get_attach_mode() {
1068 : AttachmentMode::Single | AttachmentMode::Multi => {
1069 : // Before we leave our state as the presumed holder of the latest generation,
1070 : // flush any outstanding deletions to reduce the risk of leaking objects.
1071 : self.resources.deletion_queue_client.flush_advisory()
1072 : }
1073 : AttachmentMode::Stale => {
1074 : // If we're stale there's not point trying to flush deletions
1075 : }
1076 : };
1077 :
1078 : info!("Shutting down attached tenant");
1079 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1080 : Ok(()) => {}
1081 : Err(barrier) => {
1082 : info!("Shutdown already in progress, waiting for it to complete");
1083 : barrier.wait().await;
1084 : }
1085 : }
1086 : slot_guard.drop_old_value().expect("We just shut it down");
1087 :
1088 : // Edge case: if we were called with SpawnMode::Create, but a Tenant already existed, then
1089 : // the caller thinks they're creating but the tenant already existed. We must switch to
1090 : // Eager mode so that when starting this Tenant we properly probe remote storage for timelines,
1091 : // rather than assuming it to be empty.
1092 : spawn_mode = SpawnMode::Eager;
1093 : }
1094 : Some(TenantSlot::Secondary(state)) => {
1095 : info!("Shutting down secondary tenant");
1096 : state.shutdown().await;
1097 : }
1098 : Some(TenantSlot::InProgress(_)) => {
1099 : // This should never happen: acquire_slot should error out
1100 : // if the contents of a slot were InProgress.
1101 : return Err(UpsertLocationError::InternalError(anyhow::anyhow!(
1102 : "Acquired an InProgress slot, this is a bug."
1103 : )));
1104 : }
1105 : None => {
1106 : // Slot was vacant, nothing needs shutting down.
1107 : }
1108 : }
1109 :
1110 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1111 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1112 :
1113 : // Directory structure is the same for attached and secondary modes:
1114 : // create it if it doesn't exist. Timeline load/creation expects the
1115 : // timelines/ subdir to already exist.
1116 : //
1117 : // Does not need to be fsync'd because local storage is just a cache.
1118 : tokio::fs::create_dir_all(&timelines_path)
1119 : .await
1120 : .fatal_err("create timelines/ dir");
1121 :
1122 : // Before activating either secondary or attached mode, persist the
1123 : // configuration, so that on restart we will re-attach (or re-start
1124 : // secondary) on the tenant.
1125 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1126 : .await
1127 : .fatal_err("write tenant shard config");
1128 :
1129 : let new_slot = match &new_location_config.mode {
1130 : LocationMode::Secondary(secondary_config) => {
1131 : let shard_identity = new_location_config.shard;
1132 : TenantSlot::Secondary(SecondaryTenant::new(
1133 : tenant_shard_id,
1134 : shard_identity,
1135 : new_location_config.tenant_conf,
1136 : secondary_config,
1137 : ))
1138 : }
1139 : LocationMode::Attached(_attach_config) => {
1140 : let shard_identity = new_location_config.shard;
1141 :
1142 : // Testing hack: if we are configured with no control plane, then drop the generation
1143 : // from upserts. This enables creating generation-less tenants even though neon_local
1144 : // always uses generations when calling the location conf API.
1145 : let attached_conf = if cfg!(feature = "testing") {
1146 : let mut conf = AttachedTenantConf::try_from(new_location_config)
1147 : .map_err(UpsertLocationError::BadRequest)?;
1148 : if self.conf.control_plane_api.is_none() {
1149 : conf.location.generation = Generation::none();
1150 : }
1151 : conf
1152 : } else {
1153 : AttachedTenantConf::try_from(new_location_config)
1154 : .map_err(UpsertLocationError::BadRequest)?
1155 : };
1156 :
1157 : let tenant = tenant_spawn(
1158 : self.conf,
1159 : tenant_shard_id,
1160 : &tenant_path,
1161 : self.resources.clone(),
1162 : attached_conf,
1163 : shard_identity,
1164 : None,
1165 : spawn_mode,
1166 : ctx,
1167 : )
1168 0 : .map_err(|_: GlobalShutDown| {
1169 0 : UpsertLocationError::Unavailable(TenantMapError::ShuttingDown)
1170 0 : })?;
1171 :
1172 : TenantSlot::Attached(tenant)
1173 : }
1174 : };
1175 :
1176 : let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
1177 : Some(tenant.clone())
1178 : } else {
1179 : None
1180 : };
1181 :
1182 : match slot_guard.upsert(new_slot) {
1183 : Err(TenantSlotUpsertError::InternalError(e)) => {
1184 : Err(UpsertLocationError::InternalError(anyhow::anyhow!(e)))
1185 : }
1186 : Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
1187 : Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
1188 : // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
1189 : // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
1190 : // are shutdown.
1191 : //
1192 : // We must shut it down inline here.
1193 : match new_slot {
1194 : TenantSlot::InProgress(_) => {
1195 : // Unreachable because we never insert an InProgress
1196 : unreachable!()
1197 : }
1198 : TenantSlot::Attached(tenant) => {
1199 : let (_guard, progress) = utils::completion::channel();
1200 : info!(
1201 : "Shutting down just-spawned tenant, because tenant manager is shut down"
1202 : );
1203 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1204 : Ok(()) => {
1205 : info!("Finished shutting down just-spawned tenant");
1206 : }
1207 : Err(barrier) => {
1208 : info!("Shutdown already in progress, waiting for it to complete");
1209 : barrier.wait().await;
1210 : }
1211 : }
1212 : }
1213 : TenantSlot::Secondary(secondary_tenant) => {
1214 : secondary_tenant.shutdown().await;
1215 : }
1216 : }
1217 :
1218 : Err(UpsertLocationError::Unavailable(
1219 : TenantMapError::ShuttingDown,
1220 : ))
1221 : }
1222 : Ok(()) => Ok(attached_tenant),
1223 : }
1224 : }
1225 :
1226 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1227 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1228 : /// dropped before re-attaching.
1229 : ///
1230 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1231 : /// where an issue is identified that would go away with a restart of the tenant.
1232 : ///
1233 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1234 : /// to respect the cancellation tokens used in normal shutdown().
1235 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1236 : pub(crate) async fn reset_tenant(
1237 : &self,
1238 : tenant_shard_id: TenantShardId,
1239 : drop_cache: bool,
1240 : ctx: &RequestContext,
1241 : ) -> anyhow::Result<()> {
1242 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1243 : let Some(old_slot) = slot_guard.get_old_value() else {
1244 : anyhow::bail!("Tenant not found when trying to reset");
1245 : };
1246 :
1247 : let Some(tenant) = old_slot.get_attached() else {
1248 : slot_guard.revert();
1249 : anyhow::bail!("Tenant is not in attached state");
1250 : };
1251 :
1252 : let (_guard, progress) = utils::completion::channel();
1253 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1254 : Ok(()) => {
1255 : slot_guard.drop_old_value()?;
1256 : }
1257 : Err(_barrier) => {
1258 : slot_guard.revert();
1259 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1260 : }
1261 : }
1262 :
1263 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1264 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1265 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1266 :
1267 : if drop_cache {
1268 : tracing::info!("Dropping local file cache");
1269 :
1270 : match tokio::fs::read_dir(&timelines_path).await {
1271 : Err(e) => {
1272 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1273 : }
1274 : Ok(mut entries) => {
1275 : while let Some(entry) = entries.next_entry().await? {
1276 : tokio::fs::remove_dir_all(entry.path()).await?;
1277 : }
1278 : }
1279 : }
1280 : }
1281 :
1282 : let shard_identity = config.shard;
1283 : let tenant = tenant_spawn(
1284 : self.conf,
1285 : tenant_shard_id,
1286 : &tenant_path,
1287 : self.resources.clone(),
1288 : AttachedTenantConf::try_from(config)?,
1289 : shard_identity,
1290 : None,
1291 : SpawnMode::Eager,
1292 : ctx,
1293 : )?;
1294 :
1295 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1296 :
1297 : Ok(())
1298 : }
1299 :
1300 0 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
1301 0 : let locked = self.tenants.read().unwrap();
1302 0 : match &*locked {
1303 0 : TenantsMap::Initializing => Vec::new(),
1304 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1305 0 : .values()
1306 0 : .filter_map(|slot| {
1307 0 : slot.get_attached()
1308 0 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1309 0 : })
1310 0 : .collect(),
1311 : }
1312 0 : }
1313 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1314 : // callback should be small and fast, as it will be called inside the global
1315 : // TenantsMap lock.
1316 0 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1317 0 : where
1318 0 : // TODO: let the callback return a hint to drop out of the loop early
1319 0 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1320 0 : {
1321 0 : let locked = self.tenants.read().unwrap();
1322 :
1323 0 : let map = match &*locked {
1324 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1325 0 : TenantsMap::Open(m) => m,
1326 : };
1327 :
1328 0 : for (tenant_id, slot) in map {
1329 0 : if let TenantSlot::Secondary(state) = slot {
1330 : // Only expose secondary tenants that are not currently shutting down
1331 0 : if !state.cancel.is_cancelled() {
1332 0 : func(tenant_id, state)
1333 0 : }
1334 0 : }
1335 : }
1336 0 : }
1337 :
1338 : /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
1339 0 : pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
1340 0 : let locked = self.tenants.read().unwrap();
1341 0 : match &*locked {
1342 0 : TenantsMap::Initializing => Vec::new(),
1343 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1344 0 : map.iter().map(|(k, v)| (*k, v.clone())).collect()
1345 : }
1346 : }
1347 0 : }
1348 :
1349 0 : pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option<TenantSlot> {
1350 0 : let locked = self.tenants.read().unwrap();
1351 0 : match &*locked {
1352 0 : TenantsMap::Initializing => None,
1353 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1354 0 : map.get(&tenant_shard_id).cloned()
1355 : }
1356 : }
1357 0 : }
1358 :
1359 : /// If a tenant is attached, detach it. Then remove its data from remote storage.
1360 : ///
1361 : /// A tenant is considered deleted once it is gone from remote storage. It is the caller's
1362 : /// responsibility to avoid trying to attach the tenant again or use it any way once deletion
1363 : /// has started: this operation is not atomic, and must be retried until it succeeds.
1364 : ///
1365 : /// As a special case, if an unsharded tenant ID is given for a sharded tenant, it will remove
1366 : /// all tenant shards in remote storage (removing all paths with the tenant prefix). The storage
1367 : /// controller uses this to purge all remote tenant data, including any stale parent shards that
1368 : /// may remain after splits. Ideally, this special case would be handled elsewhere. See:
1369 : /// <https://github.com/neondatabase/neon/pull/9394>.
1370 0 : pub(crate) async fn delete_tenant(
1371 0 : &self,
1372 0 : tenant_shard_id: TenantShardId,
1373 0 : ) -> Result<(), DeleteTenantError> {
1374 0 : super::span::debug_assert_current_span_has_tenant_id();
1375 :
1376 0 : async fn delete_local(
1377 0 : conf: &PageServerConf,
1378 0 : background_purges: &BackgroundPurges,
1379 0 : tenant_shard_id: &TenantShardId,
1380 0 : ) -> anyhow::Result<()> {
1381 0 : let local_tenant_directory = conf.tenant_path(tenant_shard_id);
1382 0 : let tmp_dir = safe_rename_tenant_dir(&local_tenant_directory)
1383 0 : .await
1384 0 : .with_context(|| {
1385 0 : format!("local tenant directory {local_tenant_directory:?} rename")
1386 0 : })?;
1387 0 : background_purges.spawn(tmp_dir);
1388 0 : Ok(())
1389 0 : }
1390 :
1391 0 : let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1392 0 : match &slot_guard.old_value {
1393 0 : Some(TenantSlot::Attached(tenant)) => {
1394 0 : // Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
1395 0 : // deletion will be resumed across restarts.
1396 0 : let tenant = tenant.clone();
1397 0 : let (_guard, progress) = utils::completion::channel();
1398 0 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1399 0 : Ok(()) => {}
1400 0 : Err(barrier) => {
1401 0 : info!("Shutdown already in progress, waiting for it to complete");
1402 0 : barrier.wait().await;
1403 : }
1404 : }
1405 0 : delete_local(self.conf, &self.background_purges, &tenant_shard_id).await?;
1406 : }
1407 0 : Some(TenantSlot::Secondary(secondary_tenant)) => {
1408 0 : secondary_tenant.shutdown().await;
1409 :
1410 0 : delete_local(self.conf, &self.background_purges, &tenant_shard_id).await?;
1411 : }
1412 0 : Some(TenantSlot::InProgress(_)) => unreachable!(),
1413 0 : None => {}
1414 : };
1415 :
1416 : // Fall through: local state for this tenant is no longer present, proceed with remote delete.
1417 : // - We use a retry wrapper here so that common transient S3 errors (e.g. 503, 429) do not result
1418 : // in 500 responses to delete requests.
1419 : // - We keep the `SlotGuard` during this I/O, so that if a concurrent delete request comes in, it will
1420 : // 503/retry, rather than kicking off a wasteful concurrent deletion.
1421 : // NB: this also deletes partial prefixes, i.e. a <tenant_id> path will delete all
1422 : // <tenant_id>_<shard_id>/* objects. See method comment for why.
1423 0 : backoff::retry(
1424 0 : || async move {
1425 0 : self.resources
1426 0 : .remote_storage
1427 0 : .delete_prefix(&remote_tenant_path(&tenant_shard_id), &self.cancel)
1428 0 : .await
1429 0 : },
1430 0 : |_| false, // backoff::retry handles cancellation
1431 0 : 1,
1432 0 : 3,
1433 0 : &format!("delete_tenant[tenant_shard_id={tenant_shard_id}]"),
1434 0 : &self.cancel,
1435 0 : )
1436 0 : .await
1437 0 : .unwrap_or(Err(TimeoutOrCancel::Cancel.into()))
1438 0 : .map_err(|err| {
1439 0 : if TimeoutOrCancel::caused_by_cancel(&err) {
1440 0 : return DeleteTenantError::Cancelled;
1441 0 : }
1442 0 : DeleteTenantError::Other(err)
1443 0 : })
1444 0 : }
1445 :
1446 : #[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
1447 : pub(crate) async fn shard_split(
1448 : &self,
1449 : tenant: Arc<Tenant>,
1450 : new_shard_count: ShardCount,
1451 : new_stripe_size: Option<ShardStripeSize>,
1452 : ctx: &RequestContext,
1453 : ) -> anyhow::Result<Vec<TenantShardId>> {
1454 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1455 : let r = self
1456 : .do_shard_split(tenant, new_shard_count, new_stripe_size, ctx)
1457 : .await;
1458 : if r.is_err() {
1459 : // Shard splitting might have left the original shard in a partially shut down state (it
1460 : // stops the shard's remote timeline client). Reset it to ensure we leave things in
1461 : // a working state.
1462 : if self.get(tenant_shard_id).is_some() {
1463 : tracing::warn!("Resetting after shard split failure");
1464 : if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
1465 : // Log this error because our return value will still be the original error, not this one. This is
1466 : // a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
1467 : // (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
1468 : // setting it broken probably won't help either.
1469 : tracing::error!("Failed to reset: {e}");
1470 : }
1471 : }
1472 : }
1473 :
1474 : r
1475 : }
1476 :
1477 0 : pub(crate) async fn do_shard_split(
1478 0 : &self,
1479 0 : tenant: Arc<Tenant>,
1480 0 : new_shard_count: ShardCount,
1481 0 : new_stripe_size: Option<ShardStripeSize>,
1482 0 : ctx: &RequestContext,
1483 0 : ) -> anyhow::Result<Vec<TenantShardId>> {
1484 0 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1485 0 :
1486 0 : // Validate the incoming request
1487 0 : if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
1488 0 : anyhow::bail!("Requested shard count is not an increase");
1489 0 : }
1490 0 : let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
1491 0 : if !expansion_factor.is_power_of_two() {
1492 0 : anyhow::bail!("Requested split is not a power of two");
1493 0 : }
1494 :
1495 0 : if let Some(new_stripe_size) = new_stripe_size {
1496 0 : if tenant.get_shard_stripe_size() != new_stripe_size
1497 0 : && tenant_shard_id.shard_count.count() > 1
1498 : {
1499 : // This tenant already has multiple shards, it is illegal to try and change its stripe size
1500 0 : anyhow::bail!(
1501 0 : "Shard stripe size may not be modified once tenant has multiple shards"
1502 0 : );
1503 0 : }
1504 0 : }
1505 :
1506 : // Plan: identify what the new child shards will be
1507 0 : let child_shards = tenant_shard_id.split(new_shard_count);
1508 0 : tracing::info!(
1509 0 : "Shard {} splits into: {}",
1510 0 : tenant_shard_id.to_index(),
1511 0 : child_shards
1512 0 : .iter()
1513 0 : .map(|id| format!("{}", id.to_index()))
1514 0 : .join(",")
1515 : );
1516 :
1517 0 : fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
1518 0 : "failpoint"
1519 0 : )));
1520 :
1521 0 : let parent_shard_identity = tenant.shard_identity;
1522 0 : let parent_tenant_conf = tenant.get_tenant_conf();
1523 0 : let parent_generation = tenant.generation;
1524 :
1525 : // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
1526 0 : if let Err(e) = tenant.split_prepare(&child_shards).await {
1527 : // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
1528 : // have been left in a partially-shut-down state.
1529 0 : tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
1530 0 : return Err(e);
1531 0 : }
1532 0 :
1533 0 : fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
1534 0 : "failpoint"
1535 0 : )));
1536 :
1537 0 : self.resources.deletion_queue_client.flush_advisory();
1538 0 :
1539 0 : // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
1540 0 : drop(tenant);
1541 0 : let mut parent_slot_guard =
1542 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1543 0 : let parent = match parent_slot_guard.get_old_value() {
1544 0 : Some(TenantSlot::Attached(t)) => t,
1545 0 : Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
1546 : Some(TenantSlot::InProgress(_)) => {
1547 : // tenant_map_acquire_slot never returns InProgress, if a slot was InProgress
1548 : // it would return an error.
1549 0 : unreachable!()
1550 : }
1551 : None => {
1552 : // We don't actually need the parent shard to still be attached to do our work, but it's
1553 : // a weird enough situation that the caller probably didn't want us to continue working
1554 : // if they had detached the tenant they requested the split on.
1555 0 : anyhow::bail!("Detached parent shard in the middle of split!")
1556 : }
1557 : };
1558 0 : fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
1559 0 : "failpoint"
1560 0 : )));
1561 : // Optimization: hardlink layers from the parent into the children, so that they don't have to
1562 : // re-download & duplicate the data referenced in their initial IndexPart
1563 0 : self.shard_split_hardlink(parent, child_shards.clone())
1564 0 : .await?;
1565 0 : fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
1566 0 : "failpoint"
1567 0 : )));
1568 :
1569 : // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
1570 : // child shards to reach this point.
1571 0 : let mut target_lsns = HashMap::new();
1572 0 : for timeline in parent.timelines.lock().unwrap().clone().values() {
1573 0 : target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
1574 0 : }
1575 :
1576 : // TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
1577 : // and could slow down the children trying to catch up.
1578 :
1579 : // Phase 3: Spawn the child shards
1580 0 : for child_shard in &child_shards {
1581 0 : let mut child_shard_identity = parent_shard_identity;
1582 0 : if let Some(new_stripe_size) = new_stripe_size {
1583 0 : child_shard_identity.stripe_size = new_stripe_size;
1584 0 : }
1585 0 : child_shard_identity.count = child_shard.shard_count;
1586 0 : child_shard_identity.number = child_shard.shard_number;
1587 0 :
1588 0 : let child_location_conf = LocationConf {
1589 0 : mode: LocationMode::Attached(AttachedLocationConfig {
1590 0 : generation: parent_generation,
1591 0 : attach_mode: AttachmentMode::Single,
1592 0 : }),
1593 0 : shard: child_shard_identity,
1594 0 : tenant_conf: parent_tenant_conf.clone(),
1595 0 : };
1596 0 :
1597 0 : self.upsert_location(
1598 0 : *child_shard,
1599 0 : child_location_conf,
1600 0 : None,
1601 0 : SpawnMode::Eager,
1602 0 : ctx,
1603 0 : )
1604 0 : .await?;
1605 : }
1606 :
1607 0 : fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
1608 0 : "failpoint"
1609 0 : )));
1610 :
1611 : // Phase 4: wait for child shards' WAL ingest to catch up to target LSN
1612 0 : for child_shard_id in &child_shards {
1613 0 : let child_shard_id = *child_shard_id;
1614 0 : let child_shard = {
1615 0 : let locked = self.tenants.read().unwrap();
1616 0 : let peek_slot =
1617 0 : tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
1618 0 : peek_slot.and_then(|s| s.get_attached()).cloned()
1619 : };
1620 0 : if let Some(t) = child_shard {
1621 : // Wait for the child shard to become active: this should be very quick because it only
1622 : // has to download the index_part that we just uploaded when creating it.
1623 0 : if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
1624 : // This is not fatal: we have durably created the child shard. It just makes the
1625 : // split operation less seamless for clients, as we may detach the parent
1626 : // shard before the child shards are fully ready to serve requests.
1627 0 : tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
1628 0 : continue;
1629 0 : }
1630 0 :
1631 0 : let timelines = t.timelines.lock().unwrap().clone();
1632 0 : for timeline in timelines.values() {
1633 0 : let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
1634 0 : continue;
1635 : };
1636 :
1637 0 : tracing::info!(
1638 0 : "Waiting for child shard {}/{} to reach target lsn {}...",
1639 0 : child_shard_id,
1640 0 : timeline.timeline_id,
1641 : target_lsn
1642 : );
1643 :
1644 0 : fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
1645 0 : "failpoint"
1646 0 : )));
1647 0 : if let Err(e) = timeline
1648 0 : .wait_lsn(
1649 0 : *target_lsn,
1650 0 : crate::tenant::timeline::WaitLsnWaiter::Tenant,
1651 0 : crate::tenant::timeline::WaitLsnTimeout::Default,
1652 0 : ctx,
1653 0 : )
1654 0 : .await
1655 : {
1656 : // Failure here might mean shutdown, in any case this part is an optimization
1657 : // and we shouldn't hold up the split operation.
1658 0 : tracing::warn!(
1659 0 : "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
1660 0 : timeline.timeline_id
1661 : );
1662 : } else {
1663 0 : tracing::info!(
1664 0 : "Child shard {}/{} reached target lsn {}",
1665 0 : child_shard_id,
1666 0 : timeline.timeline_id,
1667 : target_lsn
1668 : );
1669 : }
1670 : }
1671 0 : }
1672 : }
1673 :
1674 : // Phase 5: Shut down the parent shard, and erase it from disk
1675 0 : let (_guard, progress) = completion::channel();
1676 0 : match parent.shutdown(progress, ShutdownMode::Hard).await {
1677 0 : Ok(()) => {}
1678 0 : Err(other) => {
1679 0 : other.wait().await;
1680 : }
1681 : }
1682 0 : let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
1683 0 : let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
1684 0 : .await
1685 0 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
1686 0 : self.background_purges.spawn(tmp_path);
1687 0 :
1688 0 : fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
1689 0 : "failpoint"
1690 0 : )));
1691 :
1692 0 : parent_slot_guard.drop_old_value()?;
1693 :
1694 : // Phase 6: Release the InProgress on the parent shard
1695 0 : drop(parent_slot_guard);
1696 0 :
1697 0 : Ok(child_shards)
1698 0 : }
1699 :
1700 : /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
1701 : /// to avoid the children downloading them again.
1702 : ///
1703 : /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
1704 0 : async fn shard_split_hardlink(
1705 0 : &self,
1706 0 : parent_shard: &Tenant,
1707 0 : child_shards: Vec<TenantShardId>,
1708 0 : ) -> anyhow::Result<()> {
1709 0 : debug_assert_current_span_has_tenant_id();
1710 0 :
1711 0 : let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
1712 0 : let (parent_timelines, parent_layers) = {
1713 0 : let mut parent_layers = Vec::new();
1714 0 : let timelines = parent_shard.timelines.lock().unwrap().clone();
1715 0 : let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
1716 0 : for timeline in timelines.values() {
1717 0 : tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
1718 0 : let layers = timeline.layers.read().await;
1719 :
1720 0 : for layer in layers.likely_resident_layers() {
1721 0 : let relative_path = layer
1722 0 : .local_path()
1723 0 : .strip_prefix(&parent_path)
1724 0 : .context("Removing prefix from parent layer path")?;
1725 0 : parent_layers.push(relative_path.to_owned());
1726 : }
1727 : }
1728 :
1729 0 : if parent_layers.is_empty() {
1730 0 : tracing::info!("Ancestor shard has no resident layer to hard link");
1731 0 : }
1732 :
1733 0 : (parent_timelines, parent_layers)
1734 0 : };
1735 0 :
1736 0 : let mut child_prefixes = Vec::new();
1737 0 : let mut create_dirs = Vec::new();
1738 :
1739 0 : for child in child_shards {
1740 0 : let child_prefix = self.conf.tenant_path(&child);
1741 0 : create_dirs.push(child_prefix.clone());
1742 0 : create_dirs.extend(
1743 0 : parent_timelines
1744 0 : .iter()
1745 0 : .map(|t| self.conf.timeline_path(&child, t)),
1746 0 : );
1747 0 :
1748 0 : child_prefixes.push(child_prefix);
1749 0 : }
1750 :
1751 : // Since we will do a large number of small filesystem metadata operations, batch them into
1752 : // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
1753 0 : let span = tracing::Span::current();
1754 0 : let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
1755 0 : // Run this synchronous code in the same log context as the outer function that spawned it.
1756 0 : let _span = span.enter();
1757 0 :
1758 0 : tracing::info!("Creating {} directories", create_dirs.len());
1759 0 : for dir in &create_dirs {
1760 0 : if let Err(e) = std::fs::create_dir_all(dir) {
1761 : // Ignore AlreadyExists errors, drop out on all other errors
1762 0 : match e.kind() {
1763 0 : std::io::ErrorKind::AlreadyExists => {}
1764 : _ => {
1765 0 : return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
1766 : }
1767 : }
1768 0 : }
1769 : }
1770 :
1771 0 : for child_prefix in child_prefixes {
1772 0 : tracing::info!(
1773 0 : "Hard-linking {} parent layers into child path {}",
1774 0 : parent_layers.len(),
1775 : child_prefix
1776 : );
1777 0 : for relative_layer in &parent_layers {
1778 0 : let parent_path = parent_path.join(relative_layer);
1779 0 : let child_path = child_prefix.join(relative_layer);
1780 0 : if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
1781 0 : match e.kind() {
1782 0 : std::io::ErrorKind::AlreadyExists => {}
1783 : std::io::ErrorKind::NotFound => {
1784 0 : tracing::info!(
1785 0 : "Layer {} not found during hard-linking, evicted during split?",
1786 : relative_layer
1787 : );
1788 : }
1789 : _ => {
1790 0 : return Err(anyhow::anyhow!(e).context(format!(
1791 0 : "Hard linking {relative_layer} into {child_prefix}"
1792 0 : )));
1793 : }
1794 : }
1795 0 : }
1796 : }
1797 : }
1798 :
1799 : // Durability is not required for correctness, but if we crashed during split and
1800 : // then came restarted with empty timeline dirs, it would be very inefficient to
1801 : // re-populate from remote storage.
1802 0 : tracing::info!("fsyncing {} directories", create_dirs.len());
1803 0 : for dir in create_dirs {
1804 0 : if let Err(e) = crashsafe::fsync(&dir) {
1805 : // Something removed a newly created timeline dir out from underneath us? Extremely
1806 : // unexpected, but not worth panic'ing over as this whole function is just an
1807 : // optimization.
1808 0 : tracing::warn!("Failed to fsync directory {dir}: {e}")
1809 0 : }
1810 : }
1811 :
1812 0 : Ok(parent_layers.len())
1813 0 : });
1814 0 :
1815 0 : match jh.await {
1816 0 : Ok(Ok(layer_count)) => {
1817 0 : tracing::info!(count = layer_count, "Hard linked layers into child shards");
1818 : }
1819 0 : Ok(Err(e)) => {
1820 0 : // This is an optimization, so we tolerate failure.
1821 0 : tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
1822 : }
1823 0 : Err(e) => {
1824 0 : // This is something totally unexpected like a panic, so bail out.
1825 0 : anyhow::bail!("Error joining hard linking task: {e}");
1826 : }
1827 : }
1828 :
1829 0 : Ok(())
1830 0 : }
1831 :
1832 : ///
1833 : /// Shut down all tenants. This runs as part of pageserver shutdown.
1834 : ///
1835 : /// NB: We leave the tenants in the map, so that they remain accessible through
1836 : /// the management API until we shut it down. If we removed the shut-down tenants
1837 : /// from the tenants map, the management API would return 404 for these tenants,
1838 : /// because TenantsMap::get() now returns `None`.
1839 : /// That could be easily misinterpreted by control plane, the consumer of the
1840 : /// management API. For example, it could attach the tenant on a different pageserver.
1841 : /// We would then be in split-brain once this pageserver restarts.
1842 : #[instrument(skip_all)]
1843 : pub(crate) async fn shutdown(&self) {
1844 : self.cancel.cancel();
1845 :
1846 : shutdown_all_tenants0(self.tenants).await
1847 : }
1848 :
1849 0 : pub(crate) async fn detach_tenant(
1850 0 : &self,
1851 0 : conf: &'static PageServerConf,
1852 0 : tenant_shard_id: TenantShardId,
1853 0 : deletion_queue_client: &DeletionQueueClient,
1854 0 : ) -> Result<(), TenantStateError> {
1855 0 : let tmp_path = self
1856 0 : .detach_tenant0(conf, tenant_shard_id, deletion_queue_client)
1857 0 : .await?;
1858 0 : self.background_purges.spawn(tmp_path);
1859 0 :
1860 0 : Ok(())
1861 0 : }
1862 :
1863 0 : async fn detach_tenant0(
1864 0 : &self,
1865 0 : conf: &'static PageServerConf,
1866 0 : tenant_shard_id: TenantShardId,
1867 0 : deletion_queue_client: &DeletionQueueClient,
1868 0 : ) -> Result<Utf8PathBuf, TenantStateError> {
1869 0 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
1870 0 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
1871 0 : safe_rename_tenant_dir(&local_tenant_directory)
1872 0 : .await
1873 0 : .with_context(|| {
1874 0 : format!("local tenant directory {local_tenant_directory:?} rename")
1875 0 : })
1876 0 : };
1877 :
1878 0 : let removal_result = remove_tenant_from_memory(
1879 0 : self.tenants,
1880 0 : tenant_shard_id,
1881 0 : tenant_dir_rename_operation(tenant_shard_id),
1882 0 : )
1883 0 : .await;
1884 :
1885 : // Flush pending deletions, so that they have a good chance of passing validation
1886 : // before this tenant is potentially re-attached elsewhere.
1887 0 : deletion_queue_client.flush_advisory();
1888 0 :
1889 0 : removal_result
1890 0 : }
1891 :
1892 0 : pub(crate) fn list_tenants(
1893 0 : &self,
1894 0 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
1895 0 : let tenants = self.tenants.read().unwrap();
1896 0 : let m = match &*tenants {
1897 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
1898 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
1899 0 : };
1900 0 : Ok(m.iter()
1901 0 : .filter_map(|(id, tenant)| match tenant {
1902 0 : TenantSlot::Attached(tenant) => {
1903 0 : Some((*id, tenant.current_state(), tenant.generation()))
1904 : }
1905 0 : TenantSlot::Secondary(_) => None,
1906 0 : TenantSlot::InProgress(_) => None,
1907 0 : })
1908 0 : .collect())
1909 0 : }
1910 :
    /// Completes an earlier prepared timeline detach ancestor.
    ///
    /// Steps, in order:
    /// 1. Acquire the tenant's slot (it must exist) and require the tenant to be attached and active.
    /// 2. Run `detach_from_ancestor_and_reparent` on the timeline with the `prepared` state.
    /// 3. If the response says a tenant reset is required, shut the tenant down in
    ///    `ShutdownMode::Reload`, re-load its config from local disk, spawn a fresh `Tenant` and
    ///    upsert it into the slot.
    /// 4. If the response reports completion, wait for the tenant to become active and finish the
    ///    detach on the timeline, returning the reparented timeline ids.
    ///
    /// # Errors
    ///
    /// - `Error::ShuttingDown` when the tenant map, the tenant, or the respawn is shutting down.
    /// - `Error::DetachReparent` for slot acquisition failures, a tenant that is not attached or
    ///   not active, and config load/convert failures.
    /// - `Error::NotFound` when the timeline cannot be looked up.
    /// - `Error::Complete` when waiting for the tenant to become active fails for a reason other
    ///   than cancellation/stopping.
    /// - `Error::FailedToReparentAll` when the response did not report completion.
    pub(crate) async fn complete_detaching_timeline_ancestor(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        prepared: PreparedTimelineDetach,
        mut attempt: detach_ancestor::Attempt,
        ctx: &RequestContext,
    ) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
        use detach_ancestor::Error;

        // Take exclusive ownership of the tenant's slot for the duration of this operation.
        let slot_guard =
            tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist).map_err(
                |e| {
                    use TenantSlotError::*;

                    match e {
                        MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
                        NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
                    }
                },
            )?;

        let tenant = {
            let old_slot = slot_guard
                .get_old_value()
                .as_ref()
                .expect("requested MustExist");

            let Some(tenant) = old_slot.get_attached() else {
                return Err(Error::DetachReparent(anyhow::anyhow!(
                    "Tenant is not in attached state"
                )));
            };

            if !tenant.is_active() {
                return Err(Error::DetachReparent(anyhow::anyhow!(
                    "Tenant is not active"
                )));
            }

            tenant.clone()
        };

        let timeline = tenant
            .get_timeline(timeline_id, true)
            .map_err(Error::NotFound)?;

        let resp = timeline
            .detach_from_ancestor_and_reparent(&tenant, prepared, ctx)
            .await?;

        // Rebind as mutable: from here on the guard may be mutated (drop_old_value/revert).
        let mut slot_guard = slot_guard;

        let tenant = if resp.reset_tenant_required() {
            attempt.before_reset_tenant();

            let (_guard, progress) = utils::completion::channel();
            match tenant.shutdown(progress, ShutdownMode::Reload).await {
                Ok(()) => {
                    slot_guard.drop_old_value().expect("it was just shutdown");
                }
                Err(_barrier) => {
                    slot_guard.revert();
                    // this really should not happen, at all, unless a shutdown without acquiring
                    // tenant slot was already going? regardless, on restart the attempt tracking
                    // will reset to retryable.
                    return Err(Error::ShuttingDown);
                }
            }

            // Re-create the tenant from its on-disk location config.
            let tenant_path = self.conf.tenant_path(&tenant_shard_id);
            let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)
                .map_err(|e| Error::DetachReparent(e.into()))?;

            // Read the shard identity before `config` is consumed by `try_from` below.
            let shard_identity = config.shard;
            let tenant = tenant_spawn(
                self.conf,
                tenant_shard_id,
                &tenant_path,
                self.resources.clone(),
                AttachedTenantConf::try_from(config).map_err(Error::DetachReparent)?,
                shard_identity,
                None,
                SpawnMode::Eager,
                ctx,
            )
            .map_err(|_| Error::ShuttingDown)?;

            // Carry the in-flight attempt over to the freshly spawned tenant.
            {
                let mut g = tenant.ongoing_timeline_detach.lock().unwrap();
                assert!(
                    g.is_none(),
                    "there cannot be any new timeline detach ancestor on newly created tenant"
                );
                *g = Some((attempt.timeline_id, attempt.new_barrier()));
            }

            // if we bail out here, we will not allow a new attempt, which should be fine.
            // pageserver should be shutting down regardless? tenant_reset would help, unless it
            // runs into the same problem.
            slot_guard
                .upsert(TenantSlot::Attached(tenant.clone()))
                .map_err(|e| match e {
                    TenantSlotUpsertError::ShuttingDown(_) => Error::ShuttingDown,
                    other => Error::DetachReparent(other.into()),
                })?;
            tenant
        } else {
            // NOTE(review): on this path `slot_guard` is neither upserted nor reverted explicitly;
            // per the `SlotGuard` docs, dropping it should put the old value back -- confirm.
            tracing::info!("skipping tenant_reset as no changes made required it");
            tenant
        };

        if let Some(reparented) = resp.completed() {
            // finally ask the restarted tenant to complete the detach
            //
            // rationale for 9999s: we don't really have a timetable here; if retried, the caller
            // will get an 503.
            tenant
                .wait_to_become_active(std::time::Duration::from_secs(9999))
                .await
                .map_err(|e| {
                    use GetActiveTenantError::{Cancelled, WillNotBecomeActive};
                    use pageserver_api::models::TenantState;
                    match e {
                        Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) => {
                            Error::ShuttingDown
                        }
                        other => Error::Complete(other.into()),
                    }
                })?;

            utils::pausable_failpoint!(
                "timeline-detach-ancestor::after_activating_before_finding-pausable"
            );

            // Look the timeline up again: `tenant` may be a freshly spawned instance.
            let timeline = tenant
                .get_timeline(attempt.timeline_id, true)
                .map_err(Error::NotFound)?;

            timeline
                .complete_detaching_timeline_ancestor(&tenant, attempt, ctx)
                .await
                .map(|()| reparented)
        } else {
            // at least the latest versions have now been downloaded and refreshed; be ready to
            // retry another time.
            Err(Error::FailedToReparentAll)
        }
    }
2061 :
2062 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
2063 : /// resolve this to a fully qualified TenantShardId.
2064 : ///
2065 : /// During shard splits: we shall see parent shards in InProgress state and skip them, and
2066 : /// instead match on child shards which should appear in Attached state. Very early in a shard
2067 : /// split, or in other cases where a shard is InProgress, we will return our own InProgress result
2068 : /// to instruct the caller to wait for that to finish before querying again.
2069 0 : pub(crate) fn resolve_attached_shard(
2070 0 : &self,
2071 0 : tenant_id: &TenantId,
2072 0 : selector: ShardSelector,
2073 0 : ) -> ShardResolveResult {
2074 0 : let tenants = self.tenants.read().unwrap();
2075 0 : let mut want_shard = None;
2076 0 : let mut any_in_progress = None;
2077 0 :
2078 0 : match &*tenants {
2079 0 : TenantsMap::Initializing => ShardResolveResult::NotFound,
2080 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
2081 0 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
2082 : // Ignore all slots that don't contain an attached tenant
2083 0 : let tenant = match &slot.1 {
2084 0 : TenantSlot::Attached(t) => t,
2085 0 : TenantSlot::InProgress(barrier) => {
2086 0 : // We might still find a usable shard, but in case we don't, remember that
2087 0 : // we saw at least one InProgress slot, so that we can distinguish this case
2088 0 : // from a simple NotFound in our return value.
2089 0 : any_in_progress = Some(barrier.clone());
2090 0 : continue;
2091 : }
2092 0 : _ => continue,
2093 : };
2094 :
2095 0 : match selector {
2096 0 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
2097 0 : return ShardResolveResult::Found(tenant.clone());
2098 : }
2099 0 : ShardSelector::Page(key) => {
2100 0 : // First slot we see for this tenant, calculate the expected shard number
2101 0 : // for the key: we will use this for checking if this and subsequent
2102 0 : // slots contain the key, rather than recalculating the hash each time.
2103 0 : if want_shard.is_none() {
2104 0 : want_shard = Some(tenant.shard_identity.get_shard_number(&key));
2105 0 : }
2106 :
2107 0 : if Some(tenant.shard_identity.number) == want_shard {
2108 0 : return ShardResolveResult::Found(tenant.clone());
2109 0 : }
2110 : }
2111 0 : ShardSelector::Known(shard)
2112 0 : if tenant.shard_identity.shard_index() == shard =>
2113 0 : {
2114 0 : return ShardResolveResult::Found(tenant.clone());
2115 : }
2116 0 : _ => continue,
2117 : }
2118 : }
2119 :
2120 : // Fall through: we didn't find a slot that was in Attached state & matched our selector. If
2121 : // we found one or more InProgress slot, indicate to caller that they should retry later. Otherwise
2122 : // this requested shard simply isn't found.
2123 0 : if let Some(barrier) = any_in_progress {
2124 0 : ShardResolveResult::InProgress(barrier)
2125 : } else {
2126 0 : ShardResolveResult::NotFound
2127 : }
2128 : }
2129 : }
2130 0 : }
2131 :
2132 : /// Calculate the tenant shards' contributions to this pageserver's utilization metrics. The
2133 : /// returned values are:
2134 : /// - the number of bytes of local disk space this pageserver's shards are requesting, i.e.
2135 : /// how much space they would use if not impacted by disk usage eviction.
2136 : /// - the number of tenant shards currently on this pageserver, including attached
2137 : /// and secondary.
2138 : ///
2139 : /// This function is quite expensive: callers are expected to cache the result and
2140 : /// limit how often they call it.
2141 0 : pub(crate) fn calculate_utilization(&self) -> Result<(u64, u32), TenantMapListError> {
2142 0 : let tenants = self.tenants.read().unwrap();
2143 0 : let m = match &*tenants {
2144 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
2145 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
2146 0 : };
2147 0 : let shard_count = m.len();
2148 0 : let mut wanted_bytes = 0;
2149 :
2150 0 : for tenant_slot in m.values() {
2151 0 : match tenant_slot {
2152 0 : TenantSlot::InProgress(_barrier) => {
2153 0 : // While a slot is being changed, we can't know how much storage it wants. This
2154 0 : // means this function's output can fluctuate if a lot of changes are going on
2155 0 : // (such as transitions from secondary to attached).
2156 0 : //
2157 0 : // We could wait for the barrier and retry, but it's important that the utilization
2158 0 : // API is responsive, and the data quality impact is not very significant.
2159 0 : continue;
2160 : }
2161 0 : TenantSlot::Attached(tenant) => {
2162 0 : wanted_bytes += tenant.local_storage_wanted();
2163 0 : }
2164 0 : TenantSlot::Secondary(secondary) => {
2165 0 : let progress = secondary.progress.lock().unwrap();
2166 0 : wanted_bytes += if progress.heatmap_mtime.is_some() {
2167 : // If we have heatmap info, then we will 'want' the sum
2168 : // of the size of layers in the heatmap: this is how much space
2169 : // we would use if not doing any eviction.
2170 0 : progress.bytes_total
2171 : } else {
2172 : // In the absence of heatmap info, assume that the secondary location simply
2173 : // needs as much space as it is currently using.
2174 0 : secondary.resident_size_metric.get()
2175 : }
2176 : }
2177 : }
2178 : }
2179 :
2180 0 : Ok((wanted_bytes, shard_count as u32))
2181 0 : }
2182 :
    /// Run one garbage collection iteration for a single timeline right away and return its result.
    ///
    /// `gc_req.gc_horizon` overrides the tenant's configured GC horizon when set; the PITR
    /// interval always comes from the tenant. The call waits (up to `ACTIVE_TENANT_TIMEOUT`) for
    /// the tenant to become active and holds the tenant's gate for the duration of the GC.
    ///
    /// # Errors
    ///
    /// - `ApiError::NotFound` if the tenant shard is not found in the tenant map, or GC reports
    ///   that the timeline was not found.
    /// - `ApiError::ShuttingDown` if the tenant gate cannot be entered or GC was cancelled.
    /// - `ApiError::InternalServerError` for any other GC error.
    /// - Whatever `wait_to_become_active`'s error converts into.
    #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
    pub(crate) async fn immediate_gc(
        &self,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        gc_req: TimelineGcRequest,
        cancel: CancellationToken,
        ctx: &RequestContext,
    ) -> Result<GcResult, ApiError> {
        // Scope the map read lock so that it is released before any `.await`.
        let tenant = {
            let guard = self.tenants.read().unwrap();
            guard
                .get(&tenant_shard_id)
                .cloned()
                .with_context(|| format!("tenant {tenant_shard_id}"))
                .map_err(|e| ApiError::NotFound(e.into()))?
        };

        let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
        // Use tenant's pitr setting
        let pitr = tenant.get_pitr_interval();

        tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;

        // Use a detached GarbageCollector context, and hold the tenant gate below while GC runs.
        // NOTE(review): the previous comment said this runs in task_mgr to avoid racing with
        // tenant detach; no task is spawned here, the gate guard presumably provides that
        // protection now -- confirm.
        let ctx: RequestContext =
            ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);

        let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;

        fail::fail_point!("immediate_gc_task_pre");

        // `result` is only mutated in the cfg(feature = "testing") section below.
        #[allow(unused_mut)]
        let mut result = tenant
            .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
            .await;
        // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
        // better once the types support it.

        #[cfg(feature = "testing")]
        {
            // we need to synchronize with drop completion for python tests without polling for
            // log messages
            if let Ok(result) = result.as_mut() {
                // Take the doomed layers out of the result and wait for each of them to be dropped.
                let mut js = tokio::task::JoinSet::new();
                for layer in std::mem::take(&mut result.doomed_layers) {
                    js.spawn(layer.wait_drop());
                }
                tracing::info!(
                    total = js.len(),
                    "starting to wait for the gc'd layers to be dropped"
                );
                while let Some(res) = js.join_next().await {
                    res.expect("wait_drop should not panic");
                }
            }

            let timeline = tenant.get_timeline(timeline_id, false).ok();
            let rtc = timeline.as_ref().map(|x| &x.remote_client);

            if let Some(rtc) = rtc {
                // layer drops schedule actions on remote timeline client to actually do the
                // deletions; don't care about the shutdown error, just exit fast
                drop(rtc.wait_completion().await);
            }
        }

        // Translate GC errors into API errors.
        result.map_err(|e| match e {
            GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
            GcError::TimelineNotFound => {
                ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
            }
            other => ApiError::InternalServerError(anyhow::anyhow!(other)),
        })
    }
2258 : }
2259 :
/// Errors reported by tenant getters.
#[derive(Debug, thiserror::Error)]
pub(crate) enum GetTenantError {
    /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
    /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
    #[error("Tenant {0} not found")]
    NotFound(TenantId),

    /// A specific tenant shard was not found. Uses the same message template as
    /// [`Self::NotFound`], but carries the fully qualified shard id.
    #[error("Tenant {0} not found")]
    ShardNotFound(TenantShardId),

    /// The tenant shard is not active.
    #[error("Tenant {0} is not active")]
    NotActive(TenantShardId),

    // Initializing or shutting down: cannot authoritatively say whether we have this tenant
    #[error("Tenant map is not available: {0}")]
    MapState(#[from] TenantMapError),
}
2277 :
/// Errors reported when an active tenant is requested or waited for.
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetActiveTenantError {
    /// We may time out either while TenantSlot is InProgress, or while the Tenant
    /// is in a non-Active state
    #[error(
        "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
    )]
    WaitForActiveTimeout {
        /// Last tenant state seen before giving up, if any.
        latest_state: Option<TenantState>,
        /// The wait duration reported in the error message.
        wait_time: Duration,
    },

    /// The TenantSlot is absent, or in secondary mode
    #[error(transparent)]
    NotFound(#[from] GetTenantError),

    /// Cancellation token fired while we were waiting
    #[error("cancelled")]
    Cancelled,

    /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
    #[error("will not become active. Current state: {0}")]
    WillNotBecomeActive(TenantState),

    /// Broken is logically a subset of WillNotBecomeActive, but a distinct error is useful as
    /// WillNotBecomeActive is a permitted error under some circumstances, whereas broken should
    /// never happen.
    #[error("Tenant is broken: {0}")]
    Broken(String),

    /// NOTE(review): presumably raised when a connection is asked to serve a different tenant
    /// than before; the message asks the client to reconnect -- confirm against callers.
    #[error("reconnect to switch tenant id")]
    SwitchedTenant,
}
2311 :
/// Manager-level timeline deletion error: either the tenant lookup failed, or the tenant's own
/// timeline deletion failed.
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTimelineError {
    /// The tenant could not be obtained.
    #[error("Tenant {0}")]
    Tenant(#[from] GetTenantError),

    /// The tenant-level timeline deletion reported an error.
    #[error("Timeline {0}")]
    Timeline(#[from] crate::tenant::DeleteTimelineError),
}
2320 :
/// Errors from operations that change a tenant's state in the tenant map (for example, this is
/// the error type of `detach_tenant`).
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantStateError {
    /// The tenant shard is stopping.
    #[error("Tenant {0} is stopping")]
    IsStopping(TenantShardId),
    /// Acquiring the tenant's slot failed.
    #[error(transparent)]
    SlotError(#[from] TenantSlotError),
    /// Writing a new value into the tenant's slot failed.
    #[error(transparent)]
    SlotUpsertError(#[from] TenantSlotUpsertError),
    /// Any other failure.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
2332 :
2333 : #[derive(Debug, thiserror::Error)]
2334 : pub(crate) enum TenantMapListError {
2335 : #[error("tenant map is still initiailizing")]
2336 : Initializing,
2337 : }
2338 :
/// Errors that can occur when inserting a tenant into the tenant map.
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapInsertError {
    /// Acquiring the tenant's slot failed.
    #[error(transparent)]
    SlotError(#[from] TenantSlotError),
    /// Writing the new value into the slot failed.
    #[error(transparent)]
    SlotUpsertError(#[from] TenantSlotUpsertError),
    /// Any other failure.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
2348 :
/// Superset of TenantMapError: issues that can occur when acquiring a slot
/// for a particular tenant ID.
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantSlotError {
    /// When acquiring a slot with the expectation that the tenant already exists.
    #[error("Tenant {0} not found")]
    NotFound(TenantShardId),

    /// Tried to read a slot that is currently being mutated by another administrative
    /// operation.
    #[error("tenant has a state change in progress, try again later")]
    InProgress,

    /// The tenant map as a whole is not available (still initializing or shutting down).
    #[error(transparent)]
    MapState(#[from] TenantMapError),
}
2365 :
/// Superset of TenantMapError: issues that can occur when using a SlotGuard
/// to insert a new value.
///
/// `Debug` is implemented by hand for this type rather than derived.
#[derive(thiserror::Error)]
pub(crate) enum TenantSlotUpsertError {
    /// An error where the slot is in an unexpected state, indicating a code bug.
    ///
    /// The carried reason is not part of the `Display` message; it is only shown by the `Debug`
    /// implementation.
    #[error("Internal error updating Tenant")]
    InternalError(Cow<'static, str>),

    /// The tenant map as a whole is not available.
    #[error(transparent)]
    MapState(TenantMapError),

    // If we encounter TenantManager shutdown during upsert, we must carry the Completion
    // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
    // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
    // was protected by the SlotGuard.
    #[error("Shutting down")]
    ShuttingDown((TenantSlot, utils::completion::Completion)),
}
2384 :
2385 : impl std::fmt::Debug for TenantSlotUpsertError {
2386 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2387 0 : match self {
2388 0 : Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
2389 0 : Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
2390 0 : Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
2391 : }
2392 0 : }
2393 : }
2394 :
/// Error from attempting to drop the previous contents of a tenant slot.
#[derive(Debug, thiserror::Error)]
enum TenantSlotDropError {
    /// It is only legal to drop a TenantSlot if its contents are fully shut down
    #[error("Tenant was not shut down")]
    NotShutdown,
}
2401 :
/// Errors that can happen any time we are walking the tenant map to try and acquire
/// the TenantSlot for a particular tenant.
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapError {
    /// Tried to read while initializing
    #[error("tenant map is still initializing")]
    StillInitializing,

    /// Tried to read while shutting down
    #[error("tenant map is shutting down")]
    ShuttingDown,
}
2414 :
/// Guards a particular tenant_id's content in the TenantsMap.
///
/// While this structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
/// for this tenant, which acts as a marker for any operations targeting
/// this tenant to retry later, or wait for the InProgress state to end.
///
/// This structure enforces the important invariant that we do not have overlapping
/// tasks that will try to use local storage for the same tenant ID: we enforce that
/// the previous contents of a slot have been shut down before the slot can be
/// left empty or used for something else.
///
/// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
/// to provide a new value, or `revert` to put the slot back into its initial
/// state. If the SlotGuard is dropped without calling either of these, then
/// we will leave the slot empty if our `old_value` is already shut down, else
/// we will replace the slot with `old_value` (equivalent to doing a revert).
///
/// The `old_value` may be dropped before the SlotGuard is dropped, by calling
/// `drop_old_value`. It is an error to call this without shutting down
/// the contents of `old_value`.
pub(crate) struct SlotGuard {
    /// The tenant shard whose slot this guard protects.
    tenant_shard_id: TenantShardId,
    /// Whatever the slot held before the guard was acquired; `None` if there was nothing.
    old_value: Option<TenantSlot>,
    /// Starts out `false` when the guard is constructed.
    /// NOTE(review): presumably set once `upsert` has written a new value -- confirm.
    upserted: bool,

    /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
    /// release any waiters as soon as this SlotGuard is dropped.
    completion: utils::completion::Completion,
}
2444 :
2445 : impl SlotGuard {
2446 4 : fn new(
2447 4 : tenant_shard_id: TenantShardId,
2448 4 : old_value: Option<TenantSlot>,
2449 4 : completion: utils::completion::Completion,
2450 4 : ) -> Self {
2451 4 : Self {
2452 4 : tenant_shard_id,
2453 4 : old_value,
2454 4 : upserted: false,
2455 4 : completion,
2456 4 : }
2457 4 : }
2458 :
2459 : /// Get any value that was present in the slot before we acquired ownership
2460 : /// of it: in state transitions, this will be the old state.
2461 : ///
2462 : // FIXME: get_ prefix
2463 : // FIXME: this should be .as_ref() -- unsure why no clippy
2464 4 : fn get_old_value(&self) -> &Option<TenantSlot> {
2465 4 : &self.old_value
2466 4 : }
2467 :
2468 : /// Emplace a new value in the slot. This consumes the guard, and after
2469 : /// returning, the slot is no longer protected from concurrent changes.
2470 0 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
2471 0 : if !self.old_value_is_shutdown() {
2472 : // This is a bug: callers should never try to drop an old value without
2473 : // shutting it down
2474 0 : return Err(TenantSlotUpsertError::InternalError(
2475 0 : "Old TenantSlot value not shut down".into(),
2476 0 : ));
2477 0 : }
2478 :
2479 0 : let replaced = {
2480 0 : let mut locked = TENANTS.write().unwrap();
2481 0 :
2482 0 : if let TenantSlot::InProgress(_) = new_value {
2483 : // It is never expected to try and upsert InProgress via this path: it should
2484 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
2485 0 : return Err(TenantSlotUpsertError::InternalError(
2486 0 : "Attempt to upsert an InProgress state".into(),
2487 0 : ));
2488 0 : }
2489 :
2490 0 : let m = match &mut *locked {
2491 : TenantsMap::Initializing => {
2492 0 : return Err(TenantSlotUpsertError::MapState(
2493 0 : TenantMapError::StillInitializing,
2494 0 : ));
2495 : }
2496 : TenantsMap::ShuttingDown(_) => {
2497 0 : return Err(TenantSlotUpsertError::ShuttingDown((
2498 0 : new_value,
2499 0 : self.completion.clone(),
2500 0 : )));
2501 : }
2502 0 : TenantsMap::Open(m) => m,
2503 0 : };
2504 0 :
2505 0 : METRICS.slot_inserted(&new_value);
2506 0 :
2507 0 : let replaced = m.insert(self.tenant_shard_id, new_value);
2508 0 : self.upserted = true;
2509 0 : if let Some(replaced) = replaced.as_ref() {
2510 0 : METRICS.slot_removed(replaced);
2511 0 : }
2512 :
2513 0 : replaced
2514 : };
2515 :
2516 : // Sanity check: on an upsert we should always be replacing an InProgress marker
2517 0 : match replaced {
2518 : Some(TenantSlot::InProgress(_)) => {
2519 : // Expected case: we find our InProgress in the map: nothing should have
2520 : // replaced it because the code that acquires slots will not grant another
2521 : // one for the same TenantId.
2522 0 : Ok(())
2523 : }
2524 : None => {
2525 0 : METRICS.unexpected_errors.inc();
2526 0 : error!(
2527 : tenant_shard_id = %self.tenant_shard_id,
2528 0 : "Missing InProgress marker during tenant upsert, this is a bug."
2529 : );
2530 0 : Err(TenantSlotUpsertError::InternalError(
2531 0 : "Missing InProgress marker during tenant upsert".into(),
2532 0 : ))
2533 : }
2534 0 : Some(slot) => {
2535 0 : METRICS.unexpected_errors.inc();
2536 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
2537 0 : Err(TenantSlotUpsertError::InternalError(
2538 0 : "Unexpected contents of TenantSlot".into(),
2539 0 : ))
2540 : }
2541 : }
2542 0 : }
2543 :
2544 : /// Replace the InProgress slot with whatever was in the guard when we started
2545 0 : fn revert(mut self) {
2546 0 : if let Some(value) = self.old_value.take() {
2547 0 : match self.upsert(value) {
2548 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
2549 0 : // We already logged the error, nothing else we can do.
2550 0 : }
2551 : Err(
2552 : TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
2553 0 : ) => {
2554 0 : // If the map is shutting down, we need not replace anything
2555 0 : }
2556 0 : Ok(()) => {}
2557 : }
2558 0 : }
2559 0 : }
2560 :
2561 : /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
2562 : /// rogue background tasks that would write to the local tenant directory that this guard
2563 : /// is responsible for protecting
2564 4 : fn old_value_is_shutdown(&self) -> bool {
2565 4 : match self.old_value.as_ref() {
2566 4 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
2567 0 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
2568 : Some(TenantSlot::InProgress(_)) => {
2569 : // A SlotGuard cannot be constructed for a slot that was already InProgress
2570 0 : unreachable!()
2571 : }
2572 0 : None => true,
2573 : }
2574 4 : }
2575 :
2576 : /// The guard holder is done with the old value of the slot: they are obliged to already
2577 : /// shut it down before we reach this point.
2578 4 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
2579 4 : if !self.old_value_is_shutdown() {
2580 0 : Err(TenantSlotDropError::NotShutdown)
2581 : } else {
2582 4 : self.old_value.take();
2583 4 : Ok(())
2584 : }
2585 4 : }
2586 : }
2587 :
2588 : impl Drop for SlotGuard {
2589 4 : fn drop(&mut self) {
2590 4 : if self.upserted {
2591 0 : return;
2592 4 : }
2593 4 : // Our old value is already shutdown, or it never existed: it is safe
2594 4 : // for us to fully release the TenantSlot back into an empty state
2595 4 :
2596 4 : let mut locked = TENANTS.write().unwrap();
2597 :
2598 4 : let m = match &mut *locked {
2599 : TenantsMap::Initializing => {
2600 : // There is no map, this should never happen.
2601 4 : return;
2602 : }
2603 : TenantsMap::ShuttingDown(_) => {
2604 : // When we transition to shutdown, InProgress elements are removed
2605 : // from the map, so we do not need to clean up our Inprogress marker.
2606 : // See [`shutdown_all_tenants0`]
2607 0 : return;
2608 : }
2609 0 : TenantsMap::Open(m) => m,
2610 : };
2611 :
2612 : use std::collections::btree_map::Entry;
2613 0 : match m.entry(self.tenant_shard_id) {
2614 0 : Entry::Occupied(mut entry) => {
2615 0 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
2616 0 : METRICS.unexpected_errors.inc();
2617 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
2618 0 : }
2619 :
2620 0 : if self.old_value_is_shutdown() {
2621 0 : METRICS.slot_removed(entry.get());
2622 0 : entry.remove();
2623 0 : } else {
2624 0 : let inserting = self.old_value.take().unwrap();
2625 0 : METRICS.slot_inserted(&inserting);
2626 0 : let replaced = entry.insert(inserting);
2627 0 : METRICS.slot_removed(&replaced);
2628 0 : }
2629 : }
2630 : Entry::Vacant(_) => {
2631 0 : METRICS.unexpected_errors.inc();
2632 0 : error!(
2633 : tenant_shard_id = %self.tenant_shard_id,
2634 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
2635 : );
2636 : }
2637 : }
2638 4 : }
2639 : }
2640 :
/// Selects how [`tenant_map_peek_slot`] treats a tenant map that is shutting down.
enum TenantSlotPeekMode {
    /// Slots may still be inspected while the pageserver is shutting down.
    Read,
    /// Peeking at a slot while the pageserver is shutting down is an error.
    Write,
}
2647 :
2648 0 : fn tenant_map_peek_slot<'a>(
2649 0 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
2650 0 : tenant_shard_id: &TenantShardId,
2651 0 : mode: TenantSlotPeekMode,
2652 0 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
2653 0 : match tenants.deref() {
2654 0 : TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
2655 0 : TenantsMap::ShuttingDown(m) => match mode {
2656 : TenantSlotPeekMode::Read => Ok(Some(
2657 : // When reading in ShuttingDown state, we must translate None results
2658 : // into a ShuttingDown error, because absence of a tenant shard ID in the map
2659 : // isn't a reliable indicator of the tenant being gone: it might have been
2660 : // InProgress when shutdown started, and cleaned up from that state such
2661 : // that it's now no longer in the map. Callers will have to wait until
2662 : // we next start up to get a proper answer. This avoids incorrect 404 API responses.
2663 0 : m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
2664 : )),
2665 0 : TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
2666 : },
2667 0 : TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
2668 : }
2669 0 : }
2670 :
/// Precondition applied when acquiring a slot from the tenant map.
enum TenantSlotAcquireMode {
    /// Acquire the slot whatever its current state, creating it if it does not exist yet.
    Any,
    /// Fail with an error if the slot does not already exist.
    MustExist,
}
2677 :
2678 0 : fn tenant_map_acquire_slot(
2679 0 : tenant_shard_id: &TenantShardId,
2680 0 : mode: TenantSlotAcquireMode,
2681 0 : ) -> Result<SlotGuard, TenantSlotError> {
2682 0 : tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
2683 0 : }
2684 :
2685 4 : fn tenant_map_acquire_slot_impl(
2686 4 : tenant_shard_id: &TenantShardId,
2687 4 : tenants: &std::sync::RwLock<TenantsMap>,
2688 4 : mode: TenantSlotAcquireMode,
2689 4 : ) -> Result<SlotGuard, TenantSlotError> {
2690 : use TenantSlotAcquireMode::*;
2691 4 : METRICS.tenant_slot_writes.inc();
2692 4 :
2693 4 : let mut locked = tenants.write().unwrap();
2694 4 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
2695 4 : let _guard = span.enter();
2696 :
2697 4 : let m = match &mut *locked {
2698 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
2699 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
2700 4 : TenantsMap::Open(m) => m,
2701 : };
2702 :
2703 : use std::collections::btree_map::Entry;
2704 :
2705 4 : let entry = m.entry(*tenant_shard_id);
2706 4 :
2707 4 : match entry {
2708 0 : Entry::Vacant(v) => match mode {
2709 : MustExist => {
2710 0 : tracing::debug!("Vacant && MustExist: return NotFound");
2711 0 : Err(TenantSlotError::NotFound(*tenant_shard_id))
2712 : }
2713 : _ => {
2714 0 : let (completion, barrier) = utils::completion::channel();
2715 0 : let inserting = TenantSlot::InProgress(barrier);
2716 0 : METRICS.slot_inserted(&inserting);
2717 0 : v.insert(inserting);
2718 0 : tracing::debug!("Vacant, inserted InProgress");
2719 0 : Ok(SlotGuard::new(*tenant_shard_id, None, completion))
2720 : }
2721 : },
2722 4 : Entry::Occupied(mut o) => {
2723 4 : // Apply mode-driven checks
2724 4 : match (o.get(), mode) {
2725 : (TenantSlot::InProgress(_), _) => {
2726 0 : tracing::debug!("Occupied, failing for InProgress");
2727 0 : Err(TenantSlotError::InProgress)
2728 : }
2729 : _ => {
2730 : // Happy case: the slot was not in any state that violated our mode
2731 4 : let (completion, barrier) = utils::completion::channel();
2732 4 : let in_progress = TenantSlot::InProgress(barrier);
2733 4 : METRICS.slot_inserted(&in_progress);
2734 4 : let old_value = o.insert(in_progress);
2735 4 : METRICS.slot_removed(&old_value);
2736 4 : tracing::debug!("Occupied, replaced with InProgress");
2737 4 : Ok(SlotGuard::new(
2738 4 : *tenant_shard_id,
2739 4 : Some(old_value),
2740 4 : completion,
2741 4 : ))
2742 : }
2743 : }
2744 : }
2745 : }
2746 4 : }
2747 :
2748 : /// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
2749 : /// Allows to remove other tenant resources manually, via `tenant_cleanup`.
2750 : /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
2751 : /// operation would be needed to remove it.
2752 4 : async fn remove_tenant_from_memory<V, F>(
2753 4 : tenants: &std::sync::RwLock<TenantsMap>,
2754 4 : tenant_shard_id: TenantShardId,
2755 4 : tenant_cleanup: F,
2756 4 : ) -> Result<V, TenantStateError>
2757 4 : where
2758 4 : F: std::future::Future<Output = anyhow::Result<V>>,
2759 4 : {
2760 4 : let mut slot_guard =
2761 4 : tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
2762 :
2763 : // allow pageserver shutdown to await for our completion
2764 4 : let (_guard, progress) = completion::channel();
2765 :
2766 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2767 : // concurrent API request doing something else for the same tenant ID.
2768 4 : let attached_tenant = match slot_guard.get_old_value() {
2769 4 : Some(TenantSlot::Attached(tenant)) => {
2770 4 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2771 4 : let shutdown_mode = ShutdownMode::Hard;
2772 4 :
2773 4 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
2774 4 : // that we can continue safely to cleanup.
2775 4 : match tenant.shutdown(progress, shutdown_mode).await {
2776 4 : Ok(()) => {}
2777 0 : Err(_other) => {
2778 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2779 0 : // wait for it but return an error right away because these are distinct requests.
2780 0 : slot_guard.revert();
2781 0 : return Err(TenantStateError::IsStopping(tenant_shard_id));
2782 : }
2783 : }
2784 4 : Some(tenant)
2785 : }
2786 0 : Some(TenantSlot::Secondary(secondary_state)) => {
2787 0 : tracing::info!("Shutting down in secondary mode");
2788 0 : secondary_state.shutdown().await;
2789 0 : None
2790 : }
2791 : Some(TenantSlot::InProgress(_)) => {
2792 : // Acquiring a slot guarantees its old value was not InProgress
2793 0 : unreachable!();
2794 : }
2795 0 : None => None,
2796 : };
2797 :
2798 4 : match tenant_cleanup
2799 4 : .await
2800 4 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2801 : {
2802 4 : Ok(hook_value) => {
2803 4 : // Success: drop the old TenantSlot::Attached.
2804 4 : slot_guard
2805 4 : .drop_old_value()
2806 4 : .expect("We just called shutdown");
2807 4 :
2808 4 : Ok(hook_value)
2809 : }
2810 0 : Err(e) => {
2811 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2812 0 : if let Some(attached_tenant) = attached_tenant {
2813 0 : attached_tenant.set_broken(e.to_string()).await;
2814 0 : }
2815 : // Leave the broken tenant in the map
2816 0 : slot_guard.revert();
2817 0 :
2818 0 : Err(TenantStateError::Other(e))
2819 : }
2820 : }
2821 4 : }
2822 :
2823 : use http_utils::error::ApiError;
2824 : use pageserver_api::models::TimelineGcRequest;
2825 :
2826 : use crate::tenant::gc_result::GcResult;
2827 :
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use tracing::Instrument;

    use super::super::harness::TenantHarness;
    use super::TenantsMap;
    use crate::tenant::mgr::TenantSlot;

    /// If a tenant is InProgress in the map when shutdown starts, shutdown must
    /// wait for that operation to complete before it proceeds.
    #[tokio::test(start_paused = true)]
    async fn shutdown_awaits_in_progress_tenant() {
        let harness = TenantHarness::create("shutdown_awaits_in_progress_tenant")
            .await
            .unwrap();
        // The harness forces the tenant to active; nothing is running on it.
        let (tenant, _ctx) = harness.load().await;
        let tenant_shard_id = tenant.tenant_shard_id();

        // tenant harness configures the logging and we cannot escape it
        let span = harness.span();
        let _e = span.enter();

        let tenants = BTreeMap::from([(tenant_shard_id, TenantSlot::Attached(tenant.clone()))]);
        let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));

        // Run remove_tenant_from_memory with a cleanup hook that blocks until
        // it is released manually: that keeps the tenant stuck in InProgress.
        let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
        let (until_cleanup_started, cleanup_started) = utils::completion::channel();

        let removal_tenants = tenants.clone();
        let removal = async move {
            let cleanup = async move {
                drop(until_cleanup_started);
                can_complete_cleanup.wait().await;
                anyhow::Ok(())
            };
            super::remove_tenant_from_memory(&removal_tenants, tenant_shard_id, cleanup).await
        }
        .instrument(harness.span());
        let mut remove_tenant_from_memory_task = tokio::spawn(removal);

        // now the long cleanup should be in place, with the stopping state
        cleanup_started.wait().await;

        // Start shutting down all tenants and wait until that task is running.
        let (until_shutdown_started, shutdown_started) = utils::completion::channel();
        let mut shutdown_task = tokio::spawn(async move {
            drop(until_shutdown_started);
            super::shutdown_all_tenants0(&tenants).await;
        });
        shutdown_started.wait().await;

        // Neither task may finish while the cleanup hook is still blocked.
        let long_time = std::time::Duration::from_secs(15);
        tokio::select! {
            _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
            _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
            _ = tokio::time::sleep(long_time) => {},
        }

        drop(until_cleanup_completed);

        // Now that we allow it to proceed, shutdown should complete immediately
        remove_tenant_from_memory_task.await.unwrap().unwrap();
        shutdown_task.await.unwrap();
    }
}
|