Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use std::borrow::Cow;
5 : use std::cmp::Ordering;
6 : use std::collections::{BTreeMap, HashMap, HashSet};
7 : use std::ops::Deref;
8 : use std::sync::Arc;
9 : use std::time::Duration;
10 :
11 : use anyhow::Context;
12 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
13 : use futures::StreamExt;
14 : use itertools::Itertools;
15 : use pageserver_api::key::Key;
16 : use pageserver_api::models::{DetachBehavior, LocationConfigMode};
17 : use pageserver_api::shard::{
18 : ShardCount, ShardIdentity, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId,
19 : };
20 : use pageserver_api::upcall_api::ReAttachResponseTenant;
21 : use rand::Rng;
22 : use rand::distr::Alphanumeric;
23 : use remote_storage::TimeoutOrCancel;
24 : use sysinfo::SystemExt;
25 : use tokio::fs;
26 : use tokio::task::JoinSet;
27 : use tokio_util::sync::CancellationToken;
28 : use tracing::*;
29 : use utils::crashsafe::path_with_suffix_extension;
30 : use utils::fs_ext::PathExt;
31 : use utils::generation::Generation;
32 : use utils::id::{TenantId, TimelineId};
33 : use utils::{backoff, completion, crashsafe};
34 :
35 : use super::remote_timeline_client::remote_tenant_path;
36 : use super::secondary::SecondaryTenant;
37 : use super::timeline::detach_ancestor::{self, PreparedTimelineDetach};
38 : use super::{GlobalShutDown, TenantSharedResources};
39 : use crate::config::PageServerConf;
40 : use crate::context::{DownloadBehavior, RequestContext};
41 : use crate::controller_upcall_client::{
42 : RetryForeverError, StorageControllerUpcallApi, StorageControllerUpcallClient,
43 : };
44 : use crate::deletion_queue::DeletionQueueClient;
45 : use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
46 : use crate::metrics::{LOCAL_DATA_LOSS_SUSPECTED, TENANT, TENANT_MANAGER as METRICS};
47 : use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind};
48 : use crate::tenant::config::{
49 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
50 : };
51 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
52 : use crate::tenant::storage_layer::inmemory_layer;
53 : use crate::tenant::timeline::ShutdownMode;
54 : use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
55 : use crate::tenant::{
56 : AttachedTenantConf, GcError, LoadConfigError, SpawnMode, TenantShard, TenantState,
57 : };
58 : use crate::virtual_file::MaybeFatalIo;
59 : use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
60 :
61 : /// A tenant that appears in TenantsMap may be either:
62 : /// - `Attached`: has a full Tenant object, is eligible to service
63 : /// reads and ingest WAL.
64 : /// - `Secondary`: is only keeping a local cache warm.
65 : ///
66 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
67 : /// that way we avoid having to carefully switch a tenant's ingestion etc. on and off during
68 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
69 : /// having a properly acquired generation (Secondary doesn't need a generation).
70 : #[derive(Clone)]
71 : pub(crate) enum TenantSlot {
72 : Attached(Arc<TenantShard>),
73 : Secondary(Arc<SecondaryTenant>),
74 : /// In this state, other administrative operations acting on the TenantId should
75 : /// block, or return a retry indicator equivalent to HTTP 503.
76 : InProgress(utils::completion::Barrier),
77 : }
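
// Illustrative sketch (hypothetical helper, not used elsewhere in this module):
// how a caller might map a `TenantSlot` to a coarse availability signal, per the
// contract described above (InProgress callers should block or retry, like a 503).
#[allow(dead_code)]
fn slot_availability(slot: &TenantSlot) -> &'static str {
    match slot {
        // Eligible to service reads and ingest WAL.
        TenantSlot::Attached(_) => "available",
        // Only keeping a local cache warm; not usable for reads.
        TenantSlot::Secondary(_) => "secondary",
        // Block on the barrier, or return a retry indicator (HTTP 503).
        TenantSlot::InProgress(_) => "retry-later",
    }
}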
78 :
79 : impl std::fmt::Debug for TenantSlot {
80 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 0 : match self {
82 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
83 0 : Self::Secondary(_) => write!(f, "Secondary"),
84 0 : Self::InProgress(_) => write!(f, "InProgress"),
85 : }
86 0 : }
87 : }
88 :
89 : impl TenantSlot {
90 : /// Return the `Tenant` in this slot if attached, else None
91 0 : fn get_attached(&self) -> Option<&Arc<TenantShard>> {
92 0 : match self {
93 0 : Self::Attached(t) => Some(t),
94 0 : Self::Secondary(_) => None,
95 0 : Self::InProgress(_) => None,
96 : }
97 0 : }
98 : }
99 :
100 : /// The tenants known to the pageserver.
101 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
102 : pub(crate) enum TenantsMap {
103 : /// [`init_tenant_mgr`] is not done yet.
104 : Initializing,
105 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
106 : /// New tenants can be added using [`TenantManager::tenant_map_acquire_slot`].
107 : Open(BTreeMap<TenantShardId, TenantSlot>),
108 : /// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
109 : /// Existing tenants are still accessible, but no new tenants can be created.
110 : ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
111 : }
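
// Illustrative sketch (hypothetical helper): the lifecycle above is strictly
// linear, Initializing -> Open -> ShuttingDown, and lookups only succeed in the
// latter two states.
#[allow(dead_code)]
fn tenants_map_allows_lookup(map: &TenantsMap) -> bool {
    match map {
        // Nothing has been loaded yet; see [`init_tenant_mgr`].
        TenantsMap::Initializing => false,
        // Fully operational, or draining but with existing tenants still accessible.
        TenantsMap::Open(_) | TenantsMap::ShuttingDown(_) => true,
    }
}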
112 :
113 : /// When resolving a TenantId to a shard, we may be looking for the 0th
114 : /// shard, or we might be looking for whichever shard holds a particular page.
115 : #[derive(Copy, Clone)]
116 : pub(crate) enum ShardSelector {
117 : /// Only return the 0th shard, if it is present. If a non-0th shard is present,
118 : /// ignore it.
119 : Zero,
120 : /// Pick the shard that holds this key
121 : Page(Key),
122 : /// The shard ID is known: pick the given shard
123 : Known(ShardIndex),
124 : }
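
// Illustrative sketch (hypothetical helper): how a resolver might decide whether
// the shard with index `this` should serve a given selector. This assumes the
// `pageserver_api` helper `ShardIdentity::get_shard_number` hashes a key onto a
// shard number under the identity's stripe layout.
#[allow(dead_code)]
fn selector_matches(selector: ShardSelector, identity: &ShardIdentity, this: ShardIndex) -> bool {
    match selector {
        // Only the 0th shard qualifies; non-0th shards are ignored.
        ShardSelector::Zero => this.shard_number == ShardNumber(0),
        // Whichever shard the key hashes to holds the page.
        ShardSelector::Page(key) => identity.get_shard_number(&key) == this.shard_number,
        // The caller already knows the exact shard.
        ShardSelector::Known(index) => index == this,
    }
}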
125 :
126 : /// A convenience for use with the re_attach ControllerUpcallClient function: rather
127 : /// than the serializable struct, we build this enum that encapsulates
128 : /// the invariant that attached tenants always have generations.
129 : ///
130 : /// This represents the subset of a LocationConfig that we receive during re-attach.
131 : pub(crate) enum TenantStartupMode {
132 : Attached((AttachmentMode, Generation, ShardStripeSize)),
133 : Secondary,
134 : }
135 :
136 : impl TenantStartupMode {
137 : /// Return the generation & mode that should be used when starting
138 : /// this tenant.
139 : ///
140 : /// If this returns None, the re-attach struct is in an invalid state and
141 : /// should be ignored in the response.
142 0 : fn from_reattach_tenant(rart: ReAttachResponseTenant) -> Option<Self> {
143 0 : match (rart.mode, rart.r#gen) {
144 0 : (LocationConfigMode::Detached, _) => None,
145 0 : (LocationConfigMode::Secondary, _) => Some(Self::Secondary),
146 0 : (LocationConfigMode::AttachedMulti, Some(g)) => Some(Self::Attached((
147 0 : AttachmentMode::Multi,
148 0 : Generation::new(g),
149 0 : rart.stripe_size,
150 0 : ))),
151 0 : (LocationConfigMode::AttachedSingle, Some(g)) => Some(Self::Attached((
152 0 : AttachmentMode::Single,
153 0 : Generation::new(g),
154 0 : rart.stripe_size,
155 0 : ))),
156 0 : (LocationConfigMode::AttachedStale, Some(g)) => Some(Self::Attached((
157 0 : AttachmentMode::Stale,
158 0 : Generation::new(g),
159 0 : rart.stripe_size,
160 0 : ))),
161 : _ => {
162 0 : tracing::warn!(
163 0 : "Received invalid re-attach state for tenant {}: {rart:?}",
164 : rart.id
165 : );
166 0 : None
167 : }
168 : }
169 0 : }
170 : }
171 :
172 : /// Result type for looking up a TenantId to a specific shard
173 : pub(crate) enum ShardResolveResult {
174 : NotFound,
175 : Found(Arc<TenantShard>),
176 : // Wait for this barrier, then query again
177 : InProgress(utils::completion::Barrier),
178 : }
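
// Illustrative sketch (hypothetical caller): the retry protocol suggested by the
// `InProgress` variant, i.e. wait on the barrier and then look up again.
#[allow(dead_code)]
async fn resolve_until_settled(
    mut lookup: impl FnMut() -> ShardResolveResult,
) -> Option<Arc<TenantShard>> {
    loop {
        match lookup() {
            ShardResolveResult::NotFound => return None,
            ShardResolveResult::Found(shard) => return Some(shard),
            // An administrative operation holds the slot: wait for it to finish,
            // then query again.
            ShardResolveResult::InProgress(barrier) => barrier.wait().await,
        }
    }
}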
179 :
180 : impl TenantsMap {
181 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
182 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
183 : /// None is returned.
184 0 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<TenantShard>> {
185 0 : match self {
186 0 : TenantsMap::Initializing => None,
187 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
188 0 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
189 : }
190 : }
191 0 : }
192 :
193 : #[cfg(all(debug_assertions, not(test)))]
194 0 : pub(crate) fn len(&self) -> usize {
195 0 : match self {
196 0 : TenantsMap::Initializing => 0,
197 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
198 : }
199 0 : }
200 : }
201 :
202 : /// Precursor to deletion of a tenant dir: we do a fast rename to a tmp path, and then
203 : /// the slower actual deletion in the background.
204 : ///
205 : /// This is "safe" in that that it won't leave behind a partially deleted directory
206 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting
207 : /// the contents.
208 : ///
209 : /// This is pageserver-specific, as it relies on future processes after a crash to check
210 : /// for TEMP_FILE_SUFFIX when loading things.
211 0 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
212 0 : let parent = path
213 0 : .as_ref()
214 0 : .parent()
215 : // It is invalid to call this function with a relative path. Tenant directories
216 : // should always have a parent.
217 0 : .ok_or(std::io::Error::new(
218 0 : std::io::ErrorKind::InvalidInput,
219 : "Path must be absolute",
220 0 : ))?;
221 0 : let rand_suffix = rand::rng()
222 0 : .sample_iter(&Alphanumeric)
223 0 : .take(8)
224 0 : .map(char::from)
225 0 : .collect::<String>()
226 0 : + TEMP_FILE_SUFFIX;
227 0 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
228 0 : fs::rename(path.as_ref(), &tmp_path).await?;
229 0 : fs::File::open(parent)
230 0 : .await?
231 0 : .sync_all()
232 0 : .await
233 0 : .maybe_fatal_err("safe_rename_tenant_dir")?;
234 0 : Ok(tmp_path)
235 0 : }
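
// Illustrative usage sketch (hypothetical helper): the typical pairing of
// `safe_rename_tenant_dir` with a background purge; `delete_local` inside
// [`TenantManager::delete_tenant`] below is a real call site.
#[allow(dead_code)]
async fn rename_then_purge(
    background_purges: &BackgroundPurges,
    tenant_dir: &Utf8Path,
) -> std::io::Result<()> {
    // Fast rename out of the way of future loads...
    let tmp_path = safe_rename_tenant_dir(tenant_dir).await?;
    // ...then delete the contents lazily, off the request path.
    background_purges.spawn(tmp_path);
    Ok(())
}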
236 :
237 : /// See [`Self::spawn`].
238 : #[derive(Clone, Default)]
239 : pub struct BackgroundPurges(tokio_util::task::TaskTracker);
240 :
241 : impl BackgroundPurges {
242 : /// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
243 : /// the background, and thereby avoid blocking any API requests on this deletion completing.
244 : ///
245 : /// Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
246 : /// Hence the [`BackgroundPurges`] type, which keeps track of these tasks.
247 0 : pub fn spawn(&self, tmp_path: Utf8PathBuf) {
248 : // Because on shutdown we close and wait, we are misusing TaskTracker a bit.
249 : //
250 : // So first acquire a token, then check if the tracker has been closed. The tracker might get closed
251 : // right after, but at least the shutdown will wait for what we are spawning next.
252 0 : let token = self.0.token();
253 :
254 0 : if self.0.is_closed() {
255 0 : warn!(
256 : %tmp_path,
257 0 : "trying to spawn background purge during shutdown, ignoring"
258 : );
259 0 : return;
260 0 : }
261 :
262 0 : let span = info_span!(parent: None, "background_purge", %tmp_path);
263 :
264 0 : let task = move || {
265 0 : let _token = token;
266 0 : let _entered = span.entered();
267 0 : if let Err(error) = std::fs::remove_dir_all(tmp_path.as_path()) {
268 : // should we fatal_io_error here?
269 0 : warn!(%error, "failed to purge tenant directory");
270 0 : }
271 0 : };
272 :
273 0 : BACKGROUND_RUNTIME.spawn_blocking(task);
274 0 : }
275 :
276 : /// When this future completes, all background purges have completed.
277 : /// The first poll of the future will already lock out new background purges spawned via [`Self::spawn`].
278 : ///
279 : /// Concurrent calls will coalesce.
280 : ///
281 : /// # Cancellation-Safety
282 : ///
283 : /// If this future is dropped before polled to completion, concurrent and subsequent
284 : /// instances of this future will continue to be correct.
285 : #[instrument(skip_all)]
286 : pub async fn shutdown(&self) {
287 : // forbid new tasks (can be called many times)
288 : self.0.close();
289 : self.0.wait().await;
290 : }
291 : }
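
// Illustrative sketch (hypothetical) of the lifecycle described above: spawn
// purges while running, then call `shutdown()` once to fence off new spawns and
// wait for the in-flight ones.
#[allow(dead_code)]
async fn background_purges_lifecycle(tmp_paths: Vec<Utf8PathBuf>) {
    let purges = BackgroundPurges::default();
    for tmp_path in tmp_paths {
        purges.spawn(tmp_path);
    }
    // The first poll already locks out new spawns; concurrent calls coalesce.
    purges.shutdown().await;
}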
292 :
293 : /// Responsible for storing and mutating the collection of all tenants
294 : /// that this pageserver has state for.
295 : ///
296 : /// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
297 : ///
298 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
299 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
300 : /// and attached modes concurrently.
301 : pub struct TenantManager {
302 : conf: &'static PageServerConf,
303 : tenants: std::sync::RwLock<TenantsMap>,
304 : resources: TenantSharedResources,
305 :
306 : // Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
307 : // This is for edge cases like tenant deletion. In normal cases (within a Tenant lifetime),
308 : // tenants have their own cancellation tokens, which we fire individually in [`Self::shutdown`], or
309 : // when the tenant detaches.
310 : cancel: CancellationToken,
311 :
312 : background_purges: BackgroundPurges,
313 : }
314 :
315 0 : fn emergency_generations(
316 0 : tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
317 0 : ) -> HashMap<TenantShardId, TenantStartupMode> {
318 0 : tenant_confs
319 0 : .iter()
320 0 : .filter_map(|(tid, lc)| {
321 0 : let lc = match lc {
322 0 : Ok(lc) => lc,
323 0 : Err(_) => return None,
324 : };
325 : Some((
326 0 : *tid,
327 0 : match &lc.mode {
328 0 : LocationMode::Attached(alc) => TenantStartupMode::Attached((
329 0 : alc.attach_mode,
330 0 : alc.generation,
331 0 : lc.shard.stripe_size,
332 0 : )),
333 0 : LocationMode::Secondary(_) => TenantStartupMode::Secondary,
334 : },
335 : ))
336 0 : })
337 0 : .collect()
338 0 : }
339 :
340 0 : async fn init_load_generations(
341 0 : conf: &'static PageServerConf,
342 0 : tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
343 0 : resources: &TenantSharedResources,
344 0 : cancel: &CancellationToken,
345 0 : ) -> anyhow::Result<Option<HashMap<TenantShardId, TenantStartupMode>>> {
346 0 : let generations = if conf.control_plane_emergency_mode {
347 0 : error!(
348 0 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
349 : );
350 0 : emergency_generations(tenant_confs)
351 : } else {
352 0 : let client = StorageControllerUpcallClient::new(conf, cancel);
353 0 : info!("Calling {} API to re-attach tenants", client.base_url());
354 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
355 0 : let empty_local_disk = tenant_confs.is_empty();
356 0 : match client.re_attach(conf, empty_local_disk).await {
357 0 : Ok(tenants) => tenants
358 0 : .into_iter()
359 0 : .flat_map(|(id, rart)| {
360 0 : TenantStartupMode::from_reattach_tenant(rart).map(|tsm| (id, tsm))
361 0 : })
362 0 : .collect(),
363 : Err(RetryForeverError::ShuttingDown) => {
364 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
365 : }
366 : }
367 : };
368 :
369 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
370 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
371 : // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
372 : // are processed, even though we don't block on recovery completing here.
373 0 : let attached_tenants = generations
374 0 : .iter()
375 0 : .flat_map(|(id, start_mode)| {
376 0 : match start_mode {
377 0 : TenantStartupMode::Attached((_mode, generation, _stripe_size)) => Some(generation),
378 0 : TenantStartupMode::Secondary => None,
379 : }
380 0 : .map(|gen_| (*id, *gen_))
381 0 : })
382 0 : .collect();
383 0 : resources.deletion_queue_client.recover(attached_tenants)?;
384 :
385 0 : Ok(Some(generations))
386 0 : }
387 :
388 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
389 : /// to load a tenant config from it.
390 : ///
391 : /// If we cleaned up something expected (like an empty dir or a temp dir), return None.
392 0 : fn load_tenant_config(
393 0 : conf: &'static PageServerConf,
394 0 : tenant_shard_id: TenantShardId,
395 0 : dentry: Utf8DirEntry,
396 0 : ) -> Option<Result<LocationConf, LoadConfigError>> {
397 0 : let tenant_dir_path = dentry.path().to_path_buf();
398 0 : if crate::is_temporary(&tenant_dir_path) {
399 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
400 : // No need to use safe_remove_tenant_dir_all because this is already
401 : // a temporary path
402 0 : std::fs::remove_dir_all(&tenant_dir_path).fatal_err("delete temporary tenant dir");
403 0 : return None;
404 0 : }
405 :
406 : // This case happens if we crash during attachment before writing a config into the dir
407 0 : let is_empty = tenant_dir_path
408 0 : .is_empty_dir()
409 0 : .fatal_err("Checking for empty tenant dir");
410 0 : if is_empty {
411 0 : info!("removing empty tenant directory {tenant_dir_path:?}");
412 0 : std::fs::remove_dir(&tenant_dir_path).fatal_err("delete empty tenant dir");
413 0 : return None;
414 0 : }
415 :
416 0 : Some(TenantShard::load_tenant_config(conf, &tenant_shard_id))
417 0 : }
418 :
419 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
420 : /// and load configurations for the tenants we found.
421 : ///
422 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
423 : /// seconds even on reasonably fast drives.
424 0 : async fn init_load_tenant_configs(
425 0 : conf: &'static PageServerConf,
426 0 : ) -> HashMap<TenantShardId, Result<LocationConf, LoadConfigError>> {
427 0 : let tenants_dir = conf.tenants_path();
428 :
429 0 : let dentries = tokio::task::spawn_blocking(move || -> Vec<Utf8DirEntry> {
430 0 : let context = format!("read tenants dir {tenants_dir}");
431 0 : let dir_entries = tenants_dir.read_dir_utf8().fatal_err(&context);
432 :
433 0 : dir_entries
434 0 : .collect::<Result<Vec<_>, std::io::Error>>()
435 0 : .fatal_err(&context)
436 0 : })
437 0 : .await
438 0 : .expect("Config load task panicked");
439 :
440 0 : let mut configs = HashMap::new();
441 :
442 0 : let mut join_set = JoinSet::new();
443 0 : for dentry in dentries {
444 0 : let tenant_shard_id = match dentry.file_name().parse::<TenantShardId>() {
445 0 : Ok(id) => id,
446 : Err(_) => {
447 0 : warn!(
448 0 : "Invalid tenant path (garbage in our repo directory?): '{}'",
449 0 : dentry.file_name()
450 : );
451 0 : continue;
452 : }
453 : };
454 :
455 0 : join_set.spawn_blocking(move || {
456 0 : (
457 0 : tenant_shard_id,
458 0 : load_tenant_config(conf, tenant_shard_id, dentry),
459 0 : )
460 0 : });
461 : }
462 :
463 0 : while let Some(r) = join_set.join_next().await {
464 0 : let (tenant_shard_id, tenant_config) = r.expect("Panic in config load task");
465 0 : if let Some(tenant_config) = tenant_config {
466 0 : configs.insert(tenant_shard_id, tenant_config);
467 0 : }
468 : }
469 :
470 0 : configs
471 0 : }
472 :
473 : #[derive(Debug, thiserror::Error)]
474 : pub(crate) enum DeleteTenantError {
475 : #[error("Tenant map slot error {0}")]
476 : SlotError(#[from] TenantSlotError),
477 :
478 : #[error("Cancelled")]
479 : Cancelled,
480 :
481 : #[error(transparent)]
482 : Other(#[from] anyhow::Error),
483 : }
484 :
485 : /// Initialize repositories in the `Initializing` state.
486 0 : pub fn init(
487 0 : conf: &'static PageServerConf,
488 0 : background_purges: BackgroundPurges,
489 0 : resources: TenantSharedResources,
490 0 : cancel: CancellationToken,
491 0 : ) -> TenantManager {
492 0 : TenantManager {
493 0 : conf,
494 0 : tenants: std::sync::RwLock::new(TenantsMap::Initializing),
495 0 : resources,
496 0 : cancel,
497 0 : background_purges,
498 0 : }
499 0 : }
500 :
501 : /// Transition repositories from `Initializing` state to `Open` state with locally available timelines.
502 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
503 : /// are scheduled for download and added to the tenant once download is completed.
504 : #[instrument(skip_all)]
505 : pub async fn init_tenant_mgr(
506 : tenant_manager: Arc<TenantManager>,
507 : init_order: InitializationOrder,
508 : ) -> anyhow::Result<()> {
509 : debug_assert!(matches!(
510 : *tenant_manager.tenants.read().unwrap(),
511 : TenantsMap::Initializing
512 : ));
513 : let mut tenants = BTreeMap::new();
514 :
515 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
516 :
517 : let conf = tenant_manager.conf;
518 : let resources = &tenant_manager.resources;
519 : let cancel = &tenant_manager.cancel;
520 : let background_purges = &tenant_manager.background_purges;
521 :
522 : // Initialize dynamic limits that depend on system resources
523 : let system_memory =
524 : sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
525 : .total_memory();
526 : let max_ephemeral_layer_bytes =
527 : conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
528 : tracing::info!(
529 : "Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory"
530 : );
531 : inmemory_layer::GLOBAL_RESOURCES.max_dirty_bytes.store(
532 : max_ephemeral_layer_bytes,
533 : std::sync::atomic::Ordering::Relaxed,
534 : );
535 :
536 : // Scan local filesystem for attached tenants
537 : let tenant_configs = init_load_tenant_configs(conf).await;
538 :
539 : // Determine which tenants are to be secondary or attached, and in which generation
540 : let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
541 :
542 : // Hadron local SSD check: Raise an alert if our local filesystem does not contain any tenants but the re-attach request returned tenants.
543 : // This can happen if the PS suffered a Kubernetes node failure resulting in loss of all local data, but recovered quickly on another node
544 : // so the Storage Controller has not had the time to move tenants out.
545 : let data_loss_suspected = if let Some(tenant_modes) = &tenant_modes {
546 : tenant_configs.is_empty() && !tenant_modes.is_empty()
547 : } else {
548 : false
549 : };
550 : if data_loss_suspected {
551 : tracing::error!(
552 : "Local data loss suspected: no tenants found on local filesystem, but re-attach request returned tenants"
553 : );
554 : }
555 : LOCAL_DATA_LOSS_SUSPECTED.set(if data_loss_suspected { 1 } else { 0 });
556 :
557 : tracing::info!(
558 : "Attaching {} tenants at startup, warming up {} at a time",
559 : tenant_configs.len(),
560 : conf.concurrent_tenant_warmup.initial_permits()
561 : );
562 : TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
563 :
564 : // Accumulate futures for writing tenant configs, so that we can execute in parallel
565 : let mut config_write_futs = Vec::new();
566 :
567 : // Update the location configs according to the re-attach response and persist them to disk
568 : tracing::info!("Updating {} location configs", tenant_configs.len());
569 : for (tenant_shard_id, location_conf) in tenant_configs {
570 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
571 :
572 : let mut location_conf = match location_conf {
573 : Ok(l) => l,
574 : Err(e) => {
575 : // This should only happen in the case of a serialization bug or critical local I/O error: we cannot load this tenant
576 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to load tenant config: {e:#}");
577 : continue;
578 : }
579 : };
580 :
581 : // FIXME: if we were attached, and get demoted to secondary on re-attach, we
582 : // don't have a place to get a config.
583 : // (https://github.com/neondatabase/neon/issues/5377)
584 : const DEFAULT_SECONDARY_CONF: SecondaryLocationConfig =
585 : SecondaryLocationConfig { warm: true };
586 :
587 : if let Some(tenant_modes) = &tenant_modes {
588 : // We have a generation map: treat it as the authority for whether
589 : // this tenant is really attached.
590 : match tenant_modes.get(&tenant_shard_id) {
591 : None => {
592 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
593 :
594 : match safe_rename_tenant_dir(&tenant_dir_path).await {
595 : Ok(tmp_path) => {
596 : background_purges.spawn(tmp_path);
597 : }
598 : Err(e) => {
599 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
600 : "Failed to move detached tenant directory '{tenant_dir_path}': {e:?}");
601 : }
602 : };
603 :
604 : // We deleted local content: move on to next tenant, don't try and spawn this one.
605 : continue;
606 : }
607 : Some(TenantStartupMode::Secondary) => {
608 : if !matches!(location_conf.mode, LocationMode::Secondary(_)) {
609 : location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
610 : }
611 : }
612 : Some(TenantStartupMode::Attached((attach_mode, generation, stripe_size))) => {
613 : let old_gen_higher = match &location_conf.mode {
614 : LocationMode::Attached(AttachedLocationConfig {
615 : generation: old_generation,
616 : attach_mode: _attach_mode,
617 : }) => {
618 : if old_generation > generation {
619 : Some(old_generation)
620 : } else {
621 : None
622 : }
623 : }
624 : _ => None,
625 : };
626 : if let Some(old_generation) = old_gen_higher {
627 : tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
628 : "Control plane gave decreasing generation ({generation:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
629 : old_generation
630 : );
631 :
632 : // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
633 : // local disk content: demote to secondary rather than detaching.
634 : location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
635 : } else {
636 : location_conf.attach_in_generation(*attach_mode, *generation, *stripe_size);
637 : }
638 : }
639 : }
640 : } else {
641 : // Legacy mode: no generation information, any tenant present
642 : // on local disk may activate
643 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
644 : };
645 :
646 : // Presence of a generation number implies attachment: attach the tenant
647 : // if it wasn't already, and apply the generation number.
648 0 : config_write_futs.push(async move {
649 0 : let r =
650 0 : TenantShard::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
651 0 : (tenant_shard_id, location_conf, r)
652 0 : });
653 : }
654 :
655 : // Execute config writes with concurrency, to avoid bottlenecking on local FS write latency
656 : tracing::info!(
657 : "Writing {} location config files...",
658 : config_write_futs.len()
659 : );
660 : let config_write_results = futures::stream::iter(config_write_futs)
661 : .buffer_unordered(16)
662 : .collect::<Vec<_>>()
663 : .await;
664 :
665 : tracing::info!(
666 : "Spawning {} tenant shard locations...",
667 : config_write_results.len()
668 : );
669 : // For those shards that have live configurations, construct `Tenant` or `SecondaryTenant` objects and start them running
670 : for (tenant_shard_id, location_conf, config_write_result) in config_write_results {
671 : // Writing a config to local disk is foundational to starting up tenants: panic if we can't.
672 : config_write_result.fatal_err("write tenant shard config file");
673 :
674 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
675 : let shard_identity = location_conf.shard;
676 : let slot = match location_conf.mode {
677 : LocationMode::Attached(attached_conf) => TenantSlot::Attached(
678 : tenant_spawn(
679 : conf,
680 : tenant_shard_id,
681 : &tenant_dir_path,
682 : resources.clone(),
683 : AttachedTenantConf::new(conf, location_conf.tenant_conf, attached_conf),
684 : shard_identity,
685 : Some(init_order.clone()),
686 : SpawnMode::Lazy,
687 : &ctx,
688 : )
689 : .expect("global shutdown during init_tenant_mgr cannot happen"),
690 : ),
691 : LocationMode::Secondary(secondary_conf) => {
692 : info!(
693 : tenant_id = %tenant_shard_id.tenant_id,
694 : shard_id = %tenant_shard_id.shard_slug(),
695 : "Starting secondary tenant"
696 : );
697 : TenantSlot::Secondary(SecondaryTenant::new(
698 : tenant_shard_id,
699 : shard_identity,
700 : location_conf.tenant_conf,
701 : &secondary_conf,
702 : ))
703 : }
704 : };
705 :
706 : METRICS.slot_inserted(&slot);
707 : tenants.insert(tenant_shard_id, slot);
708 : }
709 :
710 : info!("Processed {} local tenants at startup", tenants.len());
711 :
712 : let mut tenant_map = tenant_manager.tenants.write().unwrap();
713 : *tenant_map = TenantsMap::Open(tenants);
714 :
715 : Ok(())
716 : }
717 :
718 : /// Wrapper for Tenant::spawn that checks invariants before running
719 : #[allow(clippy::too_many_arguments)]
720 0 : fn tenant_spawn(
721 0 : conf: &'static PageServerConf,
722 0 : tenant_shard_id: TenantShardId,
723 0 : tenant_path: &Utf8Path,
724 0 : resources: TenantSharedResources,
725 0 : location_conf: AttachedTenantConf,
726 0 : shard_identity: ShardIdentity,
727 0 : init_order: Option<InitializationOrder>,
728 0 : mode: SpawnMode,
729 0 : ctx: &RequestContext,
730 0 : ) -> Result<Arc<TenantShard>, GlobalShutDown> {
731 : // All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
732 : // path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
733 : // to avoid impacting prod runtime performance.
734 0 : assert!(!crate::is_temporary(tenant_path));
735 0 : debug_assert!(tenant_path.is_dir());
736 0 : debug_assert!(
737 0 : conf.tenant_location_config_path(&tenant_shard_id)
738 0 : .try_exists()
739 0 : .unwrap()
740 : );
741 :
742 0 : TenantShard::spawn(
743 0 : conf,
744 0 : tenant_shard_id,
745 0 : resources,
746 0 : location_conf,
747 0 : shard_identity,
748 0 : init_order,
749 0 : mode,
750 0 : ctx,
751 : )
752 0 : }
753 :
754 : #[derive(thiserror::Error, Debug)]
755 : pub(crate) enum UpsertLocationError {
756 : #[error("Bad config request: {0}")]
757 : BadRequest(anyhow::Error),
758 :
759 : #[error("Cannot change config in this state: {0}")]
760 : Unavailable(#[from] TenantMapError),
761 :
762 : #[error("Tenant is already being modified")]
763 : InProgress,
764 :
765 : #[error("Failed to flush: {0}")]
766 : Flush(anyhow::Error),
767 :
768 : /// This error variant is for unexpected situations (soft assertions) where the system is in an unexpected state.
769 : #[error("Internal error: {0}")]
770 : InternalError(anyhow::Error),
771 : }
772 :
773 : impl TenantManager {
774 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
775 : /// having to pass it around everywhere as a separate object.
776 0 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
777 0 : self.conf
778 0 : }
779 :
780 : /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or currently
781 : /// undergoing a state change (i.e. slot is InProgress).
782 : ///
783 : /// The returned TenantShard is not guaranteed to be active: check its status after obtaining it, or
784 : /// use [`TenantShard::wait_to_become_active`] before using it if you will do I/O on it.
785 0 : pub(crate) fn get_attached_tenant_shard(
786 0 : &self,
787 0 : tenant_shard_id: TenantShardId,
788 0 : ) -> Result<Arc<TenantShard>, GetTenantError> {
789 0 : let locked = self.tenants.read().unwrap();
790 :
791 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
792 :
793 0 : match peek_slot {
794 0 : Some(TenantSlot::Attached(tenant)) => Ok(Arc::clone(tenant)),
795 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
796 : None | Some(TenantSlot::Secondary(_)) => {
797 0 : Err(GetTenantError::ShardNotFound(tenant_shard_id))
798 : }
799 : }
800 0 : }
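
    // Illustrative sketch (hypothetical helper): per the doc comment above, a
    // caller that intends to do I/O should wait for activation after obtaining
    // the shard. Assumes `wait_to_become_active(Duration)` behaves as referenced
    // above; ACTIVE_TENANT_TIMEOUT is the bound the HTTP routes use.
    #[allow(dead_code)]
    async fn get_active_tenant_example(
        &self,
        tenant_shard_id: TenantShardId,
    ) -> anyhow::Result<Arc<TenantShard>> {
        let tenant = self.get_attached_tenant_shard(tenant_shard_id)?;
        tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
        Ok(tenant)
    }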
801 :
802 0 : pub(crate) fn get_secondary_tenant_shard(
803 0 : &self,
804 0 : tenant_shard_id: TenantShardId,
805 0 : ) -> Option<Arc<SecondaryTenant>> {
806 0 : let locked = self.tenants.read().unwrap();
807 :
808 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
809 0 : .ok()
810 0 : .flatten();
811 :
812 0 : match peek_slot {
813 0 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
814 0 : _ => None,
815 : }
816 0 : }
817 :
818 : /// Whether the `TenantManager` is responsible for the tenant shard
819 0 : pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
820 0 : let locked = self.tenants.read().unwrap();
821 :
822 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
823 0 : .ok()
824 0 : .flatten();
825 :
826 0 : peek_slot.is_some()
827 0 : }
828 :
829 : /// Returns whether a local shard exists that's a child of the given tenant shard. Note that
830 : /// this just checks for any shard with a larger shard count, and it may not be a direct child
831 : /// of the given shard (their keyspaces may not overlap).
832 0 : pub(crate) fn has_child_shard(&self, tenant_id: TenantId, shard_index: ShardIndex) -> bool {
833 0 : match &*self.tenants.read().unwrap() {
834 0 : TenantsMap::Initializing => false,
835 0 : TenantsMap::Open(slots) | TenantsMap::ShuttingDown(slots) => slots
836 0 : .range(TenantShardId::tenant_range(tenant_id))
837 0 : .any(|(tsid, _)| tsid.shard_count > shard_index.shard_count),
838 : }
839 0 : }
840 :
841 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
842 : pub(crate) async fn upsert_location(
843 : &self,
844 : tenant_shard_id: TenantShardId,
845 : new_location_config: LocationConf,
846 : flush: Option<Duration>,
847 : mut spawn_mode: SpawnMode,
848 : ctx: &RequestContext,
849 : ) -> Result<Option<Arc<TenantShard>>, UpsertLocationError> {
850 : debug_assert_current_span_has_tenant_id();
851 : info!("configuring tenant location to state {new_location_config:?}");
852 :
853 : enum FastPathModified {
854 : Attached(Arc<TenantShard>),
855 : Secondary(Arc<SecondaryTenant>),
856 : }
857 :
858 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
859 : // then we do not need to set the slot to InProgress: we can just call into the
860 : // existing tenant.
861 : let fast_path_taken = {
862 : let locked = self.tenants.read().unwrap();
863 : let peek_slot =
864 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
865 : match (&new_location_config.mode, peek_slot) {
866 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
867 : match attach_conf.generation.cmp(&tenant.generation) {
868 : Ordering::Equal => {
869 : // A transition from Attached to Attached in the same generation, we may
870 : // take our fast path and just provide the updated configuration
871 : // to the tenant.
872 : tenant.set_new_location_config(
873 : AttachedTenantConf::try_from(
874 : self.conf,
875 : new_location_config.clone(),
876 : )
877 : .map_err(UpsertLocationError::BadRequest)?,
878 : );
879 :
880 : Some(FastPathModified::Attached(tenant.clone()))
881 : }
882 : Ordering::Less => {
883 : return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
884 : "Generation {:?} is less than existing {:?}",
885 : attach_conf.generation,
886 : tenant.generation
887 : )));
888 : }
889 : Ordering::Greater => {
890 : // Generation advanced, fall through to general case of replacing `Tenant` object
891 : None
892 : }
893 : }
894 : }
895 : (
896 : LocationMode::Secondary(secondary_conf),
897 : Some(TenantSlot::Secondary(secondary_tenant)),
898 : ) => {
899 : secondary_tenant.set_config(secondary_conf);
900 : secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
901 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
902 : }
903 : _ => {
904 : // Not an Attached->Attached transition, fall through to general case
905 : None
906 : }
907 : }
908 : };
909 :
910 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
911 : // phase of writing config and/or waiting for flush, before returning.
912 : match fast_path_taken {
913 : Some(FastPathModified::Attached(tenant)) => {
914 : tenant
915 : .shard_identity
916 : .assert_equal(new_location_config.shard);
917 : TenantShard::persist_tenant_config(
918 : self.conf,
919 : &tenant_shard_id,
920 : &new_location_config,
921 : )
922 : .await
923 : .fatal_err("write tenant shard config");
924 :
925 : // Transition to AttachedStale means we may well hold a valid generation
926 : // still, and have been requested to go stale as part of a migration. If
927 : // the caller set `flush`, then flush to remote storage.
928 : if let LocationMode::Attached(AttachedLocationConfig {
929 : generation: _,
930 : attach_mode: AttachmentMode::Stale,
931 : }) = &new_location_config.mode
932 : {
933 : if let Some(flush_timeout) = flush {
934 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
935 : Ok(Err(e)) => {
936 : return Err(UpsertLocationError::Flush(e));
937 : }
938 : Ok(Ok(_)) => return Ok(Some(tenant)),
939 : Err(_) => {
940 : tracing::warn!(
941 : timeout_ms = flush_timeout.as_millis(),
942 : "Timed out waiting for flush to remote storage, proceeding anyway."
943 : )
944 : }
945 : }
946 : }
947 : }
948 :
949 : return Ok(Some(tenant));
950 : }
951 : Some(FastPathModified::Secondary(secondary_tenant)) => {
952 : secondary_tenant
953 : .shard_identity
954 : .assert_equal(new_location_config.shard);
955 : TenantShard::persist_tenant_config(
956 : self.conf,
957 : &tenant_shard_id,
958 : &new_location_config,
959 : )
960 : .await
961 : .fatal_err("write tenant shard config");
962 :
963 : return Ok(None);
964 : }
965 : None => {
966 : // Proceed with the general case procedure, where we will shutdown & remove any existing
967 : // slot contents and replace with a fresh one
968 : }
969 : };
970 :
971 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
972 : // InProgress value to the slot while we make whatever changes are required. The state for
973 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
974 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
975 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
976 : let mut slot_guard = self
977 : .tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
978 0 : .map_err(|e| match e {
979 : TenantSlotError::NotFound(_) => {
980 0 : unreachable!("Called with mode Any")
981 : }
982 0 : TenantSlotError::InProgress => UpsertLocationError::InProgress,
983 0 : TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
984 0 : })?;
985 :
986 : match slot_guard.get_old_value() {
987 : Some(TenantSlot::Attached(tenant)) => {
988 : tenant
989 : .shard_identity
990 : .assert_equal(new_location_config.shard);
991 :
992 : // The case where we keep a Tenant alive was covered above in the special case
993 : // for Attached->Attached transitions in the same generation. By this point,
994 : // if we see an attached tenant we know it will be discarded and should be
995 : // shut down.
996 : let (_guard, progress) = utils::completion::channel();
997 :
998 : match tenant.get_attach_mode() {
999 : AttachmentMode::Single | AttachmentMode::Multi => {
1000 : // Before we leave our state as the presumed holder of the latest generation,
1001 : // flush any outstanding deletions to reduce the risk of leaking objects.
1002 : self.resources.deletion_queue_client.flush_advisory()
1003 : }
1004 : AttachmentMode::Stale => {
1005 : // If we're stale, there's no point trying to flush deletions
1006 : }
1007 : };
1008 :
1009 : info!("Shutting down attached tenant");
1010 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1011 : Ok(()) => {}
1012 : Err(barrier) => {
1013 : info!("Shutdown already in progress, waiting for it to complete");
1014 : barrier.wait().await;
1015 : }
1016 : }
1017 : slot_guard.drop_old_value().expect("We just shut it down");
1018 :
1019 : // Edge case: if we were called with SpawnMode::Create, but a Tenant already existed, then
1020 : // the caller thinks they're creating but the tenant already existed. We must switch to
1021 : // Eager mode so that when starting this Tenant we properly probe remote storage for timelines,
1022 : // rather than assuming it to be empty.
1023 : spawn_mode = SpawnMode::Eager;
1024 : }
1025 : Some(TenantSlot::Secondary(secondary_tenant)) => {
1026 : secondary_tenant
1027 : .shard_identity
1028 : .assert_equal(new_location_config.shard);
1029 :
1030 : info!("Shutting down secondary tenant");
1031 : secondary_tenant.shutdown().await;
1032 : }
1033 : Some(TenantSlot::InProgress(_)) => {
1034 : // This should never happen: acquire_slot should error out
1035 : // if the contents of a slot were InProgress.
1036 : return Err(UpsertLocationError::InternalError(anyhow::anyhow!(
1037 : "Acquired an InProgress slot, this is a bug."
1038 : )));
1039 : }
1040 : None => {
1041 : // Slot was vacant, nothing needs shutting down.
1042 : }
1043 : }
1044 :
1045 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1046 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1047 :
1048 : // Directory structure is the same for attached and secondary modes:
1049 : // create it if it doesn't exist. Timeline load/creation expects the
1050 : // timelines/ subdir to already exist.
1051 : //
1052 : // Does not need to be fsync'd because local storage is just a cache.
1053 : tokio::fs::create_dir_all(&timelines_path)
1054 : .await
1055 : .fatal_err("create timelines/ dir");
1056 :
1057 : // Before activating either secondary or attached mode, persist the
1058 : // configuration, so that on restart we will re-attach (or re-start
1059 : // secondary) on the tenant.
1060 : TenantShard::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1061 : .await
1062 : .fatal_err("write tenant shard config");
1063 :
1064 : let new_slot = match &new_location_config.mode {
1065 : LocationMode::Secondary(secondary_config) => {
1066 : let shard_identity = new_location_config.shard;
1067 : TenantSlot::Secondary(SecondaryTenant::new(
1068 : tenant_shard_id,
1069 : shard_identity,
1070 : new_location_config.tenant_conf,
1071 : secondary_config,
1072 : ))
1073 : }
1074 : LocationMode::Attached(_attach_config) => {
1075 : let shard_identity = new_location_config.shard;
1076 :
1077 : // Testing hack: if we are configured with no control plane, then drop the generation
1078 : // from upserts. This enables creating generation-less tenants even though neon_local
1079 : // always uses generations when calling the location conf API.
1080 : let attached_conf = AttachedTenantConf::try_from(self.conf, new_location_config)
1081 : .map_err(UpsertLocationError::BadRequest)?;
1082 :
1083 : let tenant = tenant_spawn(
1084 : self.conf,
1085 : tenant_shard_id,
1086 : &tenant_path,
1087 : self.resources.clone(),
1088 : attached_conf,
1089 : shard_identity,
1090 : None,
1091 : spawn_mode,
1092 : ctx,
1093 : )
1094 0 : .map_err(|_: GlobalShutDown| {
1095 0 : UpsertLocationError::Unavailable(TenantMapError::ShuttingDown)
1096 0 : })?;
1097 :
1098 : TenantSlot::Attached(tenant)
1099 : }
1100 : };
1101 :
1102 : let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
1103 : Some(tenant.clone())
1104 : } else {
1105 : None
1106 : };
1107 :
1108 : match slot_guard.upsert(new_slot) {
1109 : Err(TenantSlotUpsertError::InternalError(e)) => {
1110 : Err(UpsertLocationError::InternalError(anyhow::anyhow!(e)))
1111 : }
1112 : Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
1113 : Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
1114 : // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
1115 : // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
1116 : // are shut down.
1117 : //
1118 : // We must shut it down inline here.
1119 : match new_slot {
1120 : TenantSlot::InProgress(_) => {
1121 : // Unreachable because we never insert an InProgress
1122 : unreachable!()
1123 : }
1124 : TenantSlot::Attached(tenant) => {
1125 : let (_guard, progress) = utils::completion::channel();
1126 : info!(
1127 : "Shutting down just-spawned tenant, because tenant manager is shut down"
1128 : );
1129 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1130 : Ok(()) => {
1131 : info!("Finished shutting down just-spawned tenant");
1132 : }
1133 : Err(barrier) => {
1134 : info!("Shutdown already in progress, waiting for it to complete");
1135 : barrier.wait().await;
1136 : }
1137 : }
1138 : }
1139 : TenantSlot::Secondary(secondary_tenant) => {
1140 : secondary_tenant.shutdown().await;
1141 : }
1142 : }
1143 :
1144 : Err(UpsertLocationError::Unavailable(
1145 : TenantMapError::ShuttingDown,
1146 : ))
1147 : }
1148 : Ok(()) => Ok(attached_tenant),
1149 : }
1150 : }
1151 :
1152 1 : fn tenant_map_acquire_slot(
1153 1 : &self,
1154 1 : tenant_shard_id: &TenantShardId,
1155 1 : mode: TenantSlotAcquireMode,
1156 1 : ) -> Result<SlotGuard, TenantSlotError> {
1157 : use TenantSlotAcquireMode::*;
1158 1 : METRICS.tenant_slot_writes.inc();
1159 :
1160 1 : let mut locked = self.tenants.write().unwrap();
1161 1 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
1162 1 : let _guard = span.enter();
1163 :
1164 1 : let m = match &mut *locked {
1165 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
1166 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
1167 1 : TenantsMap::Open(m) => m,
1168 : };
1169 :
1170 : use std::collections::btree_map::Entry;
1171 :
1172 1 : let entry = m.entry(*tenant_shard_id);
1173 :
1174 1 : match entry {
1175 0 : Entry::Vacant(v) => match mode {
1176 : MustExist => {
1177 0 : tracing::debug!("Vacant && MustExist: return NotFound");
1178 0 : Err(TenantSlotError::NotFound(*tenant_shard_id))
1179 : }
1180 : _ => {
1181 0 : let (completion, barrier) = utils::completion::channel();
1182 0 : let inserting = TenantSlot::InProgress(barrier);
1183 0 : METRICS.slot_inserted(&inserting);
1184 0 : v.insert(inserting);
1185 0 : tracing::debug!("Vacant, inserted InProgress");
1186 0 : Ok(SlotGuard::new(
1187 0 : *tenant_shard_id,
1188 0 : None,
1189 0 : completion,
1190 0 : &self.tenants,
1191 0 : ))
1192 : }
1193 : },
1194 1 : Entry::Occupied(mut o) => {
1195 : // Apply mode-driven checks
1196 1 : match (o.get(), mode) {
1197 : (TenantSlot::InProgress(_), _) => {
1198 0 : tracing::debug!("Occupied, failing for InProgress");
1199 0 : Err(TenantSlotError::InProgress)
1200 : }
1201 : _ => {
1202 : // Happy case: the slot was not in any state that violated our mode
1203 1 : let (completion, barrier) = utils::completion::channel();
1204 1 : let in_progress = TenantSlot::InProgress(barrier);
1205 1 : METRICS.slot_inserted(&in_progress);
1206 1 : let old_value = o.insert(in_progress);
1207 1 : METRICS.slot_removed(&old_value);
1208 1 : tracing::debug!("Occupied, replaced with InProgress");
1209 1 : Ok(SlotGuard::new(
1210 1 : *tenant_shard_id,
1211 1 : Some(old_value),
1212 1 : completion,
1213 1 : &self.tenants,
1214 1 : ))
1215 : }
1216 : }
1217 : }
1218 : }
1219 1 : }
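
    // Illustrative sketch (hypothetical helper) of the protocol this method
    // anchors: acquire (the slot becomes InProgress, so concurrent callers get a
    // 503-style retry), mutate outside the map lock, then upsert a replacement or
    // revert. `reset_tenant` below and `delete_tenant` are real instances.
    #[allow(dead_code)]
    fn slot_protocol_example(
        &self,
        tenant_shard_id: TenantShardId,
        new_slot: TenantSlot,
    ) -> anyhow::Result<()> {
        let mut slot_guard =
            self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
        // Shut down / inspect whatever was there, without holding the map lock.
        if let Some(old) = slot_guard.get_old_value() {
            tracing::info!("replacing existing slot: {old:?}");
        }
        // Install the replacement (or call `revert()` to put the old value back).
        slot_guard.upsert(new_slot)?;
        Ok(())
    }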
1220 :
1221 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1222 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1223 : /// dropped before re-attaching.
1224 : ///
1225 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1226 : /// where an issue is identified that would go away with a restart of the tenant.
1227 : ///
1228 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1229 : /// to respect the cancellation tokens used in normal shutdown().
1230 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1231 : pub(crate) async fn reset_tenant(
1232 : &self,
1233 : tenant_shard_id: TenantShardId,
1234 : drop_cache: bool,
1235 : ctx: &RequestContext,
1236 : ) -> anyhow::Result<()> {
1237 : let mut slot_guard =
1238 : self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1239 : let Some(old_slot) = slot_guard.get_old_value() else {
1240 : anyhow::bail!("Tenant not found when trying to reset");
1241 : };
1242 :
1243 : let Some(tenant) = old_slot.get_attached() else {
1244 : slot_guard.revert();
1245 : anyhow::bail!("Tenant is not in attached state");
1246 : };
1247 :
1248 : let (_guard, progress) = utils::completion::channel();
1249 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1250 : Ok(()) => {
1251 : slot_guard.drop_old_value()?;
1252 : }
1253 : Err(_barrier) => {
1254 : slot_guard.revert();
1255 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1256 : }
1257 : }
1258 :
1259 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1260 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1261 : let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)?;
1262 :
1263 : if drop_cache {
1264 : tracing::info!("Dropping local file cache");
1265 :
1266 : match tokio::fs::read_dir(&timelines_path).await {
1267 : Err(e) => {
1268 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1269 : }
1270 : Ok(mut entries) => {
1271 : while let Some(entry) = entries.next_entry().await? {
1272 : tokio::fs::remove_dir_all(entry.path()).await?;
1273 : }
1274 : }
1275 : }
1276 : }
1277 :
1278 : let shard_identity = config.shard;
1279 : let tenant = tenant_spawn(
1280 : self.conf,
1281 : tenant_shard_id,
1282 : &tenant_path,
1283 : self.resources.clone(),
1284 : AttachedTenantConf::try_from(self.conf, config)?,
1285 : shard_identity,
1286 : None,
1287 : SpawnMode::Eager,
1288 : ctx,
1289 : )?;
1290 :
1291 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1292 :
1293 : Ok(())
1294 : }
1295 :
1296 0 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<TenantShard>> {
1297 0 : let locked = self.tenants.read().unwrap();
1298 0 : match &*locked {
1299 0 : TenantsMap::Initializing => Vec::new(),
1300 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1301 0 : .values()
1302 0 : .filter_map(|slot| {
1303 0 : slot.get_attached()
1304 0 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1305 0 : })
1306 0 : .collect(),
1307 : }
1308 0 : }
1309 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1310 : // callback should be small and fast, as it will be called inside the global
1311 : // TenantsMap lock.
1312 0 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1313 0 : where
1314 0 : // TODO: let the callback return a hint to drop out of the loop early
1315 0 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1316 : {
1317 0 : let locked = self.tenants.read().unwrap();
1318 :
1319 0 : let map = match &*locked {
1320 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1321 0 : TenantsMap::Open(m) => m,
1322 : };
1323 :
1324 0 : for (tenant_id, slot) in map {
1325 0 : if let TenantSlot::Secondary(state) = slot {
1326 : // Only expose secondary tenants that are not currently shutting down
1327 0 : if !state.cancel.is_cancelled() {
1328 0 : func(tenant_id, state)
1329 0 : }
1330 0 : }
1331 : }
1332 0 : }
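
    // Illustrative usage sketch (hypothetical helper): keep the callback small
    // and fast, since it runs under the global TenantsMap lock; e.g. collect IDs
    // here and do the heavy work after the lock is released.
    #[allow(dead_code)]
    fn collect_secondary_ids_example(&self) -> Vec<TenantShardId> {
        let mut ids = Vec::new();
        self.foreach_secondary_tenants(|tenant_shard_id, _state| {
            ids.push(*tenant_shard_id);
        });
        ids
    }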
1333 :
1334 : /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
1335 0 : pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
1336 0 : let locked = self.tenants.read().unwrap();
1337 0 : match &*locked {
1338 0 : TenantsMap::Initializing => Vec::new(),
1339 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1340 0 : map.iter().map(|(k, v)| (*k, v.clone())).collect()
1341 : }
1342 : }
1343 0 : }
1344 :
1345 0 : pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option<TenantSlot> {
1346 0 : let locked = self.tenants.read().unwrap();
1347 0 : match &*locked {
1348 0 : TenantsMap::Initializing => None,
1349 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1350 0 : map.get(&tenant_shard_id).cloned()
1351 : }
1352 : }
1353 0 : }
1354 :
1355 : /// If a tenant is attached, detach it. Then remove its data from remote storage.
1356 : ///
1357 : /// A tenant is considered deleted once it is gone from remote storage. It is the caller's
1358 : /// responsibility to avoid trying to attach the tenant again or use it any way once deletion
1359 : /// has started: this operation is not atomic, and must be retried until it succeeds.
1360 : ///
1361 : /// As a special case, if an unsharded tenant ID is given for a sharded tenant, it will remove
1362 : /// all tenant shards in remote storage (removing all paths with the tenant prefix). The storage
1363 : /// controller uses this to purge all remote tenant data, including any stale parent shards that
1364 : /// may remain after splits. Ideally, this special case would be handled elsewhere. See:
1365 : /// <https://github.com/neondatabase/neon/pull/9394>.
1366 0 : pub(crate) async fn delete_tenant(
1367 0 : &self,
1368 0 : tenant_shard_id: TenantShardId,
1369 0 : ) -> Result<(), DeleteTenantError> {
1370 0 : super::span::debug_assert_current_span_has_tenant_id();
1371 :
1372 0 : async fn delete_local(
1373 0 : conf: &PageServerConf,
1374 0 : background_purges: &BackgroundPurges,
1375 0 : tenant_shard_id: &TenantShardId,
1376 0 : ) -> anyhow::Result<()> {
1377 0 : let local_tenant_directory = conf.tenant_path(tenant_shard_id);
1378 0 : let tmp_dir = safe_rename_tenant_dir(&local_tenant_directory)
1379 0 : .await
1380 0 : .with_context(|| {
1381 0 : format!("local tenant directory {local_tenant_directory:?} rename")
1382 0 : })?;
1383 0 : background_purges.spawn(tmp_dir);
1384 0 : Ok(())
1385 0 : }
1386 :
1387 0 : let slot_guard =
1388 0 : self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1389 0 : match &slot_guard.old_value {
1390 0 : Some(TenantSlot::Attached(tenant)) => {
1391 : // Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
1392 : // deletion will be resumed across restarts.
1393 0 : let tenant = tenant.clone();
1394 0 : let (_guard, progress) = utils::completion::channel();
1395 0 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1396 0 : Ok(()) => {}
1397 0 : Err(barrier) => {
1398 0 : info!("Shutdown already in progress, waiting for it to complete");
1399 0 : barrier.wait().await;
1400 : }
1401 : }
1402 0 : delete_local(self.conf, &self.background_purges, &tenant_shard_id).await?;
1403 : }
1404 0 : Some(TenantSlot::Secondary(secondary_tenant)) => {
1405 0 : secondary_tenant.shutdown().await;
1406 :
1407 0 : delete_local(self.conf, &self.background_purges, &tenant_shard_id).await?;
1408 : }
1409 0 : Some(TenantSlot::InProgress(_)) => unreachable!(),
1410 0 : None => {}
1411 : };
1412 :
1413 : // Fall through: local state for this tenant is no longer present, proceed with remote delete.
1414 : // - We use a retry wrapper here so that common transient S3 errors (e.g. 503, 429) do not result
1415 : // in 500 responses to delete requests.
1416 : // - We keep the `SlotGuard` during this I/O, so that if a concurrent delete request comes in, it will
1417 : // 503/retry, rather than kicking off a wasteful concurrent deletion.
1418 : // NB: this also deletes partial prefixes, i.e. a <tenant_id> path will delete all
1419 : // <tenant_id>_<shard_id>/* objects. See method comment for why.
1420 0 : backoff::retry(
1421 0 : || async move {
1422 0 : self.resources
1423 0 : .remote_storage
1424 0 : .delete_prefix(&remote_tenant_path(&tenant_shard_id), &self.cancel)
1425 0 : .await
1426 0 : },
1427 : |_| false, // backoff::retry handles cancellation
1428 : 1,
1429 : 3,
1430 0 : &format!("delete_tenant[tenant_shard_id={tenant_shard_id}]"),
1431 0 : &self.cancel,
1432 : )
1433 0 : .await
1434 0 : .unwrap_or(Err(TimeoutOrCancel::Cancel.into()))
1435 0 : .map_err(|err| {
1436 0 : if TimeoutOrCancel::caused_by_cancel(&err) {
1437 0 : return DeleteTenantError::Cancelled;
1438 0 : }
1439 0 : DeleteTenantError::Other(err)
1440 0 : })
1441 0 : }
1442 :
1443 : #[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
1444 : pub(crate) async fn shard_split(
1445 : &self,
1446 : tenant: Arc<TenantShard>,
1447 : new_shard_count: ShardCount,
1448 : new_stripe_size: Option<ShardStripeSize>,
1449 : ctx: &RequestContext,
1450 : ) -> anyhow::Result<Vec<TenantShardId>> {
1451 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1452 : let r = self
1453 : .do_shard_split(tenant, new_shard_count, new_stripe_size, ctx)
1454 : .await;
1455 : if r.is_err() {
1456 : // Shard splitting might have left the original shard in a partially shut down state (it
1457 : // stops the shard's remote timeline client). Reset it to ensure we leave things in
1458 : // a working state.
1459 : if self.get(tenant_shard_id).is_some() {
1460 : tracing::warn!("Resetting after shard split failure");
1461 : if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
1462 : // Log this error because our return value will still be the original error, not this one. This is
1463 : // a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
1464 : // (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
1465 : // setting it broken probably won't help either.
1466 : tracing::error!("Failed to reset: {e}");
1467 : }
1468 : }
1469 : }
1470 :
1471 : r
1472 : }
1473 :
1474 0 : pub(crate) async fn do_shard_split(
1475 0 : &self,
1476 0 : tenant: Arc<TenantShard>,
1477 0 : new_shard_count: ShardCount,
1478 0 : new_stripe_size: Option<ShardStripeSize>,
1479 0 : ctx: &RequestContext,
1480 0 : ) -> anyhow::Result<Vec<TenantShardId>> {
1481 0 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1482 :
1483 : // Validate the incoming request
1484 0 : if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
1485 0 : anyhow::bail!("Requested shard count is not an increase");
1486 0 : }
1487 0 : let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
1488 0 : if !expansion_factor.is_power_of_two() {
1489 0 : anyhow::bail!("Requested split is not a power of two");
1490 0 : }
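// Worked example: splitting shard_count=2 into new_shard_count=8 gives
// expansion_factor = 8 / 2 = 4, a power of two, and is accepted; a request for
// new_shard_count=6 would give factor 3 and be rejected here.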
1491 :
1492 0 : if let Some(new_stripe_size) = new_stripe_size {
1493 0 : if tenant.get_shard_stripe_size() != new_stripe_size
1494 0 : && tenant_shard_id.shard_count.count() > 1
1495 : {
1496 : // This tenant already has multiple shards; it is illegal to try to change its stripe size
1497 0 : anyhow::bail!(
1498 0 : "Shard stripe size may not be modified once tenant has multiple shards"
1499 : );
1500 0 : }
1501 0 : }
1502 :
1503 : // Plan: identify what the new child shards will be
1504 0 : let child_shards = tenant_shard_id.split(new_shard_count);
1505 0 : tracing::info!(
1506 0 : "Shard {} splits into: {}",
1507 0 : tenant_shard_id.to_index(),
1508 0 : child_shards
1509 0 : .iter()
1510 0 : .map(|id| format!("{}", id.to_index()))
1511 0 : .join(",")
1512 : );
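// Illustrative numbering (assuming the usual modular child layout): shard 1 of
// count 2 splitting to count 8 yields children 1, 3, 5 and 7 of count 8, i.e. the
// shard numbers congruent to 1 mod 2, so every key keeps routing to a descendant
// of its old shard.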
1513 :
1514 0 : fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
1515 0 : "failpoint"
1516 : )));
1517 :
1518 0 : let parent_shard_identity = tenant.shard_identity;
1519 0 : let parent_tenant_conf = tenant.get_tenant_conf();
1520 0 : let parent_generation = tenant.generation;
1521 :
1522 : // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
1523 0 : if let Err(e) = tenant.split_prepare(&child_shards).await {
1524 : // If [`Tenant::split_prepare`] fails, the tenant must be reloaded because it might
1525 : // have been left in a partially-shut-down state; [`Self::shard_split`] resets it on error.
1526 0 : tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
1527 0 : return Err(e);
1528 0 : }
1529 :
1530 0 : fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
1531 0 : "failpoint"
1532 : )));
1533 :
1534 0 : self.resources.deletion_queue_client.flush_advisory();
1535 :
1536 : // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
1537 : //
1538 : // TODO: keeping the parent as InProgress while spawning the children causes read
1539 : // unavailability, as we can't acquire a new timeline handle for it (existing handles appear
1540 : // to still work though, even downgraded ones). The parent should be available for reads
1541 : // until the children are ready -- potentially until *all* subsplits across all parent
1542 : // shards are complete and the compute has been notified. See:
1543 : // <https://databricks.atlassian.net/browse/LKB-672>.
1544 0 : drop(tenant);
1545 0 : let mut parent_slot_guard =
1546 0 : self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1547 0 : let parent = match parent_slot_guard.get_old_value() {
1548 0 : Some(TenantSlot::Attached(t)) => t,
1549 0 : Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
1550 : Some(TenantSlot::InProgress(_)) => {
1551 : // tenant_map_acquire_slot never returns InProgress; if a slot was InProgress,
1552 : // it would return an error.
1553 0 : unreachable!()
1554 : }
1555 : None => {
1556 : // We don't actually need the parent shard to still be attached to do our work, but it's
1557 : // a weird enough situation that the caller probably didn't want us to continue working
1558 : // if they had detached the tenant they requested the split on.
1559 0 : anyhow::bail!("Detached parent shard in the middle of split!")
1560 : }
1561 : };
1562 0 : fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
1563 0 : "failpoint"
1564 : )));
1565 : // Optimization: hardlink layers from the parent into the children, so that they don't have to
1566 : // re-download & duplicate the data referenced in their initial IndexPart
1567 0 : self.shard_split_hardlink(parent, child_shards.clone())
1568 0 : .await?;
1569 0 : fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
1570 0 : "failpoint"
1571 : )));
1572 :
1573 : // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
1574 : // child shards to reach this point.
1575 0 : let mut target_lsns = HashMap::new();
1576 0 : for timeline in parent.timelines.lock().unwrap().clone().values() {
1577 0 : target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
1578 0 : }
1579 :
1580 : // TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
1581 : // and could slow down the children trying to catch up.
1582 :
1583 : // Phase 3: Spawn the child shards
1584 0 : for child_shard in &child_shards {
1585 0 : let mut child_shard_identity = parent_shard_identity;
1586 0 : if let Some(new_stripe_size) = new_stripe_size {
1587 0 : child_shard_identity.stripe_size = new_stripe_size;
1588 0 : }
1589 0 : child_shard_identity.count = child_shard.shard_count;
1590 0 : child_shard_identity.number = child_shard.shard_number;
1591 :
1592 0 : let child_location_conf = LocationConf {
1593 0 : mode: LocationMode::Attached(AttachedLocationConfig {
1594 0 : generation: parent_generation,
1595 0 : attach_mode: AttachmentMode::Single,
1596 0 : }),
1597 0 : shard: child_shard_identity,
1598 0 : tenant_conf: parent_tenant_conf.clone(),
1599 0 : };
1600 :
1601 0 : self.upsert_location(
1602 0 : *child_shard,
1603 0 : child_location_conf,
1604 0 : None,
1605 0 : SpawnMode::Eager,
1606 0 : ctx,
1607 0 : )
1608 0 : .await?;
1609 : }
1610 :
1611 0 : fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
1612 0 : "failpoint"
1613 : )));
1614 :
1615 : // Phase 4: wait for child shards' WAL ingest to catch up to target LSN
1616 0 : for child_shard_id in &child_shards {
1617 0 : let child_shard_id = *child_shard_id;
1618 0 : let child_shard = {
1619 0 : let locked = self.tenants.read().unwrap();
1620 0 : let peek_slot =
1621 0 : tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
1622 0 : peek_slot.and_then(|s| s.get_attached()).cloned()
1623 : };
1624 0 : if let Some(t) = child_shard {
1625 : // Wait for the child shard to become active: this should be very quick because it only
1626 : // has to download the index_part that we just uploaded when creating it.
1627 0 : if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
1628 : // This is not fatal: we have durably created the child shard. It just makes the
1629 : // split operation less seamless for clients, as we may detach the parent
1630 : // shard before the child shards are fully ready to serve requests.
1631 0 : tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
1632 0 : continue;
1633 0 : }
1634 :
1635 0 : let timelines = t.timelines.lock().unwrap().clone();
1636 0 : for timeline in timelines.values() {
1637 0 : let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
1638 0 : continue;
1639 : };
1640 :
1641 0 : tracing::info!(
1642 0 : "Waiting for child shard {}/{} to reach target lsn {}...",
1643 : child_shard_id,
1644 0 : timeline.timeline_id,
1645 : target_lsn
1646 : );
1647 :
1648 0 : fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
1649 0 : "failpoint"
1650 : )));
1651 0 : if let Err(e) = timeline
1652 0 : .wait_lsn(
1653 0 : *target_lsn,
1654 0 : crate::tenant::timeline::WaitLsnWaiter::Tenant,
1655 0 : crate::tenant::timeline::WaitLsnTimeout::Default,
1656 0 : ctx,
1657 0 : )
1658 0 : .await
1659 : {
1660 : // Failure here might mean shutdown; in any case this part is an optimization
1661 : // and we shouldn't hold up the split operation.
1662 0 : tracing::warn!(
1663 0 : "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
1664 0 : timeline.timeline_id
1665 : );
1666 : } else {
1667 0 : tracing::info!(
1668 0 : "Child shard {}/{} reached target lsn {}",
1669 : child_shard_id,
1670 0 : timeline.timeline_id,
1671 : target_lsn
1672 : );
1673 : }
1674 : }
1675 0 : }
1676 : }
1677 :
1678 : // Phase 5: Shut down the parent shard. We leave it on disk in case the split fails and we
1679 : // have to roll back to the parent shard, avoiding a cold start. It will be cleaned up once
1680 : // the storage controller commits the split, or if all else fails, on the next restart.
1681 : //
1682 : // TODO: We don't flush the ephemeral layer here, because the split is likely to succeed and
1683 : // catching up the parent should be reasonably quick. Consider using FreezeAndFlush instead.
1684 0 : let (_guard, progress) = completion::channel();
1685 0 : match parent.shutdown(progress, ShutdownMode::Hard).await {
1686 0 : Ok(()) => {}
1687 0 : Err(other) => {
1688 0 : other.wait().await;
1689 : }
1690 : }
1691 :
1692 0 : fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
1693 0 : "failpoint"
1694 : )));
1695 :
1696 0 : parent_slot_guard.drop_old_value()?;
1697 :
1698 : // Phase 6: Release the InProgress on the parent shard
1699 0 : drop(parent_slot_guard);
1700 :
1701 0 : utils::pausable_failpoint!("shard-split-post-finish-pause");
1702 :
1703 0 : Ok(child_shards)
1704 0 : }
1705 :
1706 : /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
1707 : /// to avoid the children downloading them again.
1708 : ///
1709 : /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
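/// Hard links make this cheap; a sketch of the core operation (paths invented):
///
/// ```ignore
/// // Both paths now name the same inode: no layer bytes are copied, and removing
/// // the parent's directory entry later leaves the child's entry intact.
/// std::fs::hard_link("tenants/parent/timelines/tl/layer",
///                    "tenants/child/timelines/tl/layer")?;
/// ```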
1710 0 : async fn shard_split_hardlink(
1711 0 : &self,
1712 0 : parent_shard: &TenantShard,
1713 0 : child_shards: Vec<TenantShardId>,
1714 0 : ) -> anyhow::Result<()> {
1715 0 : debug_assert_current_span_has_tenant_id();
1716 :
1717 0 : let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
1718 0 : let (parent_timelines, parent_layers) = {
1719 0 : let mut parent_layers = Vec::new();
1720 0 : let timelines = parent_shard.timelines.lock().unwrap().clone();
1721 0 : let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
1722 0 : for timeline in timelines.values() {
1723 0 : tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
1724 0 : let layers = timeline
1725 0 : .layers
1726 0 : .read(LayerManagerLockHolder::GetLayerMapInfo)
1727 0 : .await;
1728 :
1729 0 : for layer in layers.likely_resident_layers() {
1730 0 : let relative_path = layer
1731 0 : .local_path()
1732 0 : .strip_prefix(&parent_path)
1733 0 : .context("Removing prefix from parent layer path")?;
1734 0 : parent_layers.push(relative_path.to_owned());
1735 : }
1736 : }
1737 :
1738 0 : if parent_layers.is_empty() {
1739 0 : tracing::info!("Ancestor shard has no resident layer to hard link");
1740 0 : }
1741 :
1742 0 : (parent_timelines, parent_layers)
1743 : };
1744 :
1745 0 : let mut child_prefixes = Vec::new();
1746 0 : let mut create_dirs = Vec::new();
1747 :
1748 0 : for child in child_shards {
1749 0 : let child_prefix = self.conf.tenant_path(&child);
1750 0 : create_dirs.push(child_prefix.clone());
1751 0 : create_dirs.extend(
1752 0 : parent_timelines
1753 0 : .iter()
1754 0 : .map(|t| self.conf.timeline_path(&child, t)),
1755 : );
1756 :
1757 0 : child_prefixes.push(child_prefix);
1758 : }
1759 :
1760 : // Since we will do a large number of small filesystem metadata operations, batch them into
1761 : // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
1762 0 : let span = tracing::Span::current();
1763 0 : let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
1764 : // Run this synchronous code in the same log context as the outer function that spawned it.
1765 0 : let _span = span.enter();
1766 :
1767 0 : tracing::info!("Creating {} directories", create_dirs.len());
1768 0 : for dir in &create_dirs {
1769 0 : if let Err(e) = std::fs::create_dir_all(dir) {
1770 : // Ignore AlreadyExists errors, drop out on all other errors
1771 0 : match e.kind() {
1772 0 : std::io::ErrorKind::AlreadyExists => {}
1773 : _ => {
1774 0 : return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
1775 : }
1776 : }
1777 0 : }
1778 : }
1779 :
1780 0 : for child_prefix in child_prefixes {
1781 0 : tracing::info!(
1782 0 : "Hard-linking {} parent layers into child path {}",
1783 0 : parent_layers.len(),
1784 : child_prefix
1785 : );
1786 0 : for relative_layer in &parent_layers {
1787 0 : let parent_path = parent_path.join(relative_layer);
1788 0 : let child_path = child_prefix.join(relative_layer);
1789 0 : if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
1790 0 : match e.kind() {
1791 0 : std::io::ErrorKind::AlreadyExists => {}
1792 : std::io::ErrorKind::NotFound => {
1793 0 : tracing::info!(
1794 0 : "Layer {} not found during hard-linking, evicted during split?",
1795 : relative_layer
1796 : );
1797 : }
1798 : _ => {
1799 0 : return Err(anyhow::anyhow!(e).context(format!(
1800 0 : "Hard linking {relative_layer} into {child_prefix}"
1801 0 : )));
1802 : }
1803 : }
1804 0 : }
1805 : }
1806 : }
1807 :
1808 : // Durability is not required for correctness, but if we crashed during split and
1809 : // then came restarted with empty timeline dirs, it would be very inefficient to
1810 : // re-populate from remote storage.
1811 0 : tracing::info!("fsyncing {} directories", create_dirs.len());
1812 0 : for dir in create_dirs {
1813 0 : if let Err(e) = crashsafe::fsync(&dir) {
1814 : // Something removed a newly created timeline dir out from underneath us? Extremely
1815 : // unexpected, but not worth panic'ing over as this whole function is just an
1816 : // optimization.
1817 0 : tracing::warn!("Failed to fsync directory {dir}: {e}")
1818 0 : }
1819 : }
1820 :
1821 0 : Ok(parent_layers.len())
1822 0 : });
1823 :
1824 0 : match jh.await {
1825 0 : Ok(Ok(layer_count)) => {
1826 0 : tracing::info!(count = layer_count, "Hard linked layers into child shards");
1827 : }
1828 0 : Ok(Err(e)) => {
1829 : // This is an optimization, so we tolerate failure.
1830 0 : tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
1831 : }
1832 0 : Err(e) => {
1833 : // This is something totally unexpected like a panic, so bail out.
1834 0 : anyhow::bail!("Error joining hard linking task: {e}");
1835 : }
1836 : }
1837 :
1838 0 : Ok(())
1839 0 : }
1840 :
1841 : ///
1842 : /// Shut down all tenants. This runs as part of pageserver shutdown.
1843 : ///
1844 : /// NB: We leave the tenants in the map, so that they remain accessible through
1845 : /// the management API until we shut it down. If we removed the shut-down tenants
1846 : /// from the tenants map, the management API would return 404 for these tenants,
1847 : /// because TenantsMap::get() would then return `None`.
1848 : /// That could easily be misinterpreted by the control plane, the consumer of the
1849 : /// management API. For example, it could attach the tenant on a different pageserver.
1850 : /// We would then be in split-brain once this pageserver restarts.
1851 : #[instrument(skip_all)]
1852 : pub(crate) async fn shutdown(&self) {
1853 : self.cancel.cancel();
1854 :
1855 : self.shutdown_all_tenants0().await
1856 : }
1857 :
1858 1 : async fn shutdown_all_tenants0(&self) {
1859 1 : let mut join_set = JoinSet::new();
1860 :
1861 : #[cfg(all(debug_assertions, not(test)))]
1862 : {
1863 : // Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
1864 : // as it happens implicitly at the end of tests etc.
1865 0 : let m = self.tenants.read().unwrap();
1866 0 : debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
1867 : }
1868 :
1869 : // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
1870 1 : let (total_in_progress, total_attached) = {
1871 1 : let mut m = self.tenants.write().unwrap();
1872 1 : match &mut *m {
1873 : TenantsMap::Initializing => {
1874 0 : *m = TenantsMap::ShuttingDown(BTreeMap::default());
1875 0 : info!("tenants map is empty");
1876 0 : return;
1877 : }
1878 1 : TenantsMap::Open(tenants) => {
1879 1 : let mut shutdown_state = BTreeMap::new();
1880 1 : let mut total_in_progress = 0;
1881 1 : let mut total_attached = 0;
1882 :
1883 1 : for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
1884 1 : match v {
1885 0 : TenantSlot::Attached(t) => {
1886 0 : shutdown_state
1887 0 : .insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
1888 0 : join_set.spawn(
1889 0 : async move {
1890 0 : let res = {
1891 0 : let (_guard, shutdown_progress) = completion::channel();
1892 0 : t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
1893 : };
1894 :
1895 0 : if let Err(other_progress) = res {
1896 : // join another shutdown already in progress
1897 0 : other_progress.wait().await;
1898 0 : }
1899 :
1900 : // we cannot afford per-tenant logging here, because if s3 is degraded, we are
1901 : // going to log too many lines
1902 0 : debug!("tenant successfully stopped");
1903 0 : }
1904 0 : .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
1905 : );
1906 :
1907 0 : total_attached += 1;
1908 : }
1909 0 : TenantSlot::Secondary(state) => {
1910 0 : // We don't need to wait for this individually per-tenant: the
1911 0 : // downloader task will be waited on eventually, this cancel
1912 0 : // is just to encourage it to drop out if it is doing work
1913 0 : // for this tenant right now.
1914 0 : state.cancel.cancel();
1915 0 :
1916 0 : shutdown_state
1917 0 : .insert(tenant_shard_id, TenantSlot::Secondary(state));
1918 0 : }
1919 1 : TenantSlot::InProgress(notify) => {
1920 : // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
1921 : // wait for their notifications to fire in this function.
1922 1 : join_set.spawn(async move {
1923 1 : notify.wait().await;
1924 1 : });
1925 :
1926 1 : total_in_progress += 1;
1927 : }
1928 : }
1929 : }
1930 1 : *m = TenantsMap::ShuttingDown(shutdown_state);
1931 1 : (total_in_progress, total_attached)
1932 : }
1933 : TenantsMap::ShuttingDown(_) => {
1934 0 : error!(
1935 0 : "already shutting down, this function isn't supposed to be called more than once"
1936 : );
1937 0 : return;
1938 : }
1939 : }
1940 : };
1941 :
1942 1 : let started_at = std::time::Instant::now();
1943 :
1944 1 : info!(
1945 0 : "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
1946 : total_in_progress, total_attached
1947 : );
1948 :
1949 1 : let total = join_set.len();
1950 1 : let mut panicked = 0;
1951 1 : let mut buffering = true;
1952 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
1953 1 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
1954 :
1955 3 : while !join_set.is_empty() {
1956 2 : tokio::select! {
1957 2 : Some(joined) = join_set.join_next() => {
1958 0 : match joined {
1959 1 : Ok(()) => {},
1960 0 : Err(join_error) if join_error.is_cancelled() => {
1961 0 : unreachable!("we are not cancelling any of the tasks");
1962 : }
1963 0 : Err(join_error) if join_error.is_panic() => {
1964 0 : // cannot really do anything, as this panic is likely a bug
1965 0 : panicked += 1;
1966 0 : }
1967 0 : Err(join_error) => {
1968 0 : warn!("unknown kind of JoinError: {join_error}");
1969 : }
1970 : }
1971 1 : if !buffering {
1972 1 : // Buffer so that we log progress at most once every 500ms after the first
1973 1 : // update (or after starting): we will get SIGKILL'd at 10s, and we are not
1974 1 : // able to log *then*.
1975 1 : buffering = true;
1976 1 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
1977 1 : }
1978 : },
1979 2 : _ = &mut buffered, if buffering => {
1980 1 : buffering = false;
1981 1 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
1982 : }
1983 : }
1984 : }
1985 :
1986 1 : if panicked > 0 {
1987 0 : warn!(
1988 : panicked,
1989 0 : total, "observed panics while shutting down tenants"
1990 : );
1991 1 : }
1992 :
1993 : // caller will log how long we took
1994 1 : }
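// The buffered progress logging in `shutdown_all_tenants0` generalizes to any JoinSet
// drain; a hedged standalone sketch (names invented):
//
//     let mut buffering = true;
//     let mut tick = std::pin::pin!(tokio::time::sleep(PERIOD));
//     while !join_set.is_empty() {
//         tokio::select! {
//             _ = join_set.join_next() => if !buffering {
//                 buffering = true;
//                 tick.as_mut().reset(tokio::time::Instant::now() + PERIOD);
//             },
//             _ = &mut tick, if buffering => {
//                 buffering = false;
//                 tracing::info!(remaining = join_set.len(), "still draining");
//             }
//         }
//     }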
1995 :
1996 : /// Detaches a tenant, and removes its local files asynchronously.
1997 : ///
1998 : /// File removal is idempotent: even if the tenant has already been removed, this will still
1999 : /// remove any local files. This is used during shard splits, where we leave the parent shard's
2000 : /// files around in case we have to roll back the split.
2001 0 : pub(crate) async fn detach_tenant(
2002 0 : &self,
2003 0 : conf: &'static PageServerConf,
2004 0 : tenant_shard_id: TenantShardId,
2005 0 : deletion_queue_client: &DeletionQueueClient,
2006 0 : ) -> Result<(), TenantStateError> {
2007 0 : if let Some(tmp_path) = self
2008 0 : .detach_tenant0(conf, tenant_shard_id, deletion_queue_client)
2009 0 : .await?
2010 0 : {
2011 0 : self.background_purges.spawn(tmp_path);
2012 0 : }
2013 :
2014 0 : Ok(())
2015 0 : }
2016 :
2017 : /// Detaches a tenant. This renames the tenant directory to a temporary path and returns it,
2018 : /// allowing the caller to delete it asynchronously. Returns None if the dir is already removed.
2019 0 : async fn detach_tenant0(
2020 0 : &self,
2021 0 : conf: &'static PageServerConf,
2022 0 : tenant_shard_id: TenantShardId,
2023 0 : deletion_queue_client: &DeletionQueueClient,
2024 0 : ) -> Result<Option<Utf8PathBuf>, TenantStateError> {
2025 0 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
2026 0 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
2027 0 : if !tokio::fs::try_exists(&local_tenant_directory).await? {
2028 : // If the tenant directory doesn't exist, it's already cleaned up.
2029 0 : return Ok(None);
2030 0 : }
2031 0 : safe_rename_tenant_dir(&local_tenant_directory)
2032 0 : .await
2033 0 : .with_context(|| {
2034 0 : format!("local tenant directory {local_tenant_directory:?} rename")
2035 0 : })
2036 0 : .map(Some)
2037 0 : };
2038 :
2039 0 : let mut removal_result = self
2040 0 : .remove_tenant_from_memory(
2041 0 : tenant_shard_id,
2042 0 : tenant_dir_rename_operation(tenant_shard_id),
2043 0 : )
2044 0 : .await;
2045 :
2046 : // If the tenant was not found, it was likely already removed. Attempt to remove the tenant
2047 : // directory on disk anyway. For example, during shard splits, we shut down and remove the
2048 : // parent shard, but leave its directory on disk in case we have to roll back the split.
2049 : //
2050 : // TODO: it would be better to leave the parent shard attached until the split is committed.
2051 : // This will be needed by the gRPC page service too, such that a compute can continue to
2052 : // read from the parent shard until it's notified about the new child shards. See:
2053 : // <https://github.com/neondatabase/neon/issues/11728>.
2054 0 : if let Err(TenantStateError::SlotError(TenantSlotError::NotFound(_))) = removal_result {
2055 0 : removal_result = tenant_dir_rename_operation(tenant_shard_id)
2056 0 : .await
2057 0 : .map_err(TenantStateError::Other);
2058 0 : }
2059 :
2060 : // Flush pending deletions, so that they have a good chance of passing validation
2061 : // before this tenant is potentially re-attached elsewhere.
2062 0 : deletion_queue_client.flush_advisory();
2063 :
2064 0 : removal_result
2065 0 : }
2066 :
2067 0 : pub(crate) fn list_tenants(
2068 0 : &self,
2069 0 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
2070 0 : let tenants = self.tenants.read().unwrap();
2071 0 : let m = match &*tenants {
2072 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
2073 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
2074 : };
2075 0 : Ok(m.iter()
2076 0 : .filter_map(|(id, tenant)| match tenant {
2077 0 : TenantSlot::Attached(tenant) => {
2078 0 : Some((*id, tenant.current_state(), tenant.generation()))
2079 : }
2080 0 : TenantSlot::Secondary(_) => None,
2081 0 : TenantSlot::InProgress(_) => None,
2082 0 : })
2083 0 : .collect())
2084 0 : }
2085 :
2086 : /// Completes an earlier prepared timeline detach ancestor.
2087 0 : pub(crate) async fn complete_detaching_timeline_ancestor(
2088 0 : &self,
2089 0 : tenant_shard_id: TenantShardId,
2090 0 : timeline_id: TimelineId,
2091 0 : prepared: PreparedTimelineDetach,
2092 0 : behavior: DetachBehavior,
2093 0 : mut attempt: detach_ancestor::Attempt,
2094 0 : ctx: &RequestContext,
2095 0 : ) -> Result<HashSet<TimelineId>, detach_ancestor::Error> {
2096 : use detach_ancestor::Error;
2097 :
2098 0 : let slot_guard = self
2099 0 : .tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)
2100 0 : .map_err(|e| {
2101 : use TenantSlotError::*;
2102 :
2103 0 : match e {
2104 0 : MapState(TenantMapError::ShuttingDown) => Error::ShuttingDown,
2105 0 : NotFound(_) | InProgress | MapState(_) => Error::DetachReparent(e.into()),
2106 : }
2107 0 : })?;
2108 :
2109 0 : let tenant = {
2110 0 : let old_slot = slot_guard
2111 0 : .get_old_value()
2112 0 : .as_ref()
2113 0 : .expect("requested MustExist");
2114 :
2115 0 : let Some(tenant) = old_slot.get_attached() else {
2116 0 : return Err(Error::DetachReparent(anyhow::anyhow!(
2117 0 : "Tenant is not in attached state"
2118 0 : )));
2119 : };
2120 :
2121 0 : if !tenant.is_active() {
2122 0 : return Err(Error::DetachReparent(anyhow::anyhow!(
2123 0 : "Tenant is not active"
2124 0 : )));
2125 0 : }
2126 :
2127 0 : tenant.clone()
2128 : };
2129 :
2130 0 : let timeline = tenant
2131 0 : .get_timeline(timeline_id, true)
2132 0 : .map_err(Error::NotFound)?;
2133 :
2134 0 : let resp = timeline
2135 0 : .detach_from_ancestor_and_reparent(
2136 0 : &tenant,
2137 0 : prepared,
2138 0 : attempt.ancestor_timeline_id,
2139 0 : attempt.ancestor_lsn,
2140 0 : behavior,
2141 0 : ctx,
2142 0 : )
2143 0 : .await?;
2144 :
2145 0 : let mut slot_guard = slot_guard;
2146 :
2147 0 : let tenant = if resp.reset_tenant_required() {
2148 0 : attempt.before_reset_tenant();
2149 :
2150 0 : let (_guard, progress) = utils::completion::channel();
2151 0 : match tenant.shutdown(progress, ShutdownMode::Reload).await {
2152 0 : Ok(()) => {
2153 0 : slot_guard.drop_old_value().expect("it was just shutdown");
2154 0 : }
2155 0 : Err(_barrier) => {
2156 0 : slot_guard.revert();
2157 : // This really should not happen, unless a shutdown that did not acquire the
2158 : // tenant slot was already under way; regardless, on restart the attempt
2159 : // tracking will reset to retryable.
2160 0 : return Err(Error::ShuttingDown);
2161 : }
2162 : }
2163 :
2164 0 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
2165 0 : let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)
2166 0 : .map_err(|e| Error::DetachReparent(e.into()))?;
2167 :
2168 0 : let shard_identity = config.shard;
2169 0 : let tenant = tenant_spawn(
2170 0 : self.conf,
2171 0 : tenant_shard_id,
2172 0 : &tenant_path,
2173 0 : self.resources.clone(),
2174 0 : AttachedTenantConf::try_from(self.conf, config).map_err(Error::DetachReparent)?,
2175 0 : shard_identity,
2176 0 : None,
2177 0 : SpawnMode::Eager,
2178 0 : ctx,
2179 : )
2180 0 : .map_err(|_| Error::ShuttingDown)?;
2181 :
2182 : {
2183 0 : let mut g = tenant.ongoing_timeline_detach.lock().unwrap();
2184 0 : assert!(
2185 0 : g.is_none(),
2186 0 : "there cannot be any new timeline detach ancestor on newly created tenant"
2187 : );
2188 0 : *g = Some((attempt.timeline_id, attempt.new_barrier()));
2189 : }
2190 :
2191 : // If we bail out here, we will not allow a new attempt, which should be fine:
2192 : // the pageserver should be shutting down regardless. tenant_reset would help,
2193 : // unless it runs into the same problem.
2194 0 : slot_guard
2195 0 : .upsert(TenantSlot::Attached(tenant.clone()))
2196 0 : .map_err(|e| match e {
2197 0 : TenantSlotUpsertError::ShuttingDown(_) => Error::ShuttingDown,
2198 0 : other => Error::DetachReparent(other.into()),
2199 0 : })?;
2200 0 : tenant
2201 : } else {
2202 0 : tracing::info!("skipping tenant_reset as no changes made required it");
2203 0 : tenant
2204 : };
2205 :
2206 0 : if let Some(reparented) = resp.completed() {
2207 : // finally ask the restarted tenant to complete the detach
2208 : //
2209 : // rationale for 9999s: we don't really have a timetable here; if retried, the caller
2210 : // will get a 503.
2211 0 : tenant
2212 0 : .wait_to_become_active(std::time::Duration::from_secs(9999))
2213 0 : .await
2214 0 : .map_err(|e| {
2215 : use GetActiveTenantError::{Cancelled, WillNotBecomeActive};
2216 : use pageserver_api::models::TenantState;
2217 0 : match e {
2218 : Cancelled | WillNotBecomeActive(TenantState::Stopping { .. }) => {
2219 0 : Error::ShuttingDown
2220 : }
2221 0 : other => Error::Complete(other.into()),
2222 : }
2223 0 : })?;
2224 :
2225 0 : utils::pausable_failpoint!(
2226 : "timeline-detach-ancestor::after_activating_before_finding-pausable"
2227 : );
2228 :
2229 0 : let timeline = tenant
2230 0 : .get_timeline(attempt.timeline_id, true)
2231 0 : .map_err(Error::NotFound)?;
2232 :
2233 0 : timeline
2234 0 : .complete_detaching_timeline_ancestor(&tenant, attempt, ctx)
2235 0 : .await
2236 0 : .map(|()| reparented)
2237 : } else {
2238 : // at least the latest versions have now been downloaded and refreshed; be ready to
2239 : // retry another time.
2240 0 : Err(Error::FailedToReparentAll)
2241 : }
2242 0 : }
2243 :
2244 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
2245 : /// resolve this to a fully qualified TenantShardId.
2246 : ///
2247 : /// During shard splits: we shall see parent shards in InProgress state and skip them, and
2248 : /// instead match on child shards which should appear in Attached state. Very early in a shard
2249 : /// split, or in other cases where a shard is InProgress, we will return our own InProgress result
2250 : /// to instruct the caller to wait for that to finish before querying again.
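/// A hedged caller sketch (handler names invented):
///
/// ```ignore
/// match mgr.resolve_attached_shard(&tenant_id, ShardSelector::Page(key)) {
///     ShardResolveResult::Found(shard) => serve_request(shard).await,
///     ShardResolveResult::InProgress(barrier) => {
///         barrier.wait().await; // slot is settling (e.g. mid-split): retry afterwards
///         retry_request().await
///     }
///     ShardResolveResult::NotFound => return Err(RequestError::NotFound),
/// }
/// ```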
2251 0 : pub(crate) fn resolve_attached_shard(
2252 0 : &self,
2253 0 : tenant_id: &TenantId,
2254 0 : selector: ShardSelector,
2255 0 : ) -> ShardResolveResult {
2256 0 : let tenants = self.tenants.read().unwrap();
2257 0 : let mut want_shard: Option<ShardIndex> = None;
2258 0 : let mut any_in_progress = None;
2259 :
2260 0 : match &*tenants {
2261 0 : TenantsMap::Initializing => ShardResolveResult::NotFound,
2262 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
2263 0 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
2264 : // Ignore all slots that don't contain an attached tenant
2265 0 : let tenant = match &slot.1 {
2266 0 : TenantSlot::Attached(t) => t,
2267 0 : TenantSlot::InProgress(barrier) => {
2268 : // We might still find a usable shard, but in case we don't, remember that
2269 : // we saw at least one InProgress slot, so that we can distinguish this case
2270 : // from a simple NotFound in our return value.
2271 0 : any_in_progress = Some(barrier.clone());
2272 0 : continue;
2273 : }
2274 0 : _ => continue,
2275 : };
2276 :
2277 0 : match selector {
2278 0 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
2279 0 : return ShardResolveResult::Found(tenant.clone());
2280 : }
2281 0 : ShardSelector::Page(key) => {
2282 : // Each time we find an attached slot with a different shard count,
2283 : // recompute the expected shard number: during shard splits we might
2284 : // have multiple shards with the old shard count.
2285 0 : if want_shard.is_none()
2286 0 : || want_shard.unwrap().shard_count != tenant.shard_identity.count
2287 0 : {
2288 0 : want_shard = Some(ShardIndex {
2289 0 : shard_number: tenant.shard_identity.get_shard_number(&key),
2290 0 : shard_count: tenant.shard_identity.count,
2291 0 : });
2292 0 : }
2293 :
2294 0 : if Some(ShardIndex {
2295 0 : shard_number: tenant.shard_identity.number,
2296 0 : shard_count: tenant.shard_identity.count,
2297 0 : }) == want_shard
2298 : {
2299 0 : return ShardResolveResult::Found(tenant.clone());
2300 0 : }
2301 : }
2302 0 : ShardSelector::Known(shard)
2303 0 : if tenant.shard_identity.shard_index() == shard =>
2304 : {
2305 0 : return ShardResolveResult::Found(tenant.clone());
2306 : }
2307 0 : _ => continue,
2308 : }
2309 : }
2310 :
2311 : // Fall through: we didn't find a slot that was in Attached state & matched our selector. If
2312 : // we found one or more InProgress slots, indicate to the caller that they should retry later. Otherwise
2313 : // this requested shard simply isn't found.
2314 0 : if let Some(barrier) = any_in_progress {
2315 0 : ShardResolveResult::InProgress(barrier)
2316 : } else {
2317 0 : ShardResolveResult::NotFound
2318 : }
2319 : }
2320 : }
2321 0 : }
2322 :
2323 : /// Calculate the tenant shards' contributions to this pageserver's utilization metrics. The
2324 : /// returned values are:
2325 : /// - the number of bytes of local disk space this pageserver's shards are requesting, i.e.
2326 : /// how much space they would use if not impacted by disk usage eviction.
2327 : /// - the number of tenant shards currently on this pageserver, including attached
2328 : /// and secondary.
2329 : ///
2330 : /// This function is quite expensive: callers are expected to cache the result and
2331 : /// limit how often they call it.
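/// A hedged sketch of that caching discipline (the period and `Cached` type are invented):
///
/// ```ignore
/// if cached.taken_at.elapsed() > Duration::from_secs(10) {
///     let (wanted_bytes, shard_count) = mgr.calculate_utilization()?;
///     cached = Cached { taken_at: Instant::now(), wanted_bytes, shard_count };
/// }
/// ```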
2332 0 : pub(crate) fn calculate_utilization(&self) -> Result<(u64, u32), TenantMapListError> {
2333 0 : let tenants = self.tenants.read().unwrap();
2334 0 : let m = match &*tenants {
2335 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
2336 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
2337 : };
2338 0 : let shard_count = m.len();
2339 0 : let mut wanted_bytes = 0;
2340 :
2341 0 : for tenant_slot in m.values() {
2342 0 : match tenant_slot {
2343 0 : TenantSlot::InProgress(_barrier) => {
2344 : // While a slot is being changed, we can't know how much storage it wants. This
2345 : // means this function's output can fluctuate if a lot of changes are going on
2346 : // (such as transitions from secondary to attached).
2347 : //
2348 : // We could wait for the barrier and retry, but it's important that the utilization
2349 : // API is responsive, and the data quality impact is not very significant.
2350 0 : continue;
2351 : }
2352 0 : TenantSlot::Attached(tenant) => {
2353 0 : wanted_bytes += tenant.local_storage_wanted();
2354 0 : }
2355 0 : TenantSlot::Secondary(secondary) => {
2356 0 : let progress = secondary.progress.lock().unwrap();
2357 0 : wanted_bytes += if progress.heatmap_mtime.is_some() {
2358 : // If we have heatmap info, then we will 'want' the sum
2359 : // of the size of layers in the heatmap: this is how much space
2360 : // we would use if not doing any eviction.
2361 0 : progress.bytes_total
2362 : } else {
2363 : // In the absence of heatmap info, assume that the secondary location simply
2364 : // needs as much space as it is currently using.
2365 0 : secondary.resident_size_metric.get()
2366 : }
2367 : }
2368 : }
2369 : }
2370 :
2371 0 : Ok((wanted_bytes, shard_count as u32))
2372 0 : }
2373 :
2374 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
2375 : pub(crate) async fn immediate_gc(
2376 : &self,
2377 : tenant_shard_id: TenantShardId,
2378 : timeline_id: TimelineId,
2379 : gc_req: TimelineGcRequest,
2380 : cancel: CancellationToken,
2381 : ctx: &RequestContext,
2382 : ) -> Result<GcResult, ApiError> {
2383 : let tenant = {
2384 : let guard = self.tenants.read().unwrap();
2385 : guard
2386 : .get(&tenant_shard_id)
2387 : .cloned()
2388 0 : .with_context(|| format!("tenant {tenant_shard_id}"))
2389 0 : .map_err(|e| ApiError::NotFound(e.into()))?
2390 : };
2391 :
2392 0 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
2393 : // Use tenant's pitr setting
2394 : let pitr = tenant.get_pitr_interval();
2395 :
2396 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
2397 :
2398 : // Run in task_mgr to avoid race with tenant_detach operation
2399 : let ctx: RequestContext =
2400 : ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
2401 :
2402 : let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
2403 :
2404 : fail::fail_point!("immediate_gc_task_pre");
2405 :
2406 : #[allow(unused_mut)]
2407 : let mut result = tenant
2408 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
2409 : .await;
2410 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
2411 : // better once the types support it.
2412 :
2413 : #[cfg(feature = "testing")]
2414 : {
2415 : // we need to synchronize with drop completion for python tests without polling for
2416 : // log messages
2417 : if let Ok(result) = result.as_mut() {
2418 : let mut js = tokio::task::JoinSet::new();
2419 : for layer in std::mem::take(&mut result.doomed_layers) {
2420 : js.spawn(layer.wait_drop());
2421 : }
2422 : tracing::info!(
2423 : total = js.len(),
2424 : "starting to wait for the gc'd layers to be dropped"
2425 : );
2426 : while let Some(res) = js.join_next().await {
2427 : res.expect("wait_drop should not panic");
2428 : }
2429 : }
2430 :
2431 : let timeline = tenant.get_timeline(timeline_id, false).ok();
2432 0 : let rtc = timeline.as_ref().map(|x| &x.remote_client);
2433 :
2434 : if let Some(rtc) = rtc {
2435 : // layer drops schedule actions on remote timeline client to actually do the
2436 : // deletions; don't care about the shutdown error, just exit fast
2437 : drop(rtc.wait_completion().await);
2438 : }
2439 : }
2440 :
2441 0 : result.map_err(|e| match e {
2442 0 : GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
2443 : GcError::TimelineNotFound => {
2444 0 : ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
2445 : }
2446 0 : other => ApiError::InternalServerError(anyhow::anyhow!(other)),
2447 0 : })
2448 : }
2449 :
2450 : /// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already; bails otherwise.
2451 : /// Allows removing other tenant resources manually, via `tenant_cleanup`.
2452 : /// If the cleanup fails, the tenant will stay in memory in [`TenantState::Broken`] state, and another removal may be attempted later.
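/// A hedged usage sketch, mirroring [`Self::detach_tenant0`]: the cleanup future runs
/// after the tenant is shut down, while the slot is still held `InProgress`:
///
/// ```ignore
/// self.remove_tenant_from_memory(tenant_shard_id, async move {
///     safe_rename_tenant_dir(&local_tenant_directory).await.map(Some)
/// })
/// .await?;
/// ```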
2453 1 : async fn remove_tenant_from_memory<V, F>(
2454 1 : &self,
2455 1 : tenant_shard_id: TenantShardId,
2456 1 : tenant_cleanup: F,
2457 1 : ) -> Result<V, TenantStateError>
2458 1 : where
2459 1 : F: std::future::Future<Output = anyhow::Result<V>>,
2460 1 : {
2461 1 : let mut slot_guard =
2462 1 : self.tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
2463 :
2464 : // allow pageserver shutdown to await for our completion
2465 1 : let (_guard, progress) = completion::channel();
2466 :
2467 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2468 : // concurrent API request doing something else for the same tenant ID.
2469 1 : let attached_tenant = match slot_guard.get_old_value() {
2470 1 : Some(TenantSlot::Attached(tenant)) => {
2471 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2472 1 : let shutdown_mode = ShutdownMode::Hard;
2473 :
2474 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
2475 : // that we can continue safely to cleanup.
2476 1 : match tenant.shutdown(progress, shutdown_mode).await {
2477 1 : Ok(()) => {}
2478 0 : Err(_other) => {
2479 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2480 : // wait for it but return an error right away because these are distinct requests.
2481 0 : slot_guard.revert();
2482 0 : return Err(TenantStateError::IsStopping(tenant_shard_id));
2483 : }
2484 : }
2485 1 : Some(tenant)
2486 : }
2487 0 : Some(TenantSlot::Secondary(secondary_state)) => {
2488 0 : tracing::info!("Shutting down in secondary mode");
2489 0 : secondary_state.shutdown().await;
2490 0 : None
2491 : }
2492 : Some(TenantSlot::InProgress(_)) => {
2493 : // Acquiring a slot guarantees its old value was not InProgress
2494 0 : unreachable!();
2495 : }
2496 0 : None => None,
2497 : };
2498 :
2499 1 : match tenant_cleanup
2500 1 : .await
2501 1 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2502 : {
2503 1 : Ok(hook_value) => {
2504 : // Success: drop the old TenantSlot::Attached.
2505 1 : slot_guard
2506 1 : .drop_old_value()
2507 1 : .expect("We just called shutdown");
2508 :
2509 1 : Ok(hook_value)
2510 : }
2511 0 : Err(e) => {
2512 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2513 0 : if let Some(attached_tenant) = attached_tenant {
2514 0 : attached_tenant.set_broken(e.to_string()).await;
2515 0 : }
2516 : // Leave the broken tenant in the map
2517 0 : slot_guard.revert();
2518 :
2519 0 : Err(TenantStateError::Other(e))
2520 : }
2521 : }
2522 1 : }
2523 : }
2524 :
2525 : #[derive(Debug, thiserror::Error)]
2526 : pub(crate) enum GetTenantError {
2527 : /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
2528 : /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
2529 : #[error("Tenant {0} not found")]
2530 : NotFound(TenantId),
2531 :
2532 : #[error("Tenant {0} not found")]
2533 : ShardNotFound(TenantShardId),
2534 :
2535 : #[error("Tenant {0} is not active")]
2536 : NotActive(TenantShardId),
2537 :
2538 : // Initializing or shutting down: cannot authoritatively say whether we have this tenant
2539 : #[error("Tenant map is not available: {0}")]
2540 : MapState(#[from] TenantMapError),
2541 : }
2542 :
2543 : #[derive(thiserror::Error, Debug)]
2544 : pub(crate) enum GetActiveTenantError {
2545 : /// We may time out either while TenantSlot is InProgress, or while the Tenant
2546 : /// is in a non-Active state
2547 : #[error(
2548 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
2549 : )]
2550 : WaitForActiveTimeout {
2551 : latest_state: Option<TenantState>,
2552 : wait_time: Duration,
2553 : },
2554 :
2555 : /// The TenantSlot is absent, or in secondary mode
2556 : #[error(transparent)]
2557 : NotFound(#[from] GetTenantError),
2558 :
2559 : /// Cancellation token fired while we were waiting
2560 : #[error("cancelled")]
2561 : Cancelled,
2562 :
2563 : /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
2564 : #[error("will not become active. Current state: {0}")]
2565 : WillNotBecomeActive(TenantState),
2566 :
2567 : /// Broken is logically a subset of WillNotBecomeActive, but a distinct error is useful as
2568 : /// WillNotBecomeActive is a permitted error under some circumstances, whereas broken should
2569 : /// never happen.
2570 : #[error("Tenant is broken: {0}")]
2571 : Broken(String),
2572 :
2573 : #[error("reconnect to switch tenant id")]
2574 : SwitchedTenant,
2575 : }
2576 :
2577 : #[derive(Debug, thiserror::Error)]
2578 : pub(crate) enum DeleteTimelineError {
2579 : #[error("Tenant {0}")]
2580 : Tenant(#[from] GetTenantError),
2581 :
2582 : #[error("Timeline {0}")]
2583 : Timeline(#[from] crate::tenant::DeleteTimelineError),
2584 : }
2585 :
2586 : #[derive(Debug, thiserror::Error)]
2587 : pub(crate) enum TenantStateError {
2588 : #[error("Tenant {0} is stopping")]
2589 : IsStopping(TenantShardId),
2590 : #[error(transparent)]
2591 : SlotError(#[from] TenantSlotError),
2592 : #[error(transparent)]
2593 : SlotUpsertError(#[from] TenantSlotUpsertError),
2594 : #[error(transparent)]
2595 : Other(#[from] anyhow::Error),
2596 : }
2597 :
2598 : #[derive(Debug, thiserror::Error)]
2599 : pub(crate) enum TenantMapListError {
2600 : #[error("tenant map is still initiailizing")]
2601 : Initializing,
2602 : }
2603 :
2604 : #[derive(Debug, thiserror::Error)]
2605 : pub(crate) enum TenantMapInsertError {
2606 : #[error(transparent)]
2607 : SlotError(#[from] TenantSlotError),
2608 : #[error(transparent)]
2609 : SlotUpsertError(#[from] TenantSlotUpsertError),
2610 : #[error(transparent)]
2611 : Other(#[from] anyhow::Error),
2612 : }
2613 :
2614 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
2615 : /// for a particular tenant ID.
2616 : #[derive(Debug, thiserror::Error)]
2617 : pub(crate) enum TenantSlotError {
2618 : /// When acquiring a slot with the expectation that the tenant already exists.
2619 : #[error("Tenant {0} not found")]
2620 : NotFound(TenantShardId),
2621 :
2622 : // Tried to read a slot that is currently being mutated by another administrative
2623 : // operation.
2624 : #[error("tenant has a state change in progress, try again later")]
2625 : InProgress,
2626 :
2627 : #[error(transparent)]
2628 : MapState(#[from] TenantMapError),
2629 : }
2630 :
2631 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
2632 : /// to insert a new value.
2633 : #[derive(thiserror::Error)]
2634 : pub(crate) enum TenantSlotUpsertError {
2635 : /// An error where the slot is in an unexpected state, indicating a code bug
2636 : #[error("Internal error updating Tenant")]
2637 : InternalError(Cow<'static, str>),
2638 :
2639 : #[error(transparent)]
2640 : MapState(TenantMapError),
2641 :
2642 : // If we encounter TenantManager shutdown during upsert, we must carry the Completion
2643 : // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
2644 : // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
2645 : // was protected by the SlotGuard.
2646 : #[error("Shutting down")]
2647 : ShuttingDown((TenantSlot, utils::completion::Completion)),
2648 : }
2649 :
2650 : impl std::fmt::Debug for TenantSlotUpsertError {
2651 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2652 0 : match self {
2653 0 : Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
2654 0 : Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
2655 0 : Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
2656 : }
2657 0 : }
2658 : }
2659 :
2660 : #[derive(Debug, thiserror::Error)]
2661 : enum TenantSlotDropError {
2662 : /// It is only legal to drop a TenantSlot if its contents are fully shut down
2663 : #[error("Tenant was not shut down")]
2664 : NotShutdown,
2665 : }
2666 :
2667 : /// Errors that can happen any time we are walking the tenant map to try and acquire
2668 : /// the TenantSlot for a particular tenant.
2669 : #[derive(Debug, thiserror::Error)]
2670 : pub(crate) enum TenantMapError {
2671 : // Tried to read while initializing
2672 : #[error("tenant map is still initializing")]
2673 : StillInitializing,
2674 :
2675 : // Tried to read while shutting down
2676 : #[error("tenant map is shutting down")]
2677 : ShuttingDown,
2678 : }
2679 :
2680 : /// Guards a particular tenant_id's content in the TenantsMap.
2681 : ///
2682 : /// While this structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
2683 : /// for this tenant, which acts as a marker for any operations targeting
2684 : /// this tenant to retry later, or wait for the InProgress state to end.
2685 : ///
2686 : /// This structure enforces the important invariant that we do not have overlapping
2687 : /// tasks that will try to use local storage for the same tenant ID: we enforce that
2688 : /// the previous contents of a slot have been shut down before the slot can be
2689 : /// left empty or used for something else.
2690 : ///
2691 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
2692 : /// to provide a new value, or `revert` to put the slot back into its initial
2693 : /// state. If the SlotGuard is dropped without calling either of these, then
2694 : /// we will leave the slot empty if our `old_value` is already shut down, else
2695 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
2696 : ///
2697 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
2698 : /// `drop_old_value`. It is an error to call this without shutting down
2699 : /// the contents of `old_value`.
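/// A hedged lifecycle sketch (happy path only):
///
/// ```ignore
/// let mut guard = self.tenant_map_acquire_slot(&id, TenantSlotAcquireMode::MustExist)?;
/// // ... fully shut down whatever `guard.get_old_value()` holds ...
/// guard.drop_old_value()?;                        // legal only once shut down
/// guard.upsert(TenantSlot::Attached(new_shard))?; // or guard.revert(), or drop(guard)
/// ```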
2700 : pub(crate) struct SlotGuard<'a> {
2701 : tenant_shard_id: TenantShardId,
2702 : old_value: Option<TenantSlot>,
2703 : upserted: bool,
2704 :
2705 : /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
2706 : /// release any waiters as soon as this SlotGuard is dropped.
2707 : completion: utils::completion::Completion,
2708 :
2709 : tenants: &'a std::sync::RwLock<TenantsMap>,
2710 : }
2711 :
2712 : impl<'a> SlotGuard<'a> {
2713 1 : fn new(
2714 1 : tenant_shard_id: TenantShardId,
2715 1 : old_value: Option<TenantSlot>,
2716 1 : completion: utils::completion::Completion,
2717 1 : tenants: &'a std::sync::RwLock<TenantsMap>,
2718 1 : ) -> Self {
2719 1 : Self {
2720 1 : tenant_shard_id,
2721 1 : old_value,
2722 1 : upserted: false,
2723 1 : completion,
2724 1 : tenants,
2725 1 : }
2726 1 : }
2727 :
2728 : /// Get any value that was present in the slot before we acquired ownership
2729 : /// of it: in state transitions, this will be the old state.
2730 : ///
2731 : // FIXME: get_ prefix
2732 : // FIXME: this should be .as_ref() -- unsure why no clippy
2733 1 : fn get_old_value(&self) -> &Option<TenantSlot> {
2734 1 : &self.old_value
2735 1 : }
2736 :
2737 : /// Emplace a new value in the slot. This consumes the guard, and after
2738 : /// returning, the slot is no longer protected from concurrent changes.
2739 0 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
2740 0 : if !self.old_value_is_shutdown() {
2741 : // This is a bug: callers should never try to drop an old value without
2742 : // shutting it down
2743 0 : return Err(TenantSlotUpsertError::InternalError(
2744 0 : "Old TenantSlot value not shut down".into(),
2745 0 : ));
2746 0 : }
2747 :
2748 0 : let replaced: Option<TenantSlot> = {
2749 0 : let mut locked = self.tenants.write().unwrap();
2750 :
2751 0 : if let TenantSlot::InProgress(_) = new_value {
2752 : // It is never expected to try and upsert InProgress via this path: it should
2753 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
2754 0 : return Err(TenantSlotUpsertError::InternalError(
2755 0 : "Attempt to upsert an InProgress state".into(),
2756 0 : ));
2757 0 : }
2758 :
2759 0 : let m = match &mut *locked {
2760 : TenantsMap::Initializing => {
2761 0 : return Err(TenantSlotUpsertError::MapState(
2762 0 : TenantMapError::StillInitializing,
2763 0 : ));
2764 : }
2765 : TenantsMap::ShuttingDown(_) => {
2766 0 : return Err(TenantSlotUpsertError::ShuttingDown((
2767 0 : new_value,
2768 0 : self.completion.clone(),
2769 0 : )));
2770 : }
2771 0 : TenantsMap::Open(m) => m,
2772 : };
2773 :
2774 0 : METRICS.slot_inserted(&new_value);
2775 :
2776 0 : let replaced = m.insert(self.tenant_shard_id, new_value);
2777 0 : self.upserted = true;
2778 0 : if let Some(replaced) = replaced.as_ref() {
2779 0 : METRICS.slot_removed(replaced);
2780 0 : }
2781 :
2782 0 : replaced
2783 : };
2784 :
2785 : // Sanity check: on an upsert we should always be replacing an InProgress marker
2786 0 : match replaced {
2787 : Some(TenantSlot::InProgress(_)) => {
2788 : // Expected case: we find our InProgress in the map: nothing should have
2789 : // replaced it because the code that acquires slots will not grant another
2790 : // one for the same TenantId.
2791 0 : Ok(())
2792 : }
2793 : None => {
2794 0 : METRICS.unexpected_errors.inc();
2795 0 : error!(
2796 : tenant_shard_id = %self.tenant_shard_id,
2797 0 : "Missing InProgress marker during tenant upsert, this is a bug."
2798 : );
2799 0 : Err(TenantSlotUpsertError::InternalError(
2800 0 : "Missing InProgress marker during tenant upsert".into(),
2801 0 : ))
2802 : }
2803 0 : Some(slot) => {
2804 0 : METRICS.unexpected_errors.inc();
2805 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
2806 0 : Err(TenantSlotUpsertError::InternalError(
2807 0 : "Unexpected contents of TenantSlot".into(),
2808 0 : ))
2809 : }
2810 : }
2811 0 : }
2812 :
2813 : /// Replace the InProgress slot with whatever was in the guard when we started
2814 0 : fn revert(mut self) {
2815 0 : if let Some(value) = self.old_value.take() {
2816 0 : match self.upsert(value) {
2817 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
2818 0 : // We already logged the error, nothing else we can do.
2819 0 : }
2820 : Err(
2821 : TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
2822 0 : ) => {
2823 0 : // If the map is shutting down, we need not replace anything
2824 0 : }
2825 0 : Ok(()) => {}
2826 : }
2827 0 : }
2828 0 : }
2829 :
2830 : /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
2831 : /// rogue background tasks that would write to the local tenant directory that this guard
2832 : /// is responsible for protecting
2833 1 : fn old_value_is_shutdown(&self) -> bool {
2834 1 : match self.old_value.as_ref() {
2835 1 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
2836 0 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
2837 : Some(TenantSlot::InProgress(_)) => {
2838 : // A SlotGuard cannot be constructed for a slot that was already InProgress
2839 0 : unreachable!()
2840 : }
2841 0 : None => true,
2842 : }
2843 1 : }
2844 :
2845 : /// The guard holder is done with the old value of the slot: they are obliged to already
2846 : /// shut it down before we reach this point.
2847 1 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
2848 1 : if !self.old_value_is_shutdown() {
2849 0 : Err(TenantSlotDropError::NotShutdown)
2850 : } else {
2851 1 : self.old_value.take();
2852 1 : Ok(())
2853 : }
2854 1 : }
2855 : }
2856 :
2857 : impl<'a> Drop for SlotGuard<'a> {
2858 1 : fn drop(&mut self) {
2859 1 : if self.upserted {
2860 0 : return;
2861 1 : }
2862 : // Our old value is already shut down, or it never existed: it is safe
2863 : // for us to fully release the TenantSlot back into an empty state
2864 :
2865 1 : let mut locked = self.tenants.write().unwrap();
2866 :
2867 1 : let m = match &mut *locked {
2868 : TenantsMap::Initializing => {
2869 : // There is no map, this should never happen.
2870 0 : return;
2871 : }
2872 : TenantsMap::ShuttingDown(_) => {
2873 : // When we transition to shutdown, InProgress elements are removed
2874 : // from the map, so we do not need to clean up our InProgress marker.
2875 : // See [`shutdown_all_tenants0`]
2876 1 : return;
2877 : }
2878 0 : TenantsMap::Open(m) => m,
2879 : };
2880 :
2881 : use std::collections::btree_map::Entry;
2882 0 : match m.entry(self.tenant_shard_id) {
2883 0 : Entry::Occupied(mut entry) => {
2884 0 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
2885 0 : METRICS.unexpected_errors.inc();
2886 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
2887 0 : }
2888 :
2889 0 : if self.old_value_is_shutdown() {
2890 0 : METRICS.slot_removed(entry.get());
2891 0 : entry.remove();
2892 0 : } else {
2893 0 : let inserting = self.old_value.take().unwrap();
2894 0 : METRICS.slot_inserted(&inserting);
2895 0 : let replaced = entry.insert(inserting);
2896 0 : METRICS.slot_removed(&replaced);
2897 0 : }
2898 : }
2899 : Entry::Vacant(_) => {
2900 0 : METRICS.unexpected_errors.inc();
2901 0 : error!(
2902 : tenant_shard_id = %self.tenant_shard_id,
2903 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
2904 : );
2905 : }
2906 : }
2907 1 : }
2908 : }
2909 :
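 : // Illustrative summary, not part of the original source: how a SlotGuard resolves when
 : // dropped without having been upserted:
 : //
 : //   old value shut down (or absent)   -> the InProgress marker is removed, emptying the slot
 : //   old value not yet shut down       -> the old value is reinstated into the slot
 : //   marker missing or not InProgress  -> unexpected_errors is incremented and a bug is logged
 :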
2910 : enum TenantSlotPeekMode {
2911 : /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
2912 : Read,
2913 : /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
2914 : Write,
2915 : }
2916 :
2917 0 : fn tenant_map_peek_slot<'a>(
2918 0 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
2919 0 : tenant_shard_id: &TenantShardId,
2920 0 : mode: TenantSlotPeekMode,
2921 0 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
2922 0 : match tenants.deref() {
2923 0 : TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
2924 0 : TenantsMap::ShuttingDown(m) => match mode {
2925 : TenantSlotPeekMode::Read => Ok(Some(
2926 : // When reading in ShuttingDown state, we must translate None results
2927 : // into a ShuttingDown error, because absence of a tenant shard ID in the map
2928 : // isn't a reliable indicator of the tenant being gone: it might have been
2929 : // InProgress when shutdown started, and cleaned up from that state such
2930 : // that it's now no longer in the map. Callers will have to wait until
2931 : // we next start up to get a proper answer. This avoids incorrect 404 API responses.
2932 0 : m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
2933 : )),
2934 0 : TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
2935 : },
2936 0 : TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
2937 : }
2938 0 : }
2939 :
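 : // Illustrative sketch, not part of the original source: how a read-path caller might use
 : // tenant_map_peek_slot; `tenants` (the RwLock<TenantsMap>) and `tenant_shard_id` are
 : // assumed to be in scope.
 : //
 : //     let locked = tenants.read().unwrap();
 : //     match tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) {
 : //         Ok(Some(TenantSlot::Attached(tenant))) => { /* serve the request */ }
 : //         Ok(Some(_)) | Ok(None) => { /* not attached on this pageserver, e.g. 404 */ }
 : //         Err(TenantMapError::ShuttingDown | TenantMapError::StillInitializing) => {
 : //             /* transient states, e.g. 503 so the caller retries */
 : //         }
 : //     }
 :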
2940 : enum TenantSlotAcquireMode {
2941 : /// Acquire the slot irrespective of its current state, whether or not it already exists
2942 : Any,
2943 : /// Return an error if the slot does not already exist
2944 : MustExist,
2945 : }
2946 :
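 : // Illustrative note, not part of the original source: `Any` fits operations like
 : // location-config upserts, where the slot may or may not exist yet, while `MustExist`
 : // fits operations like detach or delete, which should fail against an absent tenant.
 :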
2947 : use http_utils::error::ApiError;
2948 : use pageserver_api::models::TimelineGcRequest;
2949 :
2950 : use crate::tenant::gc_result::GcResult;
2951 :
2952 : #[cfg(test)]
2953 : mod tests {
2954 : use std::collections::BTreeMap;
2955 : use std::sync::Arc;
2956 :
2957 : use camino::Utf8PathBuf;
2958 : use storage_broker::BrokerClientChannel;
2959 : use tracing::Instrument;
2960 :
2961 : use super::super::harness::TenantHarness;
2962 : use super::TenantsMap;
2963 : use crate::{
2964 : basebackup_cache::BasebackupCache,
2965 : tenant::{
2966 : TenantSharedResources,
2967 : mgr::{BackgroundPurges, TenantManager, TenantSlot},
2968 : },
2969 : };
2970 :
2971 : #[tokio::test(start_paused = true)]
2972 1 : async fn shutdown_awaits_in_progress_tenant() {
2973 : // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
2974 : // wait for it to complete before proceeding.
2975 :
2976 1 : let h = TenantHarness::create("shutdown_awaits_in_progress_tenant")
2977 1 : .await
2978 1 : .unwrap();
2979 1 : let (t, _ctx) = h.load().await;
2980 :
2981 : // harness loads it to active, which is forced and nothing is running on the tenant
2982 :
2983 1 : let id = t.tenant_shard_id();
2984 :
2985 : // tenant harness configures the logging and we cannot escape it
2986 1 : let span = h.span();
2987 1 : let _e = span.enter();
2988 :
2989 1 : let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
2990 :
2991 : // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
2992 : // permit it to proceed: that will stick the tenant in InProgress
2993 :
2994 1 : let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
2995 :
2996 1 : let tenant_manager = TenantManager {
2997 1 : tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)),
2998 1 : conf: h.conf,
2999 : resources: TenantSharedResources {
3000 1 : broker_client: BrokerClientChannel::connect_lazy("foobar.com")
3001 1 : .await
3002 1 : .unwrap(),
3003 1 : remote_storage: h.remote_storage.clone(),
3004 1 : deletion_queue_client: h.deletion_queue.new_client(),
3005 1 : l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new(
3006 1 : h.conf.l0_flush.clone(),
3007 : ),
3008 1 : basebackup_cache,
3009 1 : feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(),
3010 : },
3011 1 : cancel: tokio_util::sync::CancellationToken::new(),
3012 1 : background_purges: BackgroundPurges::default(),
3013 : };
3014 :
3015 1 : let tenant_manager = Arc::new(tenant_manager);
3016 :
3017 1 : let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
3018 1 : let (until_cleanup_started, cleanup_started) = utils::completion::channel();
3019 1 : let mut remove_tenant_from_memory_task = {
3020 1 : let tenant_manager = tenant_manager.clone();
3021 1 : let jh = tokio::spawn({
3022 1 : async move {
3023 1 : let cleanup = async move {
3024 1 : drop(until_cleanup_started);
3025 1 : can_complete_cleanup.wait().await;
3026 1 : anyhow::Ok(())
3027 1 : };
3028 1 : tenant_manager.remove_tenant_from_memory(id, cleanup).await
3029 1 : }
3030 1 : .instrument(h.span())
3031 : });
3032 :
3033 : // Now the long-running cleanup should be underway, with the tenant stuck in InProgress.
3034 1 : cleanup_started.wait().await;
3035 1 : jh
3036 : };
3037 :
3038 1 : let mut shutdown_task = {
3039 1 : let (until_shutdown_started, shutdown_started) = utils::completion::channel();
3040 :
3041 1 : let tenant_manager = tenant_manager.clone();
3042 :
3043 1 : let shutdown_task = tokio::spawn(async move {
3044 1 : drop(until_shutdown_started);
3045 1 : tenant_manager.shutdown_all_tenants0().await;
3046 1 : });
3047 :
3048 1 : shutdown_started.wait().await;
3049 1 : shutdown_task
3050 : };
3051 :
3052 1 : let long_time = std::time::Duration::from_secs(15);
3053 1 : tokio::select! {
3054 1 : _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
3055 1 : _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
3056 1 : _ = tokio::time::sleep(long_time) => {},
3057 : }
3058 :
3059 1 : drop(until_cleanup_completed);
3060 :
3061 : // Now that we allow the cleanup to proceed, shutdown should complete immediately.
3062 1 : remove_tenant_from_memory_task.await.unwrap().unwrap();
3063 1 : shutdown_task.await.unwrap();
3064 1 : }
3065 : }
|