Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
5 : use futures::StreamExt;
6 : use hyper::StatusCode;
7 : use itertools::Itertools;
8 : use pageserver_api::key::Key;
9 : use pageserver_api::models::LocationConfigMode;
10 : use pageserver_api::shard::{
11 : ShardCount, ShardIdentity, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId,
12 : };
13 : use pageserver_api::upcall_api::ReAttachResponseTenant;
14 : use rand::{distributions::Alphanumeric, Rng};
15 : use std::borrow::Cow;
16 : use std::cmp::Ordering;
17 : use std::collections::{BTreeMap, HashMap};
18 : use std::ops::Deref;
19 : use std::sync::Arc;
20 : use std::time::Duration;
21 : use sysinfo::SystemExt;
22 : use tokio::fs;
23 :
24 : use anyhow::Context;
25 : use once_cell::sync::Lazy;
26 : use tokio::task::JoinSet;
27 : use tokio_util::sync::CancellationToken;
28 : use tracing::*;
29 :
30 : use remote_storage::GenericRemoteStorage;
31 : use utils::{completion, crashsafe};
32 :
33 : use crate::config::PageServerConf;
34 : use crate::context::{DownloadBehavior, RequestContext};
35 : use crate::control_plane_client::{
36 : ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
37 : };
38 : use crate::deletion_queue::DeletionQueueClient;
39 : use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
40 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
41 : use crate::task_mgr::{self, TaskKind};
42 : use crate::tenant::config::{
43 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
44 : };
45 : use crate::tenant::delete::DeleteTenantFlow;
46 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
47 : use crate::tenant::storage_layer::inmemory_layer;
48 : use crate::tenant::timeline::ShutdownMode;
49 : use crate::tenant::{AttachedTenantConf, GcError, SpawnMode, Tenant, TenantState};
50 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
51 :
52 : use utils::crashsafe::path_with_suffix_extension;
53 : use utils::fs_ext::PathExt;
54 : use utils::generation::Generation;
55 : use utils::id::{TenantId, TimelineId};
56 :
57 : use super::delete::DeleteTenantError;
58 : use super::remote_timeline_client::remote_tenant_path;
59 : use super::secondary::SecondaryTenant;
60 : use super::timeline::detach_ancestor::PreparedTimelineDetach;
61 : use super::TenantSharedResources;
62 :
63 : /// For a tenant that appears in TenantsMap, it may either be
64 : /// - `Attached`: has a full Tenant object, is elegible to service
65 : /// reads and ingest WAL.
66 : /// - `Secondary`: is only keeping a local cache warm.
67 : ///
68 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
69 : /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
70 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
71 : /// having a properly acquired generation (Secondary doesn't need a generation)
72 : #[derive(Clone)]
73 : pub(crate) enum TenantSlot {
74 : Attached(Arc<Tenant>),
75 : Secondary(Arc<SecondaryTenant>),
76 : /// In this state, other administrative operations acting on the TenantId should
77 : /// block, or return a retry indicator equivalent to HTTP 503.
78 : InProgress(utils::completion::Barrier),
79 : }
80 :
81 : impl std::fmt::Debug for TenantSlot {
82 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 0 : match self {
84 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
85 0 : Self::Secondary(_) => write!(f, "Secondary"),
86 0 : Self::InProgress(_) => write!(f, "InProgress"),
87 : }
88 0 : }
89 : }
90 :
91 : impl TenantSlot {
92 : /// Return the `Tenant` in this slot if attached, else None
93 0 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
94 0 : match self {
95 0 : Self::Attached(t) => Some(t),
96 0 : Self::Secondary(_) => None,
97 0 : Self::InProgress(_) => None,
98 : }
99 0 : }
100 : }
101 :
102 : /// The tenants known to the pageserver.
103 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
104 : pub(crate) enum TenantsMap {
105 : /// [`init_tenant_mgr`] is not done yet.
106 : Initializing,
107 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
108 : /// New tenants can be added using [`tenant_map_acquire_slot`].
109 : Open(BTreeMap<TenantShardId, TenantSlot>),
110 : /// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
111 : /// Existing tenants are still accessible, but no new tenants can be created.
112 : ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
113 : }
114 :
115 : pub(crate) enum TenantsMapRemoveResult {
116 : Occupied(TenantSlot),
117 : Vacant,
118 : InProgress(utils::completion::Barrier),
119 : }
120 :
121 : /// When resolving a TenantId to a shard, we may be looking for the 0th
122 : /// shard, or we might be looking for whichever shard holds a particular page.
123 : #[derive(Copy, Clone)]
124 : pub(crate) enum ShardSelector {
125 : /// Only return the 0th shard, if it is present. If a non-0th shard is present,
126 : /// ignore it.
127 : Zero,
128 : /// Pick the first shard we find for the TenantId
129 : First,
130 : /// Pick the shard that holds this key
131 : Page(Key),
132 : /// The shard ID is known: pick the given shard
133 : Known(ShardIndex),
134 : }
135 :
136 : /// A convenience for use with the re_attach ControlPlaneClient function: rather
137 : /// than the serializable struct, we build this enum that encapsulates
138 : /// the invariant that attached tenants always have generations.
139 : ///
140 : /// This represents the subset of a LocationConfig that we receive during re-attach.
141 : pub(crate) enum TenantStartupMode {
142 : Attached((AttachmentMode, Generation)),
143 : Secondary,
144 : }
145 :
146 : impl TenantStartupMode {
147 : /// Return the generation & mode that should be used when starting
148 : /// this tenant.
149 : ///
150 : /// If this returns None, the re-attach struct is in an invalid state and
151 : /// should be ignored in the response.
152 0 : fn from_reattach_tenant(rart: ReAttachResponseTenant) -> Option<Self> {
153 0 : match (rart.mode, rart.gen) {
154 0 : (LocationConfigMode::Detached, _) => None,
155 0 : (LocationConfigMode::Secondary, _) => Some(Self::Secondary),
156 0 : (LocationConfigMode::AttachedMulti, Some(g)) => {
157 0 : Some(Self::Attached((AttachmentMode::Multi, Generation::new(g))))
158 : }
159 0 : (LocationConfigMode::AttachedSingle, Some(g)) => {
160 0 : Some(Self::Attached((AttachmentMode::Single, Generation::new(g))))
161 : }
162 0 : (LocationConfigMode::AttachedStale, Some(g)) => {
163 0 : Some(Self::Attached((AttachmentMode::Stale, Generation::new(g))))
164 : }
165 : _ => {
166 0 : tracing::warn!(
167 0 : "Received invalid re-attach state for tenant {}: {rart:?}",
168 : rart.id
169 : );
170 0 : None
171 : }
172 : }
173 0 : }
174 : }
175 :
176 : /// Result type for looking up a TenantId to a specific shard
177 : pub(crate) enum ShardResolveResult {
178 : NotFound,
179 : Found(Arc<Tenant>),
180 : // Wait for this barrrier, then query again
181 : InProgress(utils::completion::Barrier),
182 : }
183 :
184 : impl TenantsMap {
185 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
186 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
187 : /// None is returned.
188 0 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
189 0 : match self {
190 0 : TenantsMap::Initializing => None,
191 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
192 0 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
193 : }
194 : }
195 0 : }
196 :
197 : /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
198 : ///
199 : /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
200 : /// slot if the enclosed tenant is shutdown.
201 0 : pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
202 0 : use std::collections::btree_map::Entry;
203 0 : match self {
204 0 : TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
205 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
206 0 : Entry::Occupied(entry) => match entry.get() {
207 0 : TenantSlot::InProgress(barrier) => {
208 0 : TenantsMapRemoveResult::InProgress(barrier.clone())
209 : }
210 0 : _ => TenantsMapRemoveResult::Occupied(entry.remove()),
211 : },
212 0 : Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
213 : },
214 : }
215 0 : }
216 :
217 : #[cfg(all(debug_assertions, not(test)))]
218 0 : pub(crate) fn len(&self) -> usize {
219 0 : match self {
220 0 : TenantsMap::Initializing => 0,
221 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
222 : }
223 0 : }
224 : }
225 :
226 : /// Precursor to deletion of a tenant dir: we do a fast rename to a tmp path, and then
227 : /// the slower actual deletion in the background.
228 : ///
229 : /// This is "safe" in that that it won't leave behind a partially deleted directory
230 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting
231 : /// the contents.
232 : ///
233 : /// This is pageserver-specific, as it relies on future processes after a crash to check
234 : /// for TEMP_FILE_SUFFIX when loading things.
235 0 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
236 0 : let parent = path
237 0 : .as_ref()
238 0 : .parent()
239 0 : // It is invalid to call this function with a relative path. Tenant directories
240 0 : // should always have a parent.
241 0 : .ok_or(std::io::Error::new(
242 0 : std::io::ErrorKind::InvalidInput,
243 0 : "Path must be absolute",
244 0 : ))?;
245 0 : let rand_suffix = rand::thread_rng()
246 0 : .sample_iter(&Alphanumeric)
247 0 : .take(8)
248 0 : .map(char::from)
249 0 : .collect::<String>()
250 0 : + TEMP_FILE_SUFFIX;
251 0 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
252 0 : fs::rename(path.as_ref(), &tmp_path).await?;
253 0 : fs::File::open(parent).await?.sync_all().await?;
254 0 : Ok(tmp_path)
255 0 : }
256 :
257 : /// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
258 : /// the background, and thereby avoid blocking any API requests on this deletion completing.
259 0 : fn spawn_background_purge(tmp_path: Utf8PathBuf) {
260 0 : // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
261 0 : // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
262 0 : let task_tenant_id = None;
263 0 :
264 0 : task_mgr::spawn(
265 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
266 0 : TaskKind::MgmtRequest,
267 0 : task_tenant_id,
268 0 : None,
269 0 : "tenant_files_delete",
270 0 : false,
271 0 : async move {
272 0 : fs::remove_dir_all(tmp_path.as_path())
273 0 : .await
274 0 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
275 0 : },
276 0 : );
277 0 : }
278 :
279 : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
280 2 : Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
281 :
282 : /// The TenantManager is responsible for storing and mutating the collection of all tenants
283 : /// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
284 : /// lives inside the TenantManager.
285 : ///
286 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
287 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
288 : /// and attached modes concurrently.
289 : pub struct TenantManager {
290 : conf: &'static PageServerConf,
291 : // TODO: currently this is a &'static pointing to TENANTs. When we finish refactoring
292 : // out of that static variable, the TenantManager can own this.
293 : // See https://github.com/neondatabase/neon/issues/5796
294 : tenants: &'static std::sync::RwLock<TenantsMap>,
295 : resources: TenantSharedResources,
296 :
297 : // Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
298 : // This is for edge cases like tenant deletion. In normal cases (within a Tenant lifetime),
299 : // tenants have their own cancellation tokens, which we fire individually in [`Self::shutdown`], or
300 : // when the tenant detaches.
301 : cancel: CancellationToken,
302 : }
303 :
304 0 : fn emergency_generations(
305 0 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
306 0 : ) -> HashMap<TenantShardId, TenantStartupMode> {
307 0 : tenant_confs
308 0 : .iter()
309 0 : .filter_map(|(tid, lc)| {
310 0 : let lc = match lc {
311 0 : Ok(lc) => lc,
312 0 : Err(_) => return None,
313 : };
314 : Some((
315 0 : *tid,
316 0 : match &lc.mode {
317 0 : LocationMode::Attached(alc) => {
318 0 : TenantStartupMode::Attached((alc.attach_mode, alc.generation))
319 : }
320 0 : LocationMode::Secondary(_) => TenantStartupMode::Secondary,
321 : },
322 : ))
323 0 : })
324 0 : .collect()
325 0 : }
326 :
327 0 : async fn init_load_generations(
328 0 : conf: &'static PageServerConf,
329 0 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
330 0 : resources: &TenantSharedResources,
331 0 : cancel: &CancellationToken,
332 0 : ) -> anyhow::Result<Option<HashMap<TenantShardId, TenantStartupMode>>> {
333 0 : let generations = if conf.control_plane_emergency_mode {
334 0 : error!(
335 0 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
336 : );
337 0 : emergency_generations(tenant_confs)
338 0 : } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
339 0 : info!("Calling control plane API to re-attach tenants");
340 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
341 0 : match client.re_attach(conf).await {
342 0 : Ok(tenants) => tenants
343 0 : .into_iter()
344 0 : .flat_map(|(id, rart)| {
345 0 : TenantStartupMode::from_reattach_tenant(rart).map(|tsm| (id, tsm))
346 0 : })
347 0 : .collect(),
348 : Err(RetryForeverError::ShuttingDown) => {
349 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
350 : }
351 : }
352 : } else {
353 0 : info!("Control plane API not configured, tenant generations are disabled");
354 0 : return Ok(None);
355 : };
356 :
357 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
358 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
359 : // the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
360 : // are processed, even though we don't block on recovery completing here.
361 0 : let attached_tenants = generations
362 0 : .iter()
363 0 : .flat_map(|(id, start_mode)| {
364 0 : match start_mode {
365 0 : TenantStartupMode::Attached((_mode, generation)) => Some(generation),
366 0 : TenantStartupMode::Secondary => None,
367 : }
368 0 : .map(|gen| (*id, *gen))
369 0 : })
370 0 : .collect();
371 0 : resources.deletion_queue_client.recover(attached_tenants)?;
372 :
373 0 : Ok(Some(generations))
374 0 : }
375 :
376 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
377 : /// to load a tenant config from it.
378 : ///
379 : /// If file is missing, return Ok(None)
380 0 : fn load_tenant_config(
381 0 : conf: &'static PageServerConf,
382 0 : dentry: Utf8DirEntry,
383 0 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
384 0 : let tenant_dir_path = dentry.path().to_path_buf();
385 0 : if crate::is_temporary(&tenant_dir_path) {
386 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
387 : // No need to use safe_remove_tenant_dir_all because this is already
388 : // a temporary path
389 0 : if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
390 0 : error!(
391 0 : "Failed to remove temporary directory '{}': {:?}",
392 : tenant_dir_path, e
393 : );
394 0 : }
395 0 : return Ok(None);
396 0 : }
397 :
398 : // This case happens if we crash during attachment before writing a config into the dir
399 0 : let is_empty = tenant_dir_path
400 0 : .is_empty_dir()
401 0 : .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
402 0 : if is_empty {
403 0 : info!("removing empty tenant directory {tenant_dir_path:?}");
404 0 : if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
405 0 : error!(
406 0 : "Failed to remove empty tenant directory '{}': {e:#}",
407 : tenant_dir_path
408 : )
409 0 : }
410 0 : return Ok(None);
411 0 : }
412 :
413 0 : let tenant_shard_id = match tenant_dir_path
414 0 : .file_name()
415 0 : .unwrap_or_default()
416 0 : .parse::<TenantShardId>()
417 : {
418 0 : Ok(id) => id,
419 : Err(_) => {
420 0 : warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
421 0 : return Ok(None);
422 : }
423 : };
424 :
425 0 : let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
426 0 : if tenant_ignore_mark_file.exists() {
427 0 : info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
428 0 : return Ok(None);
429 0 : }
430 0 :
431 0 : Ok(Some((
432 0 : tenant_shard_id,
433 0 : Tenant::load_tenant_config(conf, &tenant_shard_id),
434 0 : )))
435 0 : }
436 :
437 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
438 : /// and load configurations for the tenants we found.
439 : ///
440 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
441 : /// seconds even on reasonably fast drives.
442 0 : async fn init_load_tenant_configs(
443 0 : conf: &'static PageServerConf,
444 0 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
445 0 : let tenants_dir = conf.tenants_path();
446 :
447 0 : let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
448 0 : let dir_entries = tenants_dir
449 0 : .read_dir_utf8()
450 0 : .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
451 :
452 0 : Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
453 0 : })
454 0 : .await??;
455 :
456 0 : let mut configs = HashMap::new();
457 0 :
458 0 : let mut join_set = JoinSet::new();
459 0 : for dentry in dentries {
460 0 : join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
461 0 : }
462 :
463 0 : while let Some(r) = join_set.join_next().await {
464 0 : if let Some((tenant_id, tenant_config)) = r?? {
465 0 : configs.insert(tenant_id, tenant_config);
466 0 : }
467 : }
468 :
469 0 : Ok(configs)
470 0 : }
471 :
472 : /// Initialize repositories with locally available timelines.
473 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
474 : /// are scheduled for download and added to the tenant once download is completed.
475 0 : #[instrument(skip_all)]
476 : pub async fn init_tenant_mgr(
477 : conf: &'static PageServerConf,
478 : resources: TenantSharedResources,
479 : init_order: InitializationOrder,
480 : cancel: CancellationToken,
481 : ) -> anyhow::Result<TenantManager> {
482 : let mut tenants = BTreeMap::new();
483 :
484 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
485 :
486 : // Initialize dynamic limits that depend on system resources
487 : let system_memory =
488 : sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
489 : .total_memory();
490 : let max_ephemeral_layer_bytes =
491 : conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
492 : tracing::info!("Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory");
493 : inmemory_layer::GLOBAL_RESOURCES.max_dirty_bytes.store(
494 : max_ephemeral_layer_bytes,
495 : std::sync::atomic::Ordering::Relaxed,
496 : );
497 :
498 : // Scan local filesystem for attached tenants
499 : let tenant_configs = init_load_tenant_configs(conf).await?;
500 :
501 : // Determine which tenants are to be secondary or attached, and in which generation
502 : let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
503 :
504 : tracing::info!(
505 : "Attaching {} tenants at startup, warming up {} at a time",
506 : tenant_configs.len(),
507 : conf.concurrent_tenant_warmup.initial_permits()
508 : );
509 : TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
510 :
511 : // Accumulate futures for writing tenant configs, so that we can execute in parallel
512 : let mut config_write_futs = Vec::new();
513 :
514 : // Update the location configs according to the re-attach response and persist them to disk
515 : tracing::info!("Updating {} location configs", tenant_configs.len());
516 : for (tenant_shard_id, location_conf) in tenant_configs {
517 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
518 :
519 : let mut location_conf = match location_conf {
520 : Ok(l) => l,
521 : Err(e) => {
522 : warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
523 :
524 : tenants.insert(
525 : tenant_shard_id,
526 : TenantSlot::Attached(Tenant::create_broken_tenant(
527 : conf,
528 : tenant_shard_id,
529 : resources.remote_storage.clone(),
530 : format!("{}", e),
531 : )),
532 : );
533 : continue;
534 : }
535 : };
536 :
537 : // FIXME: if we were attached, and get demoted to secondary on re-attach, we
538 : // don't have a place to get a config.
539 : // (https://github.com/neondatabase/neon/issues/5377)
540 : const DEFAULT_SECONDARY_CONF: SecondaryLocationConfig =
541 : SecondaryLocationConfig { warm: true };
542 :
543 : if let Some(tenant_modes) = &tenant_modes {
544 : // We have a generation map: treat it as the authority for whether
545 : // this tenant is really attached.
546 : match tenant_modes.get(&tenant_shard_id) {
547 : None => {
548 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
549 :
550 : match safe_rename_tenant_dir(&tenant_dir_path).await {
551 : Ok(tmp_path) => {
552 : spawn_background_purge(tmp_path);
553 : }
554 : Err(e) => {
555 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
556 : "Failed to move detached tenant directory '{tenant_dir_path}': {e:?}");
557 : }
558 : };
559 :
560 : // We deleted local content: move on to next tenant, don't try and spawn this one.
561 : continue;
562 : }
563 : Some(TenantStartupMode::Secondary) => {
564 : if !matches!(location_conf.mode, LocationMode::Secondary(_)) {
565 : location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
566 : }
567 : }
568 : Some(TenantStartupMode::Attached((attach_mode, generation))) => {
569 : let old_gen_higher = match &location_conf.mode {
570 : LocationMode::Attached(AttachedLocationConfig {
571 : generation: old_generation,
572 : attach_mode: _attach_mode,
573 : }) => {
574 : if old_generation > generation {
575 : Some(old_generation)
576 : } else {
577 : None
578 : }
579 : }
580 : _ => None,
581 : };
582 : if let Some(old_generation) = old_gen_higher {
583 : tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
584 : "Control plane gave decreasing generation ({generation:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
585 : old_generation
586 : );
587 :
588 : // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
589 : // local disk content: demote to secondary rather than detaching.
590 : location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
591 : } else {
592 : location_conf.attach_in_generation(*attach_mode, *generation);
593 : }
594 : }
595 : }
596 : } else {
597 : // Legacy mode: no generation information, any tenant present
598 : // on local disk may activate
599 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
600 : };
601 :
602 : // Presence of a generation number implies attachment: attach the tenant
603 : // if it wasn't already, and apply the generation number.
604 0 : config_write_futs.push(async move {
605 0 : let r = Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
606 0 : (tenant_shard_id, location_conf, r)
607 0 : });
608 : }
609 :
610 : // Execute config writes with concurrency, to avoid bottlenecking on local FS write latency
611 : tracing::info!(
612 : "Writing {} location config files...",
613 : config_write_futs.len()
614 : );
615 : let config_write_results = futures::stream::iter(config_write_futs)
616 : .buffer_unordered(16)
617 : .collect::<Vec<_>>()
618 : .await;
619 :
620 : tracing::info!(
621 : "Spawning {} tenant shard locations...",
622 : config_write_results.len()
623 : );
624 : // For those shards that have live configurations, construct `Tenant` or `SecondaryTenant` objects and start them running
625 : for (tenant_shard_id, location_conf, config_write_result) in config_write_results {
626 : // Errors writing configs are fatal
627 : config_write_result?;
628 :
629 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
630 : let shard_identity = location_conf.shard;
631 : let slot = match location_conf.mode {
632 : LocationMode::Attached(attached_conf) => {
633 : match tenant_spawn(
634 : conf,
635 : tenant_shard_id,
636 : &tenant_dir_path,
637 : resources.clone(),
638 : AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
639 : shard_identity,
640 : Some(init_order.clone()),
641 : &TENANTS,
642 : SpawnMode::Lazy,
643 : &ctx,
644 : ) {
645 : Ok(tenant) => TenantSlot::Attached(tenant),
646 : Err(e) => {
647 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
648 : continue;
649 : }
650 : }
651 : }
652 : LocationMode::Secondary(secondary_conf) => {
653 : info!(
654 : tenant_id = %tenant_shard_id.tenant_id,
655 : shard_id = %tenant_shard_id.shard_slug(),
656 : "Starting secondary tenant"
657 : );
658 : TenantSlot::Secondary(SecondaryTenant::new(
659 : tenant_shard_id,
660 : shard_identity,
661 : location_conf.tenant_conf,
662 : &secondary_conf,
663 : ))
664 : }
665 : };
666 :
667 : METRICS.slot_inserted(&slot);
668 : tenants.insert(tenant_shard_id, slot);
669 : }
670 :
671 : info!("Processed {} local tenants at startup", tenants.len());
672 :
673 : let mut tenants_map = TENANTS.write().unwrap();
674 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
675 :
676 : *tenants_map = TenantsMap::Open(tenants);
677 :
678 : Ok(TenantManager {
679 : conf,
680 : tenants: &TENANTS,
681 : resources,
682 : cancel: CancellationToken::new(),
683 : })
684 : }
685 :
686 : /// Wrapper for Tenant::spawn that checks invariants before running, and inserts
687 : /// a broken tenant in the map if Tenant::spawn fails.
688 : #[allow(clippy::too_many_arguments)]
689 0 : fn tenant_spawn(
690 0 : conf: &'static PageServerConf,
691 0 : tenant_shard_id: TenantShardId,
692 0 : tenant_path: &Utf8Path,
693 0 : resources: TenantSharedResources,
694 0 : location_conf: AttachedTenantConf,
695 0 : shard_identity: ShardIdentity,
696 0 : init_order: Option<InitializationOrder>,
697 0 : tenants: &'static std::sync::RwLock<TenantsMap>,
698 0 : mode: SpawnMode,
699 0 : ctx: &RequestContext,
700 0 : ) -> anyhow::Result<Arc<Tenant>> {
701 0 : anyhow::ensure!(
702 0 : tenant_path.is_dir(),
703 0 : "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
704 : );
705 0 : anyhow::ensure!(
706 0 : !crate::is_temporary(tenant_path),
707 0 : "Cannot load tenant from temporary path {tenant_path:?}"
708 : );
709 0 : anyhow::ensure!(
710 0 : !tenant_path.is_empty_dir().with_context(|| {
711 0 : format!("Failed to check whether {tenant_path:?} is an empty dir")
712 0 : })?,
713 0 : "Cannot load tenant from empty directory {tenant_path:?}"
714 : );
715 :
716 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
717 0 : anyhow::ensure!(
718 0 : !conf.tenant_ignore_mark_file_path(&tenant_shard_id).exists(),
719 0 : "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
720 : );
721 :
722 0 : let remote_storage = resources.remote_storage.clone();
723 0 : let tenant = match Tenant::spawn(
724 0 : conf,
725 0 : tenant_shard_id,
726 0 : resources,
727 0 : location_conf,
728 0 : shard_identity,
729 0 : init_order,
730 0 : tenants,
731 0 : mode,
732 0 : ctx,
733 0 : ) {
734 0 : Ok(tenant) => tenant,
735 0 : Err(e) => {
736 0 : error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
737 0 : Tenant::create_broken_tenant(conf, tenant_shard_id, remote_storage, format!("{e:#}"))
738 : }
739 : };
740 :
741 0 : Ok(tenant)
742 0 : }
743 :
744 2 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
745 2 : let mut join_set = JoinSet::new();
746 0 :
747 0 : #[cfg(all(debug_assertions, not(test)))]
748 0 : {
749 0 : // Check that our metrics properly tracked the size of the tenants map. This is a convenient location to check,
750 0 : // as it happens implicitly at the end of tests etc.
751 0 : let m = tenants.read().unwrap();
752 0 : debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
753 : }
754 :
755 : // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
756 2 : let (total_in_progress, total_attached) = {
757 2 : let mut m = tenants.write().unwrap();
758 2 : match &mut *m {
759 : TenantsMap::Initializing => {
760 0 : *m = TenantsMap::ShuttingDown(BTreeMap::default());
761 0 : info!("tenants map is empty");
762 0 : return;
763 : }
764 2 : TenantsMap::Open(tenants) => {
765 2 : let mut shutdown_state = BTreeMap::new();
766 2 : let mut total_in_progress = 0;
767 2 : let mut total_attached = 0;
768 :
769 2 : for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
770 2 : match v {
771 0 : TenantSlot::Attached(t) => {
772 0 : shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
773 0 : join_set.spawn(
774 0 : async move {
775 0 : let res = {
776 0 : let (_guard, shutdown_progress) = completion::channel();
777 0 : t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
778 : };
779 :
780 0 : if let Err(other_progress) = res {
781 : // join the another shutdown in progress
782 0 : other_progress.wait().await;
783 0 : }
784 :
785 : // we cannot afford per tenant logging here, because if s3 is degraded, we are
786 : // going to log too many lines
787 0 : debug!("tenant successfully stopped");
788 0 : }
789 0 : .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
790 : );
791 :
792 0 : total_attached += 1;
793 : }
794 0 : TenantSlot::Secondary(state) => {
795 0 : // We don't need to wait for this individually per-tenant: the
796 0 : // downloader task will be waited on eventually, this cancel
797 0 : // is just to encourage it to drop out if it is doing work
798 0 : // for this tenant right now.
799 0 : state.cancel.cancel();
800 0 :
801 0 : shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
802 0 : }
803 2 : TenantSlot::InProgress(notify) => {
804 2 : // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
805 2 : // wait for their notifications to fire in this function.
806 2 : join_set.spawn(async move {
807 2 : notify.wait().await;
808 2 : });
809 2 :
810 2 : total_in_progress += 1;
811 2 : }
812 : }
813 : }
814 2 : *m = TenantsMap::ShuttingDown(shutdown_state);
815 2 : (total_in_progress, total_attached)
816 : }
817 : TenantsMap::ShuttingDown(_) => {
818 0 : error!("already shutting down, this function isn't supposed to be called more than once");
819 0 : return;
820 : }
821 : }
822 : };
823 :
824 2 : let started_at = std::time::Instant::now();
825 2 :
826 2 : info!(
827 0 : "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
828 : total_in_progress, total_attached
829 : );
830 :
831 2 : let total = join_set.len();
832 2 : let mut panicked = 0;
833 2 : let mut buffering = true;
834 2 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
835 2 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
836 :
837 6 : while !join_set.is_empty() {
838 : tokio::select! {
839 : Some(joined) = join_set.join_next() => {
840 : match joined {
841 : Ok(()) => {},
842 : Err(join_error) if join_error.is_cancelled() => {
843 : unreachable!("we are not cancelling any of the tasks");
844 : }
845 : Err(join_error) if join_error.is_panic() => {
846 : // cannot really do anything, as this panic is likely a bug
847 : panicked += 1;
848 : }
849 : Err(join_error) => {
850 : warn!("unknown kind of JoinError: {join_error}");
851 : }
852 : }
853 : if !buffering {
854 : // buffer so that every 500ms since the first update (or starting) we'll log
855 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
856 : // are not able to log *then*.
857 : buffering = true;
858 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
859 : }
860 : },
861 : _ = &mut buffered, if buffering => {
862 : buffering = false;
863 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
864 : }
865 : }
866 : }
867 :
868 2 : if panicked > 0 {
869 0 : warn!(
870 : panicked,
871 0 : total, "observed panicks while shutting down tenants"
872 : );
873 2 : }
874 :
875 : // caller will log how long we took
876 2 : }
877 :
878 0 : #[derive(thiserror::Error, Debug)]
879 : pub(crate) enum UpsertLocationError {
880 : #[error("Bad config request: {0}")]
881 : BadRequest(anyhow::Error),
882 :
883 : #[error("Cannot change config in this state: {0}")]
884 : Unavailable(#[from] TenantMapError),
885 :
886 : #[error("Tenant is already being modified")]
887 : InProgress,
888 :
889 : #[error("Failed to flush: {0}")]
890 : Flush(anyhow::Error),
891 :
892 : #[error("Internal error: {0}")]
893 : Other(#[from] anyhow::Error),
894 : }
895 :
896 : impl TenantManager {
897 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
898 : /// having to pass it around everywhere as a separate object.
899 0 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
900 0 : self.conf
901 0 : }
902 :
903 : /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or currently
904 : /// undergoing a state change (i.e. slot is InProgress).
905 : ///
906 : /// The return Tenant is not guaranteed to be active: check its status after obtaing it, or
907 : /// use [`Tenant::wait_to_become_active`] before using it if you will do I/O on it.
908 0 : pub(crate) fn get_attached_tenant_shard(
909 0 : &self,
910 0 : tenant_shard_id: TenantShardId,
911 0 : ) -> Result<Arc<Tenant>, GetTenantError> {
912 0 : let locked = self.tenants.read().unwrap();
913 :
914 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
915 :
916 0 : match peek_slot {
917 0 : Some(TenantSlot::Attached(tenant)) => Ok(Arc::clone(tenant)),
918 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
919 : None | Some(TenantSlot::Secondary(_)) => {
920 0 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
921 : }
922 : }
923 0 : }
924 :
925 0 : pub(crate) fn get_secondary_tenant_shard(
926 0 : &self,
927 0 : tenant_shard_id: TenantShardId,
928 0 : ) -> Option<Arc<SecondaryTenant>> {
929 0 : let locked = self.tenants.read().unwrap();
930 0 :
931 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
932 0 : .ok()
933 0 : .flatten();
934 :
935 0 : match peek_slot {
936 0 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
937 0 : _ => None,
938 : }
939 0 : }
940 :
941 : /// Whether the `TenantManager` is responsible for the tenant shard
942 0 : pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
943 0 : let locked = self.tenants.read().unwrap();
944 0 :
945 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
946 0 : .ok()
947 0 : .flatten();
948 0 :
949 0 : peek_slot.is_some()
950 0 : }
951 :
952 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
953 : pub(crate) async fn upsert_location(
954 : &self,
955 : tenant_shard_id: TenantShardId,
956 : new_location_config: LocationConf,
957 : flush: Option<Duration>,
958 : mut spawn_mode: SpawnMode,
959 : ctx: &RequestContext,
960 : ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
961 : debug_assert_current_span_has_tenant_id();
962 : info!("configuring tenant location to state {new_location_config:?}");
963 :
964 : enum FastPathModified {
965 : Attached(Arc<Tenant>),
966 : Secondary(Arc<SecondaryTenant>),
967 : }
968 :
969 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
970 : // then we do not need to set the slot to InProgress, we can just call into the
971 : // existng tenant.
972 : let fast_path_taken = {
973 : let locked = self.tenants.read().unwrap();
974 : let peek_slot =
975 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
976 : match (&new_location_config.mode, peek_slot) {
977 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
978 : match attach_conf.generation.cmp(&tenant.generation) {
979 : Ordering::Equal => {
980 : // A transition from Attached to Attached in the same generation, we may
981 : // take our fast path and just provide the updated configuration
982 : // to the tenant.
983 : tenant.set_new_location_config(
984 : AttachedTenantConf::try_from(new_location_config.clone())
985 : .map_err(UpsertLocationError::BadRequest)?,
986 : );
987 :
988 : Some(FastPathModified::Attached(tenant.clone()))
989 : }
990 : Ordering::Less => {
991 : return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
992 : "Generation {:?} is less than existing {:?}",
993 : attach_conf.generation,
994 : tenant.generation
995 : )));
996 : }
997 : Ordering::Greater => {
998 : // Generation advanced, fall through to general case of replacing `Tenant` object
999 : None
1000 : }
1001 : }
1002 : }
1003 : (
1004 : LocationMode::Secondary(secondary_conf),
1005 : Some(TenantSlot::Secondary(secondary_tenant)),
1006 : ) => {
1007 : secondary_tenant.set_config(secondary_conf);
1008 : secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
1009 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
1010 : }
1011 : _ => {
1012 : // Not an Attached->Attached transition, fall through to general case
1013 : None
1014 : }
1015 : }
1016 : };
1017 :
1018 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
1019 : // phase of writing config and/or waiting for flush, before returning.
1020 : match fast_path_taken {
1021 : Some(FastPathModified::Attached(tenant)) => {
1022 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1023 : .await?;
1024 :
1025 : // Transition to AttachedStale means we may well hold a valid generation
1026 : // still, and have been requested to go stale as part of a migration. If
1027 : // the caller set `flush`, then flush to remote storage.
1028 : if let LocationMode::Attached(AttachedLocationConfig {
1029 : generation: _,
1030 : attach_mode: AttachmentMode::Stale,
1031 : }) = &new_location_config.mode
1032 : {
1033 : if let Some(flush_timeout) = flush {
1034 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
1035 : Ok(Err(e)) => {
1036 : return Err(UpsertLocationError::Flush(e));
1037 : }
1038 : Ok(Ok(_)) => return Ok(Some(tenant)),
1039 : Err(_) => {
1040 : tracing::warn!(
1041 : timeout_ms = flush_timeout.as_millis(),
1042 : "Timed out waiting for flush to remote storage, proceeding anyway."
1043 : )
1044 : }
1045 : }
1046 : }
1047 : }
1048 :
1049 : return Ok(Some(tenant));
1050 : }
1051 : Some(FastPathModified::Secondary(_secondary_tenant)) => {
1052 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1053 : .await?;
1054 :
1055 : return Ok(None);
1056 : }
1057 : None => {
1058 : // Proceed with the general case procedure, where we will shutdown & remove any existing
1059 : // slot contents and replace with a fresh one
1060 : }
1061 : };
1062 :
1063 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
1064 : // InProgress value to the slot while we make whatever changes are required. The state for
1065 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
1066 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
1067 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
1068 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
1069 0 : .map_err(|e| match e {
1070 : TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => {
1071 0 : unreachable!("Called with mode Any")
1072 : }
1073 0 : TenantSlotError::InProgress => UpsertLocationError::InProgress,
1074 0 : TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
1075 0 : })?;
1076 :
1077 : match slot_guard.get_old_value() {
1078 : Some(TenantSlot::Attached(tenant)) => {
1079 : // The case where we keep a Tenant alive was covered above in the special case
1080 : // for Attached->Attached transitions in the same generation. By this point,
1081 : // if we see an attached tenant we know it will be discarded and should be
1082 : // shut down.
1083 : let (_guard, progress) = utils::completion::channel();
1084 :
1085 : match tenant.get_attach_mode() {
1086 : AttachmentMode::Single | AttachmentMode::Multi => {
1087 : // Before we leave our state as the presumed holder of the latest generation,
1088 : // flush any outstanding deletions to reduce the risk of leaking objects.
1089 : self.resources.deletion_queue_client.flush_advisory()
1090 : }
1091 : AttachmentMode::Stale => {
1092 : // If we're stale there's not point trying to flush deletions
1093 : }
1094 : };
1095 :
1096 : info!("Shutting down attached tenant");
1097 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1098 : Ok(()) => {}
1099 : Err(barrier) => {
1100 : info!("Shutdown already in progress, waiting for it to complete");
1101 : barrier.wait().await;
1102 : }
1103 : }
1104 : slot_guard.drop_old_value().expect("We just shut it down");
1105 :
1106 : // Edge case: if we were called with SpawnMode::Create, but a Tenant already existed, then
1107 : // the caller thinks they're creating but the tenant already existed. We must switch to
1108 : // Eager mode so that when starting this Tenant we properly probe remote storage for timelines,
1109 : // rather than assuming it to be empty.
1110 : spawn_mode = SpawnMode::Eager;
1111 : }
1112 : Some(TenantSlot::Secondary(state)) => {
1113 : info!("Shutting down secondary tenant");
1114 : state.shutdown().await;
1115 : }
1116 : Some(TenantSlot::InProgress(_)) => {
1117 : // This should never happen: acquire_slot should error out
1118 : // if the contents of a slot were InProgress.
1119 : return Err(UpsertLocationError::Other(anyhow::anyhow!(
1120 : "Acquired an InProgress slot, this is a bug."
1121 : )));
1122 : }
1123 : None => {
1124 : // Slot was vacant, nothing needs shutting down.
1125 : }
1126 : }
1127 :
1128 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1129 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1130 :
1131 : // Directory structure is the same for attached and secondary modes:
1132 : // create it if it doesn't exist. Timeline load/creation expects the
1133 : // timelines/ subdir to already exist.
1134 : //
1135 : // Does not need to be fsync'd because local storage is just a cache.
1136 : tokio::fs::create_dir_all(&timelines_path)
1137 : .await
1138 0 : .with_context(|| format!("Creating {timelines_path}"))?;
1139 :
1140 : // Before activating either secondary or attached mode, persist the
1141 : // configuration, so that on restart we will re-attach (or re-start
1142 : // secondary) on the tenant.
1143 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
1144 :
1145 : let new_slot = match &new_location_config.mode {
1146 : LocationMode::Secondary(secondary_config) => {
1147 : let shard_identity = new_location_config.shard;
1148 : TenantSlot::Secondary(SecondaryTenant::new(
1149 : tenant_shard_id,
1150 : shard_identity,
1151 : new_location_config.tenant_conf,
1152 : secondary_config,
1153 : ))
1154 : }
1155 : LocationMode::Attached(_attach_config) => {
1156 : let shard_identity = new_location_config.shard;
1157 :
1158 : // Testing hack: if we are configured with no control plane, then drop the generation
1159 : // from upserts. This enables creating generation-less tenants even though neon_local
1160 : // always uses generations when calling the location conf API.
1161 : let attached_conf = if cfg!(feature = "testing") {
1162 : let mut conf = AttachedTenantConf::try_from(new_location_config)?;
1163 : if self.conf.control_plane_api.is_none() {
1164 : conf.location.generation = Generation::none();
1165 : }
1166 : conf
1167 : } else {
1168 : AttachedTenantConf::try_from(new_location_config)?
1169 : };
1170 :
1171 : let tenant = tenant_spawn(
1172 : self.conf,
1173 : tenant_shard_id,
1174 : &tenant_path,
1175 : self.resources.clone(),
1176 : attached_conf,
1177 : shard_identity,
1178 : None,
1179 : self.tenants,
1180 : spawn_mode,
1181 : ctx,
1182 : )?;
1183 :
1184 : TenantSlot::Attached(tenant)
1185 : }
1186 : };
1187 :
1188 : let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
1189 : Some(tenant.clone())
1190 : } else {
1191 : None
1192 : };
1193 :
1194 : match slot_guard.upsert(new_slot) {
1195 : Err(TenantSlotUpsertError::InternalError(e)) => {
1196 : Err(UpsertLocationError::Other(anyhow::anyhow!(e)))
1197 : }
1198 : Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
1199 : Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
1200 : // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
1201 : // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
1202 : // are shutdown.
1203 : //
1204 : // We must shut it down inline here.
1205 : match new_slot {
1206 : TenantSlot::InProgress(_) => {
1207 : // Unreachable because we never insert an InProgress
1208 : unreachable!()
1209 : }
1210 : TenantSlot::Attached(tenant) => {
1211 : let (_guard, progress) = utils::completion::channel();
1212 : info!("Shutting down just-spawned tenant, because tenant manager is shut down");
1213 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1214 : Ok(()) => {
1215 : info!("Finished shutting down just-spawned tenant");
1216 : }
1217 : Err(barrier) => {
1218 : info!("Shutdown already in progress, waiting for it to complete");
1219 : barrier.wait().await;
1220 : }
1221 : }
1222 : }
1223 : TenantSlot::Secondary(secondary_tenant) => {
1224 : secondary_tenant.shutdown().await;
1225 : }
1226 : }
1227 :
1228 : Err(UpsertLocationError::Unavailable(
1229 : TenantMapError::ShuttingDown,
1230 : ))
1231 : }
1232 : Ok(()) => Ok(attached_tenant),
1233 : }
1234 : }
1235 :
1236 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1237 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1238 : /// dropped before re-attaching.
1239 : ///
1240 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1241 : /// where an issue is identified that would go away with a restart of the tenant.
1242 : ///
1243 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1244 : /// to respect the cancellation tokens used in normal shutdown().
1245 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1246 : pub(crate) async fn reset_tenant(
1247 : &self,
1248 : tenant_shard_id: TenantShardId,
1249 : drop_cache: bool,
1250 : ctx: &RequestContext,
1251 : ) -> anyhow::Result<()> {
1252 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1253 : let Some(old_slot) = slot_guard.get_old_value() else {
1254 : anyhow::bail!("Tenant not found when trying to reset");
1255 : };
1256 :
1257 : let Some(tenant) = old_slot.get_attached() else {
1258 : slot_guard.revert();
1259 : anyhow::bail!("Tenant is not in attached state");
1260 : };
1261 :
1262 : let (_guard, progress) = utils::completion::channel();
1263 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1264 : Ok(()) => {
1265 : slot_guard.drop_old_value()?;
1266 : }
1267 : Err(_barrier) => {
1268 : slot_guard.revert();
1269 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1270 : }
1271 : }
1272 :
1273 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1274 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1275 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1276 :
1277 : if drop_cache {
1278 : tracing::info!("Dropping local file cache");
1279 :
1280 : match tokio::fs::read_dir(&timelines_path).await {
1281 : Err(e) => {
1282 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1283 : }
1284 : Ok(mut entries) => {
1285 : while let Some(entry) = entries.next_entry().await? {
1286 : tokio::fs::remove_dir_all(entry.path()).await?;
1287 : }
1288 : }
1289 : }
1290 : }
1291 :
1292 : let shard_identity = config.shard;
1293 : let tenant = tenant_spawn(
1294 : self.conf,
1295 : tenant_shard_id,
1296 : &tenant_path,
1297 : self.resources.clone(),
1298 : AttachedTenantConf::try_from(config)?,
1299 : shard_identity,
1300 : None,
1301 : self.tenants,
1302 : SpawnMode::Eager,
1303 : ctx,
1304 : )?;
1305 :
1306 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1307 :
1308 : Ok(())
1309 : }
1310 :
1311 0 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
1312 0 : let locked = self.tenants.read().unwrap();
1313 0 : match &*locked {
1314 0 : TenantsMap::Initializing => Vec::new(),
1315 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1316 0 : .values()
1317 0 : .filter_map(|slot| {
1318 0 : slot.get_attached()
1319 0 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1320 0 : })
1321 0 : .collect(),
1322 : }
1323 0 : }
1324 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1325 : // callback should be small and fast, as it will be called inside the global
1326 : // TenantsMap lock.
1327 0 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1328 0 : where
1329 0 : // TODO: let the callback return a hint to drop out of the loop early
1330 0 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1331 0 : {
1332 0 : let locked = self.tenants.read().unwrap();
1333 :
1334 0 : let map = match &*locked {
1335 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1336 0 : TenantsMap::Open(m) => m,
1337 : };
1338 :
1339 0 : for (tenant_id, slot) in map {
1340 0 : if let TenantSlot::Secondary(state) = slot {
1341 : // Only expose secondary tenants that are not currently shutting down
1342 0 : if !state.cancel.is_cancelled() {
1343 0 : func(tenant_id, state)
1344 0 : }
1345 0 : }
1346 : }
1347 0 : }
1348 :
1349 : /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
1350 0 : pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
1351 0 : let locked = self.tenants.read().unwrap();
1352 0 : match &*locked {
1353 0 : TenantsMap::Initializing => Vec::new(),
1354 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1355 0 : map.iter().map(|(k, v)| (*k, v.clone())).collect()
1356 : }
1357 : }
1358 0 : }
1359 :
1360 0 : pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option<TenantSlot> {
1361 0 : let locked = self.tenants.read().unwrap();
1362 0 : match &*locked {
1363 0 : TenantsMap::Initializing => None,
1364 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1365 0 : map.get(&tenant_shard_id).cloned()
1366 : }
1367 : }
1368 0 : }
1369 :
1370 0 : pub(crate) async fn delete_tenant(
1371 0 : &self,
1372 0 : tenant_shard_id: TenantShardId,
1373 0 : activation_timeout: Duration,
1374 0 : ) -> Result<StatusCode, DeleteTenantError> {
1375 0 : super::span::debug_assert_current_span_has_tenant_id();
1376 : // We acquire a SlotGuard during this function to protect against concurrent
1377 : // changes while the ::prepare phase of DeleteTenantFlow executes, but then
1378 : // have to return the Tenant to the map while the background deletion runs.
1379 : //
1380 : // TODO: refactor deletion to happen outside the lifetime of a Tenant.
1381 : // Currently, deletion requires a reference to the tenants map in order to
1382 : // keep the Tenant in the map until deletion is complete, and then remove
1383 : // it at the end.
1384 : //
1385 : // See https://github.com/neondatabase/neon/issues/5080
1386 :
1387 : // Tenant deletion can happen two ways:
1388 : // - Legacy: called on an attached location. The attached Tenant object stays alive in Stopping
1389 : // state until deletion is complete.
1390 : // - New: called on a pageserver without an attached location. We proceed with deletion from
1391 : // remote storage.
1392 : //
1393 : // See https://github.com/neondatabase/neon/issues/5080 for more context on this transition.
1394 :
1395 0 : let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1396 0 : match &slot_guard.old_value {
1397 0 : Some(TenantSlot::Attached(tenant)) => {
1398 0 : // Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
1399 0 : // deletion will be resumed across restarts.
1400 0 : let tenant = tenant.clone();
1401 0 : return self
1402 0 : .delete_tenant_attached(slot_guard, tenant, activation_timeout)
1403 0 : .await;
1404 : }
1405 0 : Some(TenantSlot::Secondary(secondary_tenant)) => {
1406 0 : secondary_tenant.shutdown().await;
1407 0 : let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
1408 0 : let tmp_dir = safe_rename_tenant_dir(&local_tenant_directory)
1409 0 : .await
1410 0 : .with_context(|| {
1411 0 : format!("local tenant directory {local_tenant_directory:?} rename")
1412 0 : })?;
1413 0 : spawn_background_purge(tmp_dir);
1414 : }
1415 0 : Some(TenantSlot::InProgress(_)) => unreachable!(),
1416 0 : None => {}
1417 : };
1418 :
1419 : // Fall through: local state for this tenant is no longer present, proceed with remote delete
1420 0 : let remote_path = remote_tenant_path(&tenant_shard_id);
1421 0 : let keys = match self
1422 0 : .resources
1423 0 : .remote_storage
1424 0 : .list(
1425 0 : Some(&remote_path),
1426 0 : remote_storage::ListingMode::NoDelimiter,
1427 0 : None,
1428 0 : &self.cancel,
1429 0 : )
1430 0 : .await
1431 : {
1432 0 : Ok(listing) => listing.keys,
1433 : Err(remote_storage::DownloadError::Cancelled) => {
1434 0 : return Err(DeleteTenantError::Cancelled)
1435 : }
1436 0 : Err(remote_storage::DownloadError::NotFound) => return Ok(StatusCode::NOT_FOUND),
1437 0 : Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
1438 : };
1439 :
1440 0 : if keys.is_empty() {
1441 0 : tracing::info!("Remote storage already deleted");
1442 : } else {
1443 0 : tracing::info!("Deleting {} keys from remote storage", keys.len());
1444 0 : self.resources
1445 0 : .remote_storage
1446 0 : .delete_objects(&keys, &self.cancel)
1447 0 : .await?;
1448 : }
1449 :
1450 : // Callers use 404 as success for deletions, for historical reasons.
1451 0 : Ok(StatusCode::NOT_FOUND)
1452 0 : }
1453 :
1454 0 : async fn delete_tenant_attached(
1455 0 : &self,
1456 0 : slot_guard: SlotGuard,
1457 0 : tenant: Arc<Tenant>,
1458 0 : activation_timeout: Duration,
1459 0 : ) -> Result<StatusCode, DeleteTenantError> {
1460 0 : match tenant.current_state() {
1461 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1462 : // If deletion is already in progress, return success (the semantics of this
1463 : // function are to rerturn success afterr deletion is spawned in background).
1464 : // Otherwise fall through and let [`DeleteTenantFlow`] handle this state.
1465 0 : if DeleteTenantFlow::is_in_progress(&tenant) {
1466 : // The `delete_progress` lock is held: deletion is already happening
1467 : // in the bacckground
1468 0 : slot_guard.revert();
1469 0 : return Ok(StatusCode::ACCEPTED);
1470 0 : }
1471 : }
1472 : _ => {
1473 0 : tenant
1474 0 : .wait_to_become_active(activation_timeout)
1475 0 : .await
1476 0 : .map_err(|e| match e {
1477 : GetActiveTenantError::WillNotBecomeActive(_)
1478 : | GetActiveTenantError::Broken(_) => {
1479 0 : DeleteTenantError::InvalidState(tenant.current_state())
1480 : }
1481 0 : GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
1482 0 : GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
1483 : GetActiveTenantError::WaitForActiveTimeout {
1484 0 : latest_state: _latest_state,
1485 0 : wait_time: _wait_time,
1486 0 : } => DeleteTenantError::InvalidState(tenant.current_state()),
1487 0 : })?;
1488 : }
1489 : }
1490 :
1491 0 : let result = DeleteTenantFlow::run(
1492 0 : self.conf,
1493 0 : self.resources.remote_storage.clone(),
1494 0 : &TENANTS,
1495 0 : tenant,
1496 0 : &self.cancel,
1497 0 : )
1498 0 : .await;
1499 :
1500 : // The Tenant goes back into the map in Stopping state, it will eventually be removed by DeleteTenantFLow
1501 0 : slot_guard.revert();
1502 0 : let () = result?;
1503 0 : Ok(StatusCode::ACCEPTED)
1504 0 : }
1505 :
1506 0 : #[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
1507 : pub(crate) async fn shard_split(
1508 : &self,
1509 : tenant: Arc<Tenant>,
1510 : new_shard_count: ShardCount,
1511 : new_stripe_size: Option<ShardStripeSize>,
1512 : ctx: &RequestContext,
1513 : ) -> anyhow::Result<Vec<TenantShardId>> {
1514 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1515 : let r = self
1516 : .do_shard_split(tenant, new_shard_count, new_stripe_size, ctx)
1517 : .await;
1518 : if r.is_err() {
1519 : // Shard splitting might have left the original shard in a partially shut down state (it
1520 : // stops the shard's remote timeline client). Reset it to ensure we leave things in
1521 : // a working state.
1522 : if self.get(tenant_shard_id).is_some() {
1523 : tracing::warn!("Resetting after shard split failure");
1524 : if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
1525 : // Log this error because our return value will still be the original error, not this one. This is
1526 : // a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
1527 : // (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
1528 : // setting it broken probably won't help either.
1529 : tracing::error!("Failed to reset: {e}");
1530 : }
1531 : }
1532 : }
1533 :
1534 : r
1535 : }
1536 :
1537 0 : pub(crate) async fn do_shard_split(
1538 0 : &self,
1539 0 : tenant: Arc<Tenant>,
1540 0 : new_shard_count: ShardCount,
1541 0 : new_stripe_size: Option<ShardStripeSize>,
1542 0 : ctx: &RequestContext,
1543 0 : ) -> anyhow::Result<Vec<TenantShardId>> {
1544 0 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1545 0 :
1546 0 : // Validate the incoming request
1547 0 : if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
1548 0 : anyhow::bail!("Requested shard count is not an increase");
1549 0 : }
1550 0 : let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
1551 0 : if !expansion_factor.is_power_of_two() {
1552 0 : anyhow::bail!("Requested split is not a power of two");
1553 0 : }
1554 :
1555 0 : if let Some(new_stripe_size) = new_stripe_size {
1556 0 : if tenant.get_shard_stripe_size() != new_stripe_size
1557 0 : && tenant_shard_id.shard_count.count() > 1
1558 : {
1559 : // This tenant already has multiple shards, it is illegal to try and change its stripe size
1560 0 : anyhow::bail!(
1561 0 : "Shard stripe size may not be modified once tenant has multiple shards"
1562 0 : );
1563 0 : }
1564 0 : }
1565 :
1566 : // Plan: identify what the new child shards will be
1567 0 : let child_shards = tenant_shard_id.split(new_shard_count);
1568 0 : tracing::info!(
1569 0 : "Shard {} splits into: {}",
1570 0 : tenant_shard_id.to_index(),
1571 0 : child_shards
1572 0 : .iter()
1573 0 : .map(|id| format!("{}", id.to_index()))
1574 0 : .join(",")
1575 : );
1576 :
1577 0 : fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
1578 0 : "failpoint"
1579 0 : )));
1580 :
1581 0 : let parent_shard_identity = tenant.shard_identity;
1582 0 : let parent_tenant_conf = tenant.get_tenant_conf();
1583 0 : let parent_generation = tenant.generation;
1584 :
1585 : // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
1586 0 : if let Err(e) = tenant.split_prepare(&child_shards).await {
1587 : // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
1588 : // have been left in a partially-shut-down state.
1589 0 : tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
1590 0 : return Err(e);
1591 0 : }
1592 0 :
1593 0 : fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
1594 0 : "failpoint"
1595 0 : )));
1596 :
1597 0 : self.resources.deletion_queue_client.flush_advisory();
1598 0 :
1599 0 : // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
1600 0 : drop(tenant);
1601 0 : let mut parent_slot_guard =
1602 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1603 0 : let parent = match parent_slot_guard.get_old_value() {
1604 0 : Some(TenantSlot::Attached(t)) => t,
1605 0 : Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
1606 : Some(TenantSlot::InProgress(_)) => {
1607 : // tenant_map_acquire_slot never returns InProgress, if a slot was InProgress
1608 : // it would return an error.
1609 0 : unreachable!()
1610 : }
1611 : None => {
1612 : // We don't actually need the parent shard to still be attached to do our work, but it's
1613 : // a weird enough situation that the caller probably didn't want us to continue working
1614 : // if they had detached the tenant they requested the split on.
1615 0 : anyhow::bail!("Detached parent shard in the middle of split!")
1616 : }
1617 : };
1618 0 : fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
1619 0 : "failpoint"
1620 0 : )));
1621 : // Optimization: hardlink layers from the parent into the children, so that they don't have to
1622 : // re-download & duplicate the data referenced in their initial IndexPart
1623 0 : self.shard_split_hardlink(parent, child_shards.clone())
1624 0 : .await?;
1625 0 : fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
1626 0 : "failpoint"
1627 0 : )));
1628 :
1629 : // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
1630 : // child shards to reach this point.
1631 0 : let mut target_lsns = HashMap::new();
1632 0 : for timeline in parent.timelines.lock().unwrap().clone().values() {
1633 0 : target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
1634 0 : }
1635 :
1636 : // TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
1637 : // and could slow down the children trying to catch up.
1638 :
1639 : // Phase 3: Spawn the child shards
1640 0 : for child_shard in &child_shards {
1641 0 : let mut child_shard_identity = parent_shard_identity;
1642 0 : if let Some(new_stripe_size) = new_stripe_size {
1643 0 : child_shard_identity.stripe_size = new_stripe_size;
1644 0 : }
1645 0 : child_shard_identity.count = child_shard.shard_count;
1646 0 : child_shard_identity.number = child_shard.shard_number;
1647 0 :
1648 0 : let child_location_conf = LocationConf {
1649 0 : mode: LocationMode::Attached(AttachedLocationConfig {
1650 0 : generation: parent_generation,
1651 0 : attach_mode: AttachmentMode::Single,
1652 0 : }),
1653 0 : shard: child_shard_identity,
1654 0 : tenant_conf: parent_tenant_conf.clone(),
1655 0 : };
1656 0 :
1657 0 : self.upsert_location(
1658 0 : *child_shard,
1659 0 : child_location_conf,
1660 0 : None,
1661 0 : SpawnMode::Eager,
1662 0 : ctx,
1663 0 : )
1664 0 : .await?;
1665 : }
1666 :
1667 0 : fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
1668 0 : "failpoint"
1669 0 : )));
1670 :
1671 : // Phase 4: wait for child chards WAL ingest to catch up to target LSN
1672 0 : for child_shard_id in &child_shards {
1673 0 : let child_shard_id = *child_shard_id;
1674 0 : let child_shard = {
1675 0 : let locked = TENANTS.read().unwrap();
1676 0 : let peek_slot =
1677 0 : tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
1678 0 : peek_slot.and_then(|s| s.get_attached()).cloned()
1679 : };
1680 0 : if let Some(t) = child_shard {
1681 : // Wait for the child shard to become active: this should be very quick because it only
1682 : // has to download the index_part that we just uploaded when creating it.
1683 0 : if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
1684 : // This is not fatal: we have durably created the child shard. It just makes the
1685 : // split operation less seamless for clients, as we will may detach the parent
1686 : // shard before the child shards are fully ready to serve requests.
1687 0 : tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
1688 0 : continue;
1689 0 : }
1690 0 :
1691 0 : let timelines = t.timelines.lock().unwrap().clone();
1692 0 : for timeline in timelines.values() {
1693 0 : let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
1694 0 : continue;
1695 : };
1696 :
1697 0 : tracing::info!(
1698 0 : "Waiting for child shard {}/{} to reach target lsn {}...",
1699 0 : child_shard_id,
1700 0 : timeline.timeline_id,
1701 : target_lsn
1702 : );
1703 :
1704 0 : fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
1705 0 : "failpoint"
1706 0 : )));
1707 0 : if let Err(e) = timeline
1708 0 : .wait_lsn(
1709 0 : *target_lsn,
1710 0 : crate::tenant::timeline::WaitLsnWaiter::Tenant,
1711 0 : ctx,
1712 0 : )
1713 0 : .await
1714 : {
1715 : // Failure here might mean shutdown, in any case this part is an optimization
1716 : // and we shouldn't hold up the split operation.
1717 0 : tracing::warn!(
1718 0 : "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
1719 0 : timeline.timeline_id
1720 : );
1721 : } else {
1722 0 : tracing::info!(
1723 0 : "Child shard {}/{} reached target lsn {}",
1724 0 : child_shard_id,
1725 0 : timeline.timeline_id,
1726 : target_lsn
1727 : );
1728 : }
1729 : }
1730 0 : }
1731 : }
1732 :
1733 : // Phase 5: Shut down the parent shard, and erase it from disk
1734 0 : let (_guard, progress) = completion::channel();
1735 0 : match parent.shutdown(progress, ShutdownMode::Hard).await {
1736 0 : Ok(()) => {}
1737 0 : Err(other) => {
1738 0 : other.wait().await;
1739 : }
1740 : }
1741 0 : let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
1742 0 : let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
1743 0 : .await
1744 0 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
1745 0 : spawn_background_purge(tmp_path);
1746 0 :
1747 0 : fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
1748 0 : "failpoint"
1749 0 : )));
1750 :
1751 0 : parent_slot_guard.drop_old_value()?;
1752 :
1753 : // Phase 6: Release the InProgress on the parent shard
1754 0 : drop(parent_slot_guard);
1755 0 :
1756 0 : Ok(child_shards)
1757 0 : }
1758 :
1759 : /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
1760 : /// to avoid the children downloading them again.
1761 : ///
1762 : /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
1763 0 : async fn shard_split_hardlink(
1764 0 : &self,
1765 0 : parent_shard: &Tenant,
1766 0 : child_shards: Vec<TenantShardId>,
1767 0 : ) -> anyhow::Result<()> {
1768 0 : debug_assert_current_span_has_tenant_id();
1769 0 :
1770 0 : let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
1771 0 : let (parent_timelines, parent_layers) = {
1772 0 : let mut parent_layers = Vec::new();
1773 0 : let timelines = parent_shard.timelines.lock().unwrap().clone();
1774 0 : let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
1775 0 : for timeline in timelines.values() {
1776 0 : let timeline_layers = timeline
1777 0 : .layers
1778 0 : .read()
1779 0 : .await
1780 0 : .likely_resident_layers()
1781 0 : .collect::<Vec<_>>();
1782 :
1783 0 : for layer in timeline_layers {
1784 0 : let relative_path = layer
1785 0 : .local_path()
1786 0 : .strip_prefix(&parent_path)
1787 0 : .context("Removing prefix from parent layer path")?;
1788 0 : parent_layers.push(relative_path.to_owned());
1789 : }
1790 : }
1791 0 : debug_assert!(
1792 0 : !parent_layers.is_empty(),
1793 0 : "shutdown cannot empty the layermap"
1794 : );
1795 0 : (parent_timelines, parent_layers)
1796 0 : };
1797 0 :
1798 0 : let mut child_prefixes = Vec::new();
1799 0 : let mut create_dirs = Vec::new();
1800 :
1801 0 : for child in child_shards {
1802 0 : let child_prefix = self.conf.tenant_path(&child);
1803 0 : create_dirs.push(child_prefix.clone());
1804 0 : create_dirs.extend(
1805 0 : parent_timelines
1806 0 : .iter()
1807 0 : .map(|t| self.conf.timeline_path(&child, t)),
1808 0 : );
1809 0 :
1810 0 : child_prefixes.push(child_prefix);
1811 0 : }
1812 :
1813 : // Since we will do a large number of small filesystem metadata operations, batch them into
1814 : // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
1815 0 : let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
1816 0 : for dir in &create_dirs {
1817 0 : if let Err(e) = std::fs::create_dir_all(dir) {
1818 : // Ignore AlreadyExists errors, drop out on all other errors
1819 0 : match e.kind() {
1820 0 : std::io::ErrorKind::AlreadyExists => {}
1821 : _ => {
1822 0 : return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
1823 : }
1824 : }
1825 0 : }
1826 : }
1827 :
1828 0 : for child_prefix in child_prefixes {
1829 0 : for relative_layer in &parent_layers {
1830 0 : let parent_path = parent_path.join(relative_layer);
1831 0 : let child_path = child_prefix.join(relative_layer);
1832 0 : if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
1833 0 : match e.kind() {
1834 0 : std::io::ErrorKind::AlreadyExists => {}
1835 : std::io::ErrorKind::NotFound => {
1836 0 : tracing::info!(
1837 0 : "Layer {} not found during hard-linking, evicted during split?",
1838 : relative_layer
1839 : );
1840 : }
1841 : _ => {
1842 0 : return Err(anyhow::anyhow!(e).context(format!(
1843 0 : "Hard linking {relative_layer} into {child_prefix}"
1844 0 : )))
1845 : }
1846 : }
1847 0 : }
1848 : }
1849 : }
1850 :
1851 : // Durability is not required for correctness, but if we crashed during split and
1852 : // then came restarted with empty timeline dirs, it would be very inefficient to
1853 : // re-populate from remote storage.
1854 0 : for dir in create_dirs {
1855 0 : if let Err(e) = crashsafe::fsync(&dir) {
1856 : // Something removed a newly created timeline dir out from underneath us? Extremely
1857 : // unexpected, but not worth panic'ing over as this whole function is just an
1858 : // optimization.
1859 0 : tracing::warn!("Failed to fsync directory {dir}: {e}")
1860 0 : }
1861 : }
1862 :
1863 0 : Ok(parent_layers.len())
1864 0 : });
1865 0 :
1866 0 : match jh.await {
1867 0 : Ok(Ok(layer_count)) => {
1868 0 : tracing::info!(count = layer_count, "Hard linked layers into child shards");
1869 : }
1870 0 : Ok(Err(e)) => {
1871 0 : // This is an optimization, so we tolerate failure.
1872 0 : tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
1873 : }
1874 0 : Err(e) => {
1875 0 : // This is something totally unexpected like a panic, so bail out.
1876 0 : anyhow::bail!("Error joining hard linking task: {e}");
1877 : }
1878 : }
1879 :
1880 0 : Ok(())
1881 0 : }
1882 :
1883 : ///
1884 : /// Shut down all tenants. This runs as part of pageserver shutdown.
1885 : ///
1886 : /// NB: We leave the tenants in the map, so that they remain accessible through
1887 : /// the management API until we shut it down. If we removed the shut-down tenants
1888 : /// from the tenants map, the management API would return 404 for these tenants,
1889 : /// because TenantsMap::get() now returns `None`.
1890 : /// That could be easily misinterpreted by control plane, the consumer of the
1891 : /// management API. For example, it could attach the tenant on a different pageserver.
1892 : /// We would then be in split-brain once this pageserver restarts.
1893 0 : #[instrument(skip_all)]
1894 : pub(crate) async fn shutdown(&self) {
1895 : self.cancel.cancel();
1896 :
1897 : shutdown_all_tenants0(self.tenants).await
1898 : }
1899 :
1900 0 : pub(crate) async fn detach_tenant(
1901 0 : &self,
1902 0 : conf: &'static PageServerConf,
1903 0 : tenant_shard_id: TenantShardId,
1904 0 : detach_ignored: bool,
1905 0 : deletion_queue_client: &DeletionQueueClient,
1906 0 : ) -> Result<(), TenantStateError> {
1907 0 : let tmp_path = self
1908 0 : .detach_tenant0(
1909 0 : conf,
1910 0 : &TENANTS,
1911 0 : tenant_shard_id,
1912 0 : detach_ignored,
1913 0 : deletion_queue_client,
1914 0 : )
1915 0 : .await?;
1916 0 : spawn_background_purge(tmp_path);
1917 0 :
1918 0 : Ok(())
1919 0 : }
1920 :
1921 0 : async fn detach_tenant0(
1922 0 : &self,
1923 0 : conf: &'static PageServerConf,
1924 0 : tenants: &std::sync::RwLock<TenantsMap>,
1925 0 : tenant_shard_id: TenantShardId,
1926 0 : detach_ignored: bool,
1927 0 : deletion_queue_client: &DeletionQueueClient,
1928 0 : ) -> Result<Utf8PathBuf, TenantStateError> {
1929 0 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
1930 0 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
1931 0 : safe_rename_tenant_dir(&local_tenant_directory)
1932 0 : .await
1933 0 : .with_context(|| {
1934 0 : format!("local tenant directory {local_tenant_directory:?} rename")
1935 0 : })
1936 0 : };
1937 :
1938 0 : let removal_result = remove_tenant_from_memory(
1939 0 : tenants,
1940 0 : tenant_shard_id,
1941 0 : tenant_dir_rename_operation(tenant_shard_id),
1942 0 : )
1943 0 : .await;
1944 :
1945 : // Flush pending deletions, so that they have a good chance of passing validation
1946 : // before this tenant is potentially re-attached elsewhere.
1947 0 : deletion_queue_client.flush_advisory();
1948 0 :
1949 0 : // Ignored tenants are not present in memory and will bail the removal from memory operation.
1950 0 : // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
1951 0 : if detach_ignored
1952 0 : && matches!(
1953 0 : removal_result,
1954 : Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
1955 : )
1956 : {
1957 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1958 0 : if tenant_ignore_mark.exists() {
1959 0 : info!("Detaching an ignored tenant");
1960 0 : let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
1961 0 : .await
1962 0 : .with_context(|| {
1963 0 : format!("Ignored tenant {tenant_shard_id} local directory rename")
1964 0 : })?;
1965 0 : return Ok(tmp_path);
1966 0 : }
1967 0 : }
1968 :
1969 0 : removal_result
1970 0 : }
1971 :
1972 0 : pub(crate) fn list_tenants(
1973 0 : &self,
1974 0 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
1975 0 : let tenants = TENANTS.read().unwrap();
1976 0 : let m = match &*tenants {
1977 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
1978 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
1979 0 : };
1980 0 : Ok(m.iter()
1981 0 : .filter_map(|(id, tenant)| match tenant {
1982 0 : TenantSlot::Attached(tenant) => {
1983 0 : Some((*id, tenant.current_state(), tenant.generation()))
1984 : }
1985 0 : TenantSlot::Secondary(_) => None,
1986 0 : TenantSlot::InProgress(_) => None,
1987 0 : })
1988 0 : .collect())
1989 0 : }
1990 :
1991 : /// Completes an earlier prepared timeline detach ancestor.
1992 0 : pub(crate) async fn complete_detaching_timeline_ancestor(
1993 0 : &self,
1994 0 : tenant_shard_id: TenantShardId,
1995 0 : timeline_id: TimelineId,
1996 0 : prepared: PreparedTimelineDetach,
1997 0 : ctx: &RequestContext,
1998 0 : ) -> Result<Vec<TimelineId>, anyhow::Error> {
1999 : struct RevertOnDropSlot(Option<SlotGuard>);
2000 :
2001 : impl Drop for RevertOnDropSlot {
2002 0 : fn drop(&mut self) {
2003 0 : if let Some(taken) = self.0.take() {
2004 0 : taken.revert();
2005 0 : }
2006 0 : }
2007 : }
2008 :
2009 : impl RevertOnDropSlot {
2010 0 : fn into_inner(mut self) -> SlotGuard {
2011 0 : self.0.take().unwrap()
2012 0 : }
2013 : }
2014 :
2015 : impl std::ops::Deref for RevertOnDropSlot {
2016 : type Target = SlotGuard;
2017 :
2018 0 : fn deref(&self) -> &Self::Target {
2019 0 : self.0.as_ref().unwrap()
2020 0 : }
2021 : }
2022 :
2023 0 : let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
2024 0 : let slot_guard = RevertOnDropSlot(Some(slot_guard));
2025 :
2026 0 : let tenant = {
2027 0 : let Some(old_slot) = slot_guard.get_old_value() else {
2028 0 : anyhow::bail!(
2029 0 : "Tenant not found when trying to complete detaching timeline ancestor"
2030 0 : );
2031 : };
2032 :
2033 0 : let Some(tenant) = old_slot.get_attached() else {
2034 0 : anyhow::bail!("Tenant is not in attached state");
2035 : };
2036 :
2037 0 : if !tenant.is_active() {
2038 0 : anyhow::bail!("Tenant is not active");
2039 0 : }
2040 0 :
2041 0 : tenant.clone()
2042 : };
2043 :
2044 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
2045 :
2046 0 : let reparented = timeline
2047 0 : .complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
2048 0 : .await?;
2049 :
2050 0 : let mut slot_guard = slot_guard.into_inner();
2051 0 :
2052 0 : let (_guard, progress) = utils::completion::channel();
2053 0 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
2054 : Ok(()) => {
2055 0 : slot_guard.drop_old_value()?;
2056 : }
2057 0 : Err(_barrier) => {
2058 0 : slot_guard.revert();
2059 0 : // this really should not happen, at all, unless shutdown was already going?
2060 0 : anyhow::bail!("Cannot restart Tenant, already shutting down");
2061 : }
2062 : }
2063 :
2064 0 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
2065 0 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
2066 :
2067 0 : let shard_identity = config.shard;
2068 0 : let tenant = tenant_spawn(
2069 0 : self.conf,
2070 0 : tenant_shard_id,
2071 0 : &tenant_path,
2072 0 : self.resources.clone(),
2073 0 : AttachedTenantConf::try_from(config)?,
2074 0 : shard_identity,
2075 0 : None,
2076 0 : self.tenants,
2077 0 : SpawnMode::Eager,
2078 0 : ctx,
2079 0 : )?;
2080 :
2081 0 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
2082 :
2083 0 : Ok(reparented)
2084 0 : }
2085 :
2086 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
2087 : /// resolve this to a fully qualified TenantShardId.
2088 : ///
2089 : /// During shard splits: we shall see parent shards in InProgress state and skip them, and
2090 : /// instead match on child shards which should appear in Attached state. Very early in a shard
2091 : /// split, or in other cases where a shard is InProgress, we will return our own InProgress result
2092 : /// to instruct the caller to wait for that to finish before querying again.
2093 0 : pub(crate) fn resolve_attached_shard(
2094 0 : &self,
2095 0 : tenant_id: &TenantId,
2096 0 : selector: ShardSelector,
2097 0 : ) -> ShardResolveResult {
2098 0 : let tenants = self.tenants.read().unwrap();
2099 0 : let mut want_shard = None;
2100 0 : let mut any_in_progress = None;
2101 0 :
2102 0 : match &*tenants {
2103 0 : TenantsMap::Initializing => ShardResolveResult::NotFound,
2104 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
2105 0 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
2106 : // Ignore all slots that don't contain an attached tenant
2107 0 : let tenant = match &slot.1 {
2108 0 : TenantSlot::Attached(t) => t,
2109 0 : TenantSlot::InProgress(barrier) => {
2110 0 : // We might still find a usable shard, but in case we don't, remember that
2111 0 : // we saw at least one InProgress slot, so that we can distinguish this case
2112 0 : // from a simple NotFound in our return value.
2113 0 : any_in_progress = Some(barrier.clone());
2114 0 : continue;
2115 : }
2116 0 : _ => continue,
2117 : };
2118 :
2119 0 : match selector {
2120 0 : ShardSelector::First => return ShardResolveResult::Found(tenant.clone()),
2121 0 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
2122 0 : return ShardResolveResult::Found(tenant.clone())
2123 : }
2124 0 : ShardSelector::Page(key) => {
2125 0 : // First slot we see for this tenant, calculate the expected shard number
2126 0 : // for the key: we will use this for checking if this and subsequent
2127 0 : // slots contain the key, rather than recalculating the hash each time.
2128 0 : if want_shard.is_none() {
2129 0 : want_shard = Some(tenant.shard_identity.get_shard_number(&key));
2130 0 : }
2131 :
2132 0 : if Some(tenant.shard_identity.number) == want_shard {
2133 0 : return ShardResolveResult::Found(tenant.clone());
2134 0 : }
2135 : }
2136 0 : ShardSelector::Known(shard)
2137 0 : if tenant.shard_identity.shard_index() == shard =>
2138 0 : {
2139 0 : return ShardResolveResult::Found(tenant.clone());
2140 : }
2141 0 : _ => continue,
2142 : }
2143 : }
2144 :
2145 : // Fall through: we didn't find a slot that was in Attached state & matched our selector. If
2146 : // we found one or more InProgress slot, indicate to caller that they should retry later. Otherwise
2147 : // this requested shard simply isn't found.
2148 0 : if let Some(barrier) = any_in_progress {
2149 0 : ShardResolveResult::InProgress(barrier)
2150 : } else {
2151 0 : ShardResolveResult::NotFound
2152 : }
2153 : }
2154 : }
2155 0 : }
2156 : }
2157 :
2158 0 : #[derive(Debug, thiserror::Error)]
2159 : pub(crate) enum GetTenantError {
2160 : /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
2161 : /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
2162 : #[error("Tenant {0} not found")]
2163 : NotFound(TenantId),
2164 :
2165 : #[error("Tenant {0} is not active")]
2166 : NotActive(TenantShardId),
2167 :
2168 : // Initializing or shutting down: cannot authoritatively say whether we have this tenant
2169 : #[error("Tenant map is not available: {0}")]
2170 : MapState(#[from] TenantMapError),
2171 : }
2172 :
2173 0 : #[derive(thiserror::Error, Debug)]
2174 : pub(crate) enum GetActiveTenantError {
2175 : /// We may time out either while TenantSlot is InProgress, or while the Tenant
2176 : /// is in a non-Active state
2177 : #[error(
2178 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
2179 : )]
2180 : WaitForActiveTimeout {
2181 : latest_state: Option<TenantState>,
2182 : wait_time: Duration,
2183 : },
2184 :
2185 : /// The TenantSlot is absent, or in secondary mode
2186 : #[error(transparent)]
2187 : NotFound(#[from] GetTenantError),
2188 :
2189 : /// Cancellation token fired while we were waiting
2190 : #[error("cancelled")]
2191 : Cancelled,
2192 :
2193 : /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
2194 : #[error("will not become active. Current state: {0}")]
2195 : WillNotBecomeActive(TenantState),
2196 :
2197 : /// Broken is logically a subset of WillNotBecomeActive, but a distinct error is useful as
2198 : /// WillNotBecomeActive is a permitted error under some circumstances, whereas broken should
2199 : /// never happen.
2200 : #[error("Tenant is broken: {0}")]
2201 : Broken(String),
2202 : }
2203 :
2204 0 : #[derive(Debug, thiserror::Error)]
2205 : pub(crate) enum DeleteTimelineError {
2206 : #[error("Tenant {0}")]
2207 : Tenant(#[from] GetTenantError),
2208 :
2209 : #[error("Timeline {0}")]
2210 : Timeline(#[from] crate::tenant::DeleteTimelineError),
2211 : }
2212 :
2213 0 : #[derive(Debug, thiserror::Error)]
2214 : pub(crate) enum TenantStateError {
2215 : #[error("Tenant {0} is stopping")]
2216 : IsStopping(TenantShardId),
2217 : #[error(transparent)]
2218 : SlotError(#[from] TenantSlotError),
2219 : #[error(transparent)]
2220 : SlotUpsertError(#[from] TenantSlotUpsertError),
2221 : #[error(transparent)]
2222 : Other(#[from] anyhow::Error),
2223 : }
2224 :
2225 0 : pub(crate) async fn load_tenant(
2226 0 : conf: &'static PageServerConf,
2227 0 : tenant_id: TenantId,
2228 0 : generation: Generation,
2229 0 : broker_client: storage_broker::BrokerClientChannel,
2230 0 : remote_storage: GenericRemoteStorage,
2231 0 : deletion_queue_client: DeletionQueueClient,
2232 0 : ctx: &RequestContext,
2233 0 : ) -> Result<(), TenantMapInsertError> {
2234 0 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
2235 0 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2236 :
2237 0 : let slot_guard =
2238 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
2239 0 : let tenant_path = conf.tenant_path(&tenant_shard_id);
2240 0 :
2241 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
2242 0 : if tenant_ignore_mark.exists() {
2243 0 : std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
2244 0 : format!(
2245 0 : "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
2246 0 : )
2247 0 : })?;
2248 0 : }
2249 :
2250 0 : let resources = TenantSharedResources {
2251 0 : broker_client,
2252 0 : remote_storage,
2253 0 : deletion_queue_client,
2254 0 : };
2255 :
2256 0 : let mut location_conf =
2257 0 : Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
2258 0 : location_conf.attach_in_generation(AttachmentMode::Single, generation);
2259 0 :
2260 0 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
2261 :
2262 0 : let shard_identity = location_conf.shard;
2263 0 : let new_tenant = tenant_spawn(
2264 0 : conf,
2265 0 : tenant_shard_id,
2266 0 : &tenant_path,
2267 0 : resources,
2268 0 : AttachedTenantConf::try_from(location_conf)?,
2269 0 : shard_identity,
2270 0 : None,
2271 0 : &TENANTS,
2272 0 : SpawnMode::Eager,
2273 0 : ctx,
2274 0 : )
2275 0 : .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
2276 :
2277 0 : slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
2278 0 : Ok(())
2279 0 : }
2280 :
2281 0 : pub(crate) async fn ignore_tenant(
2282 0 : conf: &'static PageServerConf,
2283 0 : tenant_id: TenantId,
2284 0 : ) -> Result<(), TenantStateError> {
2285 0 : ignore_tenant0(conf, &TENANTS, tenant_id).await
2286 0 : }
2287 :
2288 0 : #[instrument(skip_all, fields(shard_id))]
2289 : async fn ignore_tenant0(
2290 : conf: &'static PageServerConf,
2291 : tenants: &std::sync::RwLock<TenantsMap>,
2292 : tenant_id: TenantId,
2293 : ) -> Result<(), TenantStateError> {
2294 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
2295 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2296 : tracing::Span::current().record(
2297 : "shard_id",
2298 : tracing::field::display(tenant_shard_id.shard_slug()),
2299 : );
2300 :
2301 0 : remove_tenant_from_memory(tenants, tenant_shard_id, async {
2302 0 : let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
2303 0 : fs::File::create(&ignore_mark_file)
2304 0 : .await
2305 0 : .context("Failed to create ignore mark file")
2306 0 : .and_then(|_| {
2307 0 : crashsafe::fsync_file_and_parent(&ignore_mark_file)
2308 0 : .context("Failed to fsync ignore mark file")
2309 0 : })
2310 0 : .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?;
2311 0 : Ok(())
2312 0 : })
2313 : .await
2314 : }
2315 :
2316 0 : #[derive(Debug, thiserror::Error)]
2317 : pub(crate) enum TenantMapListError {
2318 : #[error("tenant map is still initiailizing")]
2319 : Initializing,
2320 : }
2321 :
2322 0 : #[derive(Debug, thiserror::Error)]
2323 : pub(crate) enum TenantMapInsertError {
2324 : #[error(transparent)]
2325 : SlotError(#[from] TenantSlotError),
2326 : #[error(transparent)]
2327 : SlotUpsertError(#[from] TenantSlotUpsertError),
2328 : #[error(transparent)]
2329 : Other(#[from] anyhow::Error),
2330 : }
2331 :
2332 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
2333 : /// for a particular tenant ID.
2334 0 : #[derive(Debug, thiserror::Error)]
2335 : pub(crate) enum TenantSlotError {
2336 : /// When acquiring a slot with the expectation that the tenant already exists.
2337 : #[error("Tenant {0} not found")]
2338 : NotFound(TenantShardId),
2339 :
2340 : /// When acquiring a slot with the expectation that the tenant does not already exist.
2341 : #[error("tenant {0} already exists, state: {1:?}")]
2342 : AlreadyExists(TenantShardId, TenantState),
2343 :
2344 : // Tried to read a slot that is currently being mutated by another administrative
2345 : // operation.
2346 : #[error("tenant has a state change in progress, try again later")]
2347 : InProgress,
2348 :
2349 : #[error(transparent)]
2350 : MapState(#[from] TenantMapError),
2351 : }
2352 :
2353 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
2354 : /// to insert a new value.
2355 0 : #[derive(thiserror::Error)]
2356 : pub(crate) enum TenantSlotUpsertError {
2357 : /// An error where the slot is in an unexpected state, indicating a code bug
2358 : #[error("Internal error updating Tenant")]
2359 : InternalError(Cow<'static, str>),
2360 :
2361 : #[error(transparent)]
2362 : MapState(TenantMapError),
2363 :
2364 : // If we encounter TenantManager shutdown during upsert, we must carry the Completion
2365 : // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
2366 : // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
2367 : // was protected by the SlotGuard.
2368 : #[error("Shutting down")]
2369 : ShuttingDown((TenantSlot, utils::completion::Completion)),
2370 : }
2371 :
2372 : impl std::fmt::Debug for TenantSlotUpsertError {
2373 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2374 0 : match self {
2375 0 : Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
2376 0 : Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
2377 0 : Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
2378 : }
2379 0 : }
2380 : }
2381 :
2382 0 : #[derive(Debug, thiserror::Error)]
2383 : enum TenantSlotDropError {
2384 : /// It is only legal to drop a TenantSlot if its contents are fully shut down
2385 : #[error("Tenant was not shut down")]
2386 : NotShutdown,
2387 : }
2388 :
2389 : /// Errors that can happen any time we are walking the tenant map to try and acquire
2390 : /// the TenantSlot for a particular tenant.
2391 0 : #[derive(Debug, thiserror::Error)]
2392 : pub enum TenantMapError {
2393 : // Tried to read while initializing
2394 : #[error("tenant map is still initializing")]
2395 : StillInitializing,
2396 :
2397 : // Tried to read while shutting down
2398 : #[error("tenant map is shutting down")]
2399 : ShuttingDown,
2400 : }
2401 :
2402 : /// Guards a particular tenant_id's content in the TenantsMap. While this
2403 : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
2404 : /// for this tenant, which acts as a marker for any operations targeting
2405 : /// this tenant to retry later, or wait for the InProgress state to end.
2406 : ///
2407 : /// This structure enforces the important invariant that we do not have overlapping
2408 : /// tasks that will try use local storage for a the same tenant ID: we enforce that
2409 : /// the previous contents of a slot have been shut down before the slot can be
2410 : /// left empty or used for something else
2411 : ///
2412 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
2413 : /// to provide a new value, or `revert` to put the slot back into its initial
2414 : /// state. If the SlotGuard is dropped without calling either of these, then
2415 : /// we will leave the slot empty if our `old_value` is already shut down, else
2416 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
2417 : ///
2418 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
2419 : /// `drop_old_value`. It is an error to call this without shutting down
2420 : /// the conents of `old_value`.
2421 : pub struct SlotGuard {
2422 : tenant_shard_id: TenantShardId,
2423 : old_value: Option<TenantSlot>,
2424 : upserted: bool,
2425 :
2426 : /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
2427 : /// release any waiters as soon as this SlotGuard is dropped.
2428 : completion: utils::completion::Completion,
2429 : }
2430 :
2431 : impl SlotGuard {
2432 2 : fn new(
2433 2 : tenant_shard_id: TenantShardId,
2434 2 : old_value: Option<TenantSlot>,
2435 2 : completion: utils::completion::Completion,
2436 2 : ) -> Self {
2437 2 : Self {
2438 2 : tenant_shard_id,
2439 2 : old_value,
2440 2 : upserted: false,
2441 2 : completion,
2442 2 : }
2443 2 : }
2444 :
2445 : /// Get any value that was present in the slot before we acquired ownership
2446 : /// of it: in state transitions, this will be the old state.
2447 2 : fn get_old_value(&self) -> &Option<TenantSlot> {
2448 2 : &self.old_value
2449 2 : }
2450 :
2451 : /// Emplace a new value in the slot. This consumes the guard, and after
2452 : /// returning, the slot is no longer protected from concurrent changes.
2453 0 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
2454 0 : if !self.old_value_is_shutdown() {
2455 : // This is a bug: callers should never try to drop an old value without
2456 : // shutting it down
2457 0 : return Err(TenantSlotUpsertError::InternalError(
2458 0 : "Old TenantSlot value not shut down".into(),
2459 0 : ));
2460 0 : }
2461 :
2462 0 : let replaced = {
2463 0 : let mut locked = TENANTS.write().unwrap();
2464 0 :
2465 0 : if let TenantSlot::InProgress(_) = new_value {
2466 : // It is never expected to try and upsert InProgress via this path: it should
2467 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
2468 0 : return Err(TenantSlotUpsertError::InternalError(
2469 0 : "Attempt to upsert an InProgress state".into(),
2470 0 : ));
2471 0 : }
2472 :
2473 0 : let m = match &mut *locked {
2474 : TenantsMap::Initializing => {
2475 0 : return Err(TenantSlotUpsertError::MapState(
2476 0 : TenantMapError::StillInitializing,
2477 0 : ))
2478 : }
2479 : TenantsMap::ShuttingDown(_) => {
2480 0 : return Err(TenantSlotUpsertError::ShuttingDown((
2481 0 : new_value,
2482 0 : self.completion.clone(),
2483 0 : )));
2484 : }
2485 0 : TenantsMap::Open(m) => m,
2486 0 : };
2487 0 :
2488 0 : METRICS.slot_inserted(&new_value);
2489 0 :
2490 0 : let replaced = m.insert(self.tenant_shard_id, new_value);
2491 0 : self.upserted = true;
2492 0 : if let Some(replaced) = replaced.as_ref() {
2493 0 : METRICS.slot_removed(replaced);
2494 0 : }
2495 :
2496 0 : replaced
2497 : };
2498 :
2499 : // Sanity check: on an upsert we should always be replacing an InProgress marker
2500 0 : match replaced {
2501 : Some(TenantSlot::InProgress(_)) => {
2502 : // Expected case: we find our InProgress in the map: nothing should have
2503 : // replaced it because the code that acquires slots will not grant another
2504 : // one for the same TenantId.
2505 0 : Ok(())
2506 : }
2507 : None => {
2508 0 : METRICS.unexpected_errors.inc();
2509 0 : error!(
2510 : tenant_shard_id = %self.tenant_shard_id,
2511 0 : "Missing InProgress marker during tenant upsert, this is a bug."
2512 : );
2513 0 : Err(TenantSlotUpsertError::InternalError(
2514 0 : "Missing InProgress marker during tenant upsert".into(),
2515 0 : ))
2516 : }
2517 0 : Some(slot) => {
2518 0 : METRICS.unexpected_errors.inc();
2519 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
2520 0 : Err(TenantSlotUpsertError::InternalError(
2521 0 : "Unexpected contents of TenantSlot".into(),
2522 0 : ))
2523 : }
2524 : }
2525 0 : }
2526 :
2527 : /// Replace the InProgress slot with whatever was in the guard when we started
2528 0 : fn revert(mut self) {
2529 0 : if let Some(value) = self.old_value.take() {
2530 0 : match self.upsert(value) {
2531 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
2532 0 : // We already logged the error, nothing else we can do.
2533 0 : }
2534 : Err(
2535 : TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
2536 0 : ) => {
2537 0 : // If the map is shutting down, we need not replace anything
2538 0 : }
2539 0 : Ok(()) => {}
2540 : }
2541 0 : }
2542 0 : }
2543 :
2544 : /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
2545 : /// rogue background tasks that would write to the local tenant directory that this guard
2546 : /// is responsible for protecting
2547 2 : fn old_value_is_shutdown(&self) -> bool {
2548 2 : match self.old_value.as_ref() {
2549 2 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
2550 0 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
2551 : Some(TenantSlot::InProgress(_)) => {
2552 : // A SlotGuard cannot be constructed for a slot that was already InProgress
2553 0 : unreachable!()
2554 : }
2555 0 : None => true,
2556 : }
2557 2 : }
2558 :
2559 : /// The guard holder is done with the old value of the slot: they are obliged to already
2560 : /// shut it down before we reach this point.
2561 2 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
2562 2 : if !self.old_value_is_shutdown() {
2563 0 : Err(TenantSlotDropError::NotShutdown)
2564 : } else {
2565 2 : self.old_value.take();
2566 2 : Ok(())
2567 : }
2568 2 : }
2569 : }
2570 :
2571 : impl Drop for SlotGuard {
2572 2 : fn drop(&mut self) {
2573 2 : if self.upserted {
2574 0 : return;
2575 2 : }
2576 2 : // Our old value is already shutdown, or it never existed: it is safe
2577 2 : // for us to fully release the TenantSlot back into an empty state
2578 2 :
2579 2 : let mut locked = TENANTS.write().unwrap();
2580 :
2581 2 : let m = match &mut *locked {
2582 : TenantsMap::Initializing => {
2583 : // There is no map, this should never happen.
2584 2 : return;
2585 : }
2586 : TenantsMap::ShuttingDown(_) => {
2587 : // When we transition to shutdown, InProgress elements are removed
2588 : // from the map, so we do not need to clean up our Inprogress marker.
2589 : // See [`shutdown_all_tenants0`]
2590 0 : return;
2591 : }
2592 0 : TenantsMap::Open(m) => m,
2593 0 : };
2594 0 :
2595 0 : use std::collections::btree_map::Entry;
2596 0 : match m.entry(self.tenant_shard_id) {
2597 0 : Entry::Occupied(mut entry) => {
2598 0 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
2599 0 : METRICS.unexpected_errors.inc();
2600 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
2601 0 : }
2602 :
2603 0 : if self.old_value_is_shutdown() {
2604 0 : METRICS.slot_removed(entry.get());
2605 0 : entry.remove();
2606 0 : } else {
2607 0 : let inserting = self.old_value.take().unwrap();
2608 0 : METRICS.slot_inserted(&inserting);
2609 0 : let replaced = entry.insert(inserting);
2610 0 : METRICS.slot_removed(&replaced);
2611 0 : }
2612 : }
2613 : Entry::Vacant(_) => {
2614 0 : METRICS.unexpected_errors.inc();
2615 0 : error!(
2616 : tenant_shard_id = %self.tenant_shard_id,
2617 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
2618 : );
2619 : }
2620 : }
2621 2 : }
2622 : }
2623 :
/// Selects how [`tenant_map_peek_slot`] treats a tenants map that is shutting down.
enum TenantSlotPeekMode {
    /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
    Read,
    /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
    Write,
}
2630 :
2631 0 : fn tenant_map_peek_slot<'a>(
2632 0 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
2633 0 : tenant_shard_id: &TenantShardId,
2634 0 : mode: TenantSlotPeekMode,
2635 0 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
2636 0 : match tenants.deref() {
2637 0 : TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
2638 0 : TenantsMap::ShuttingDown(m) => match mode {
2639 : TenantSlotPeekMode::Read => Ok(Some(
2640 : // When reading in ShuttingDown state, we must translate None results
2641 : // into a ShuttingDown error, because absence of a tenant shard ID in the map
2642 : // isn't a reliable indicator of the tenant being gone: it might have been
2643 : // InProgress when shutdown started, and cleaned up from that state such
2644 : // that it's now no longer in the map. Callers will have to wait until
2645 : // we next start up to get a proper answer. This avoids incorrect 404 API responses.
2646 0 : m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
2647 : )),
2648 0 : TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
2649 : },
2650 0 : TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
2651 : }
2652 0 : }
2653 :
/// Precondition on the current contents of a slot, checked by
/// [`tenant_map_acquire_slot_impl`] before it hands out a [`SlotGuard`].
///
/// Independently of the mode, a slot that is already `InProgress` cannot be acquired.
enum TenantSlotAcquireMode {
    /// Acquire the slot irrespective of current state, or whether it already exists
    Any,
    /// Return an error if trying to acquire a slot and it doesn't already exist
    MustExist,
    /// Return an error if trying to acquire a slot and it already exists
    MustNotExist,
}
2662 :
/// Acquire the slot for `tenant_shard_id` in the process-wide `TENANTS` map.
///
/// Thin wrapper around [`tenant_map_acquire_slot_impl`], which takes the map as an
/// explicit argument.
fn tenant_map_acquire_slot(
    tenant_shard_id: &TenantShardId,
    mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
    tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
}
2669 :
2670 2 : fn tenant_map_acquire_slot_impl(
2671 2 : tenant_shard_id: &TenantShardId,
2672 2 : tenants: &std::sync::RwLock<TenantsMap>,
2673 2 : mode: TenantSlotAcquireMode,
2674 2 : ) -> Result<SlotGuard, TenantSlotError> {
2675 2 : use TenantSlotAcquireMode::*;
2676 2 : METRICS.tenant_slot_writes.inc();
2677 2 :
2678 2 : let mut locked = tenants.write().unwrap();
2679 2 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
2680 2 : let _guard = span.enter();
2681 :
2682 2 : let m = match &mut *locked {
2683 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
2684 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
2685 2 : TenantsMap::Open(m) => m,
2686 2 : };
2687 2 :
2688 2 : use std::collections::btree_map::Entry;
2689 2 :
2690 2 : let entry = m.entry(*tenant_shard_id);
2691 2 :
2692 2 : match entry {
2693 0 : Entry::Vacant(v) => match mode {
2694 : MustExist => {
2695 0 : tracing::debug!("Vacant && MustExist: return NotFound");
2696 0 : Err(TenantSlotError::NotFound(*tenant_shard_id))
2697 : }
2698 : _ => {
2699 0 : let (completion, barrier) = utils::completion::channel();
2700 0 : let inserting = TenantSlot::InProgress(barrier);
2701 0 : METRICS.slot_inserted(&inserting);
2702 0 : v.insert(inserting);
2703 0 : tracing::debug!("Vacant, inserted InProgress");
2704 0 : Ok(SlotGuard::new(*tenant_shard_id, None, completion))
2705 : }
2706 : },
2707 2 : Entry::Occupied(mut o) => {
2708 2 : // Apply mode-driven checks
2709 2 : match (o.get(), mode) {
2710 : (TenantSlot::InProgress(_), _) => {
2711 0 : tracing::debug!("Occupied, failing for InProgress");
2712 0 : Err(TenantSlotError::InProgress)
2713 : }
2714 0 : (slot, MustNotExist) => match slot {
2715 0 : TenantSlot::Attached(tenant) => {
2716 0 : tracing::debug!("Attached && MustNotExist, return AlreadyExists");
2717 0 : Err(TenantSlotError::AlreadyExists(
2718 0 : *tenant_shard_id,
2719 0 : tenant.current_state(),
2720 0 : ))
2721 : }
2722 : _ => {
2723 : // FIXME: the AlreadyExists error assumes that we have a Tenant
2724 : // to get the state from
2725 0 : tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
2726 0 : Err(TenantSlotError::AlreadyExists(
2727 0 : *tenant_shard_id,
2728 0 : TenantState::Broken {
2729 0 : reason: "Present but not attached".to_string(),
2730 0 : backtrace: "".to_string(),
2731 0 : },
2732 0 : ))
2733 : }
2734 : },
2735 : _ => {
2736 : // Happy case: the slot was not in any state that violated our mode
2737 2 : let (completion, barrier) = utils::completion::channel();
2738 2 : let in_progress = TenantSlot::InProgress(barrier);
2739 2 : METRICS.slot_inserted(&in_progress);
2740 2 : let old_value = o.insert(in_progress);
2741 2 : METRICS.slot_removed(&old_value);
2742 2 : tracing::debug!("Occupied, replaced with InProgress");
2743 2 : Ok(SlotGuard::new(
2744 2 : *tenant_shard_id,
2745 2 : Some(old_value),
2746 2 : completion,
2747 2 : ))
2748 : }
2749 : }
2750 : }
2751 : }
2752 2 : }
2753 :
2754 : /// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
2755 : /// Allows to remove other tenant resources manually, via `tenant_cleanup`.
2756 : /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
2757 : /// operation would be needed to remove it.
2758 2 : async fn remove_tenant_from_memory<V, F>(
2759 2 : tenants: &std::sync::RwLock<TenantsMap>,
2760 2 : tenant_shard_id: TenantShardId,
2761 2 : tenant_cleanup: F,
2762 2 : ) -> Result<V, TenantStateError>
2763 2 : where
2764 2 : F: std::future::Future<Output = anyhow::Result<V>>,
2765 2 : {
2766 2 : let mut slot_guard =
2767 2 : tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
2768 :
2769 : // allow pageserver shutdown to await for our completion
2770 2 : let (_guard, progress) = completion::channel();
2771 :
2772 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2773 : // concurrent API request doing something else for the same tenant ID.
2774 2 : let attached_tenant = match slot_guard.get_old_value() {
2775 2 : Some(TenantSlot::Attached(tenant)) => {
2776 2 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2777 2 : let shutdown_mode = ShutdownMode::Hard;
2778 2 :
2779 2 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
2780 2 : // that we can continue safely to cleanup.
2781 2 : match tenant.shutdown(progress, shutdown_mode).await {
2782 2 : Ok(()) => {}
2783 0 : Err(_other) => {
2784 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2785 0 : // wait for it but return an error right away because these are distinct requests.
2786 0 : slot_guard.revert();
2787 0 : return Err(TenantStateError::IsStopping(tenant_shard_id));
2788 : }
2789 : }
2790 2 : Some(tenant)
2791 : }
2792 0 : Some(TenantSlot::Secondary(secondary_state)) => {
2793 0 : tracing::info!("Shutting down in secondary mode");
2794 0 : secondary_state.shutdown().await;
2795 0 : None
2796 : }
2797 : Some(TenantSlot::InProgress(_)) => {
2798 : // Acquiring a slot guarantees its old value was not InProgress
2799 0 : unreachable!();
2800 : }
2801 0 : None => None,
2802 : };
2803 :
2804 2 : match tenant_cleanup
2805 2 : .await
2806 2 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2807 : {
2808 2 : Ok(hook_value) => {
2809 2 : // Success: drop the old TenantSlot::Attached.
2810 2 : slot_guard
2811 2 : .drop_old_value()
2812 2 : .expect("We just called shutdown");
2813 2 :
2814 2 : Ok(hook_value)
2815 : }
2816 0 : Err(e) => {
2817 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2818 0 : if let Some(attached_tenant) = attached_tenant {
2819 0 : attached_tenant.set_broken(e.to_string()).await;
2820 0 : }
2821 : // Leave the broken tenant in the map
2822 0 : slot_guard.revert();
2823 0 :
2824 0 : Err(TenantStateError::Other(e))
2825 : }
2826 : }
2827 2 : }
2828 :
2829 : use {
2830 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
2831 : utils::http::error::ApiError,
2832 : };
2833 :
2834 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
2835 : pub(crate) async fn immediate_gc(
2836 : tenant_shard_id: TenantShardId,
2837 : timeline_id: TimelineId,
2838 : gc_req: TimelineGcRequest,
2839 : cancel: CancellationToken,
2840 : ctx: &RequestContext,
2841 : ) -> Result<GcResult, ApiError> {
2842 : let tenant = {
2843 : let guard = TENANTS.read().unwrap();
2844 : guard
2845 : .get(&tenant_shard_id)
2846 : .cloned()
2847 0 : .with_context(|| format!("tenant {tenant_shard_id}"))
2848 0 : .map_err(|e| ApiError::NotFound(e.into()))?
2849 : };
2850 :
2851 0 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
2852 : // Use tenant's pitr setting
2853 : let pitr = tenant.get_pitr_interval();
2854 :
2855 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
2856 :
2857 : // Run in task_mgr to avoid race with tenant_detach operation
2858 : let ctx: RequestContext =
2859 : ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
2860 :
2861 0 : let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
2862 :
2863 : fail::fail_point!("immediate_gc_task_pre");
2864 :
2865 : #[allow(unused_mut)]
2866 : let mut result = tenant
2867 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
2868 : .await;
2869 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
2870 : // better once the types support it.
2871 :
2872 : #[cfg(feature = "testing")]
2873 : {
2874 : // we need to synchronize with drop completion for python tests without polling for
2875 : // log messages
2876 : if let Ok(result) = result.as_mut() {
2877 : let mut js = tokio::task::JoinSet::new();
2878 : for layer in std::mem::take(&mut result.doomed_layers) {
2879 : js.spawn(layer.wait_drop());
2880 : }
2881 : tracing::info!(
2882 : total = js.len(),
2883 : "starting to wait for the gc'd layers to be dropped"
2884 : );
2885 : while let Some(res) = js.join_next().await {
2886 : res.expect("wait_drop should not panic");
2887 : }
2888 : }
2889 :
2890 : let timeline = tenant.get_timeline(timeline_id, false).ok();
2891 0 : let rtc = timeline.as_ref().map(|x| &x.remote_client);
2892 :
2893 : if let Some(rtc) = rtc {
2894 : // layer drops schedule actions on remote timeline client to actually do the
2895 : // deletions; don't care about the shutdown error, just exit fast
2896 : drop(rtc.wait_completion().await);
2897 : }
2898 : }
2899 :
2900 0 : result.map_err(|e| match e {
2901 0 : GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
2902 : GcError::TimelineNotFound => {
2903 0 : ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
2904 : }
2905 0 : other => ApiError::InternalServerError(anyhow::anyhow!(other)),
2906 0 : })
2907 : }
2908 :
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;
    use tracing::Instrument;

    use crate::tenant::mgr::TenantSlot;

    use super::{super::harness::TenantHarness, TenantsMap};

    // `start_paused = true` starts the test with tokio's clock paused, so the long sleep
    // used below as a "nothing finished yet" timeout does not cost wall-clock time.
    #[tokio::test(start_paused = true)]
    async fn shutdown_awaits_in_progress_tenant() {
        // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
        // wait for it to complete before proceeding.

        let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
        let (t, _ctx) = h.load().await;

        // harness loads it to active, which is forced and nothing is running on the tenant

        let id = t.tenant_shard_id();

        // tenant harness configures the logging and we cannot escape it
        let span = h.span();
        let _e = span.enter();

        // A private map, rather than the global one, holding just this attached tenant.
        let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
        let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));

        // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
        // permit it to proceed: that will stick the tenant in InProgress

        // Dropping `until_cleanup_completed` unblocks the hook; the hook drops
        // `until_cleanup_started` to signal that it is running.
        let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
        let (until_cleanup_started, cleanup_started) = utils::completion::channel();
        let mut remove_tenant_from_memory_task = {
            let jh = tokio::spawn({
                let tenants = tenants.clone();
                async move {
                    let cleanup = async move {
                        drop(until_cleanup_started);
                        can_complete_cleanup.wait().await;
                        anyhow::Ok(())
                    };
                    super::remove_tenant_from_memory(&tenants, id, cleanup).await
                }
                .instrument(h.span())
            });

            // now the long cleanup should be in place, with the stopping state
            cleanup_started.wait().await;
            jh
        };

        let mut shutdown_task = {
            let (until_shutdown_started, shutdown_started) = utils::completion::channel();

            let shutdown_task = tokio::spawn(async move {
                drop(until_shutdown_started);
                super::shutdown_all_tenants0(&tenants).await;
            });

            // Wait until the shutdown task has actually begun executing.
            shutdown_started.wait().await;
            shutdown_task
        };

        // Neither task may finish while the cleanup hook is still blocked; the sleep
        // branch is the only acceptable outcome.
        let long_time = std::time::Duration::from_secs(15);
        tokio::select! {
            _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
            _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
            _ = tokio::time::sleep(long_time) => {},
        }

        // Release the cleanup hook.
        drop(until_cleanup_completed);

        // Now that we allow it to proceed, shutdown should complete immediately
        remove_tenant_from_memory_task.await.unwrap().unwrap();
        shutdown_task.await.unwrap();
    }
}
|