Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
5 : use futures::StreamExt;
6 : use itertools::Itertools;
7 : use pageserver_api::key::Key;
8 : use pageserver_api::models::LocationConfigMode;
9 : use pageserver_api::shard::{
10 : ShardCount, ShardIdentity, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId,
11 : };
12 : use pageserver_api::upcall_api::ReAttachResponseTenant;
13 : use rand::{distributions::Alphanumeric, Rng};
14 : use std::borrow::Cow;
15 : use std::cmp::Ordering;
16 : use std::collections::{BTreeMap, HashMap};
17 : use std::ops::Deref;
18 : use std::sync::Arc;
19 : use std::time::Duration;
20 : use sysinfo::SystemExt;
21 : use tokio::fs;
22 :
23 : use anyhow::Context;
24 : use once_cell::sync::Lazy;
25 : use tokio::task::JoinSet;
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::*;
28 :
29 : use utils::{backoff, completion, crashsafe};
30 :
31 : use crate::config::PageServerConf;
32 : use crate::context::{DownloadBehavior, RequestContext};
33 : use crate::control_plane_client::{
34 : ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
35 : };
36 : use crate::deletion_queue::DeletionQueueClient;
37 : use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
38 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
39 : use crate::task_mgr::{self, TaskKind};
40 : use crate::tenant::config::{
41 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
42 : };
43 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
44 : use crate::tenant::storage_layer::inmemory_layer;
45 : use crate::tenant::timeline::ShutdownMode;
46 : use crate::tenant::{AttachedTenantConf, GcError, LoadConfigError, SpawnMode, Tenant, TenantState};
47 : use crate::virtual_file::MaybeFatalIo;
48 : use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
49 :
50 : use utils::crashsafe::path_with_suffix_extension;
51 : use utils::fs_ext::PathExt;
52 : use utils::generation::Generation;
53 : use utils::id::{TenantId, TimelineId};
54 :
55 : use super::remote_timeline_client::remote_tenant_path;
56 : use super::secondary::SecondaryTenant;
57 : use super::timeline::detach_ancestor::PreparedTimelineDetach;
58 : use super::TenantSharedResources;
59 :
/// For a tenant that appears in TenantsMap, it may either be
/// - `Attached`: has a full Tenant object, is eligible to service
///    reads and ingest WAL.
/// - `Secondary`: is only keeping a local cache warm.
///
/// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
/// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
/// its lifetime, and we can preserve some important safety invariants like `Tenant` always
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub(crate) enum TenantSlot {
    /// The slot holds a full [`Tenant`] object.
    Attached(Arc<Tenant>),
    /// The slot holds a [`SecondaryTenant`].
    Secondary(Arc<SecondaryTenant>),
    /// In this state, other administrative operations acting on the TenantId should
    /// block, or return a retry indicator equivalent to HTTP 503.
    InProgress(utils::completion::Barrier),
}
77 :
78 : impl std::fmt::Debug for TenantSlot {
79 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 0 : match self {
81 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
82 0 : Self::Secondary(_) => write!(f, "Secondary"),
83 0 : Self::InProgress(_) => write!(f, "InProgress"),
84 : }
85 0 : }
86 : }
87 :
88 : impl TenantSlot {
89 : /// Return the `Tenant` in this slot if attached, else None
90 0 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
91 0 : match self {
92 0 : Self::Attached(t) => Some(t),
93 0 : Self::Secondary(_) => None,
94 0 : Self::InProgress(_) => None,
95 : }
96 0 : }
97 : }
98 :
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
///
/// In the `Open` and `ShuttingDown` states the map is keyed by [`TenantShardId`] and
/// each value is the [`TenantSlot`] for that shard.
pub(crate) enum TenantsMap {
    /// [`init_tenant_mgr`] is not done yet.
    Initializing,
    /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
    /// New tenants can be added using [`tenant_map_acquire_slot`].
    Open(BTreeMap<TenantShardId, TenantSlot>),
    /// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
    /// Existing tenants are still accessible, but no new tenants can be created.
    ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
}
111 :
/// When resolving a TenantId to a shard, we may be looking for the 0th
/// shard, or we might be looking for whichever shard holds a particular page.
///
/// Each variant describes one strategy for choosing among the shards of a tenant.
#[derive(Copy, Clone)]
pub(crate) enum ShardSelector {
    /// Only return the 0th shard, if it is present.  If a non-0th shard is present,
    /// ignore it.
    Zero,
    /// Pick the first shard we find for the TenantId
    First,
    /// Pick the shard that holds this key
    Page(Key),
    /// The shard ID is known: pick the given shard
    Known(ShardIndex),
}
126 :
/// A convenience for use with the re_attach ControlPlaneClient function: rather
/// than the serializable struct, we build this enum that encapsulates
/// the invariant that attached tenants always have generations.
///
/// This represents the subset of a LocationConfig that we receive during re-attach.
pub(crate) enum TenantStartupMode {
    /// Start the tenant attached, using the given attachment mode and generation.
    Attached((AttachmentMode, Generation)),
    /// Start the tenant as a secondary location; no generation is carried.
    Secondary,
}
136 :
137 : impl TenantStartupMode {
138 : /// Return the generation & mode that should be used when starting
139 : /// this tenant.
140 : ///
141 : /// If this returns None, the re-attach struct is in an invalid state and
142 : /// should be ignored in the response.
143 0 : fn from_reattach_tenant(rart: ReAttachResponseTenant) -> Option<Self> {
144 0 : match (rart.mode, rart.gen) {
145 0 : (LocationConfigMode::Detached, _) => None,
146 0 : (LocationConfigMode::Secondary, _) => Some(Self::Secondary),
147 0 : (LocationConfigMode::AttachedMulti, Some(g)) => {
148 0 : Some(Self::Attached((AttachmentMode::Multi, Generation::new(g))))
149 : }
150 0 : (LocationConfigMode::AttachedSingle, Some(g)) => {
151 0 : Some(Self::Attached((AttachmentMode::Single, Generation::new(g))))
152 : }
153 0 : (LocationConfigMode::AttachedStale, Some(g)) => {
154 0 : Some(Self::Attached((AttachmentMode::Stale, Generation::new(g))))
155 : }
156 : _ => {
157 0 : tracing::warn!(
158 0 : "Received invalid re-attach state for tenant {}: {rart:?}",
159 : rart.id
160 : );
161 0 : None
162 : }
163 : }
164 0 : }
165 : }
166 :
/// Result type for looking up a TenantId to a specific shard
pub(crate) enum ShardResolveResult {
    /// No matching shard was found.
    NotFound,
    /// A matching shard was found.
    Found(Arc<Tenant>),
    /// Wait for this barrier, then query again
    InProgress(utils::completion::Barrier),
}
174 :
175 : impl TenantsMap {
176 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
177 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
178 : /// None is returned.
179 0 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
180 0 : match self {
181 0 : TenantsMap::Initializing => None,
182 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
183 0 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
184 : }
185 : }
186 0 : }
187 :
188 : #[cfg(all(debug_assertions, not(test)))]
189 0 : pub(crate) fn len(&self) -> usize {
190 0 : match self {
191 0 : TenantsMap::Initializing => 0,
192 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
193 : }
194 0 : }
195 : }
196 :
197 : /// Precursor to deletion of a tenant dir: we do a fast rename to a tmp path, and then
198 : /// the slower actual deletion in the background.
199 : ///
200 : /// This is "safe" in that that it won't leave behind a partially deleted directory
201 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting
202 : /// the contents.
203 : ///
204 : /// This is pageserver-specific, as it relies on future processes after a crash to check
205 : /// for TEMP_FILE_SUFFIX when loading things.
206 0 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
207 0 : let parent = path
208 0 : .as_ref()
209 0 : .parent()
210 0 : // It is invalid to call this function with a relative path. Tenant directories
211 0 : // should always have a parent.
212 0 : .ok_or(std::io::Error::new(
213 0 : std::io::ErrorKind::InvalidInput,
214 0 : "Path must be absolute",
215 0 : ))?;
216 0 : let rand_suffix = rand::thread_rng()
217 0 : .sample_iter(&Alphanumeric)
218 0 : .take(8)
219 0 : .map(char::from)
220 0 : .collect::<String>()
221 0 : + TEMP_FILE_SUFFIX;
222 0 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
223 0 : fs::rename(path.as_ref(), &tmp_path).await?;
224 0 : fs::File::open(parent).await?.sync_all().await?;
225 0 : Ok(tmp_path)
226 0 : }
227 :
228 : /// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
229 : /// the background, and thereby avoid blocking any API requests on this deletion completing.
230 0 : fn spawn_background_purge(tmp_path: Utf8PathBuf) {
231 0 : // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
232 0 : // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
233 0 : let task_tenant_id = None;
234 0 :
235 0 : task_mgr::spawn(
236 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
237 0 : TaskKind::MgmtRequest,
238 0 : task_tenant_id,
239 0 : None,
240 0 : "tenant_files_delete",
241 0 : false,
242 0 : async move {
243 0 : fs::remove_dir_all(tmp_path.as_path())
244 0 : .await
245 0 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
246 0 : },
247 0 : );
248 0 : }
249 :
/// Process-wide tenants map. It is created lazily in the `Initializing` state.
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
    Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
252 :
/// The TenantManager is responsible for storing and mutating the collection of all tenants
/// that this pageserver process has state for.  Every Tenant and SecondaryTenant instance
/// lives inside the TenantManager.
///
/// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
/// the same tenant twice concurrently, or trying to configure the same tenant into secondary
/// and attached modes concurrently.
pub struct TenantManager {
    // Global pageserver configuration.
    conf: &'static PageServerConf,
    // TODO: currently this is a &'static pointing to TENANTS.  When we finish refactoring
    // out of that static variable, the TenantManager can own this.
    // See https://github.com/neondatabase/neon/issues/5796
    tenants: &'static std::sync::RwLock<TenantsMap>,
    // Shared resources, stored when the manager is constructed.
    resources: TenantSharedResources,

    // Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
    // This is for edge cases like tenant deletion.  In normal cases (within a Tenant lifetime),
    // tenants have their own cancellation tokens, which we fire individually in [`Self::shutdown`], or
    // when the tenant detaches.
    cancel: CancellationToken,
}
274 :
275 0 : fn emergency_generations(
276 0 : tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
277 0 : ) -> HashMap<TenantShardId, TenantStartupMode> {
278 0 : tenant_confs
279 0 : .iter()
280 0 : .filter_map(|(tid, lc)| {
281 0 : let lc = match lc {
282 0 : Ok(lc) => lc,
283 0 : Err(_) => return None,
284 : };
285 : Some((
286 0 : *tid,
287 0 : match &lc.mode {
288 0 : LocationMode::Attached(alc) => {
289 0 : TenantStartupMode::Attached((alc.attach_mode, alc.generation))
290 : }
291 0 : LocationMode::Secondary(_) => TenantStartupMode::Secondary,
292 : },
293 : ))
294 0 : })
295 0 : .collect()
296 0 : }
297 :
298 0 : async fn init_load_generations(
299 0 : conf: &'static PageServerConf,
300 0 : tenant_confs: &HashMap<TenantShardId, Result<LocationConf, LoadConfigError>>,
301 0 : resources: &TenantSharedResources,
302 0 : cancel: &CancellationToken,
303 0 : ) -> anyhow::Result<Option<HashMap<TenantShardId, TenantStartupMode>>> {
304 0 : let generations = if conf.control_plane_emergency_mode {
305 0 : error!(
306 0 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
307 : );
308 0 : emergency_generations(tenant_confs)
309 0 : } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
310 0 : info!("Calling control plane API to re-attach tenants");
311 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
312 0 : match client.re_attach(conf).await {
313 0 : Ok(tenants) => tenants
314 0 : .into_iter()
315 0 : .flat_map(|(id, rart)| {
316 0 : TenantStartupMode::from_reattach_tenant(rart).map(|tsm| (id, tsm))
317 0 : })
318 0 : .collect(),
319 : Err(RetryForeverError::ShuttingDown) => {
320 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
321 : }
322 : }
323 : } else {
324 0 : info!("Control plane API not configured, tenant generations are disabled");
325 0 : return Ok(None);
326 : };
327 :
328 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
329 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
330 : // the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
331 : // are processed, even though we don't block on recovery completing here.
332 0 : let attached_tenants = generations
333 0 : .iter()
334 0 : .flat_map(|(id, start_mode)| {
335 0 : match start_mode {
336 0 : TenantStartupMode::Attached((_mode, generation)) => Some(generation),
337 0 : TenantStartupMode::Secondary => None,
338 : }
339 0 : .map(|gen| (*id, *gen))
340 0 : })
341 0 : .collect();
342 0 : resources.deletion_queue_client.recover(attached_tenants)?;
343 :
344 0 : Ok(Some(generations))
345 0 : }
346 :
347 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
348 : /// to load a tenant config from it.
349 : ///
350 : /// If we cleaned up something expected (like an empty dir or a temp dir), return None.
351 0 : fn load_tenant_config(
352 0 : conf: &'static PageServerConf,
353 0 : tenant_shard_id: TenantShardId,
354 0 : dentry: Utf8DirEntry,
355 0 : ) -> Option<Result<LocationConf, LoadConfigError>> {
356 0 : let tenant_dir_path = dentry.path().to_path_buf();
357 0 : if crate::is_temporary(&tenant_dir_path) {
358 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
359 : // No need to use safe_remove_tenant_dir_all because this is already
360 : // a temporary path
361 0 : std::fs::remove_dir_all(&tenant_dir_path).fatal_err("delete temporary tenant dir");
362 0 : return None;
363 0 : }
364 0 :
365 0 : // This case happens if we crash during attachment before writing a config into the dir
366 0 : let is_empty = tenant_dir_path
367 0 : .is_empty_dir()
368 0 : .fatal_err("Checking for empty tenant dir");
369 0 : if is_empty {
370 0 : info!("removing empty tenant directory {tenant_dir_path:?}");
371 0 : std::fs::remove_dir(&tenant_dir_path).fatal_err("delete empty tenant dir");
372 0 : return None;
373 0 : }
374 0 :
375 0 : Some(Tenant::load_tenant_config(conf, &tenant_shard_id))
376 0 : }
377 :
378 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
379 : /// and load configurations for the tenants we found.
380 : ///
381 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
382 : /// seconds even on reasonably fast drives.
383 0 : async fn init_load_tenant_configs(
384 0 : conf: &'static PageServerConf,
385 0 : ) -> HashMap<TenantShardId, Result<LocationConf, LoadConfigError>> {
386 0 : let tenants_dir = conf.tenants_path();
387 :
388 0 : let dentries = tokio::task::spawn_blocking(move || -> Vec<Utf8DirEntry> {
389 0 : let context = format!("read tenants dir {tenants_dir}");
390 0 : let dir_entries = tenants_dir.read_dir_utf8().fatal_err(&context);
391 0 :
392 0 : dir_entries
393 0 : .collect::<Result<Vec<_>, std::io::Error>>()
394 0 : .fatal_err(&context)
395 0 : })
396 0 : .await
397 0 : .expect("Config load task panicked");
398 0 :
399 0 : let mut configs = HashMap::new();
400 0 :
401 0 : let mut join_set = JoinSet::new();
402 0 : for dentry in dentries {
403 0 : let tenant_shard_id = match dentry.file_name().parse::<TenantShardId>() {
404 0 : Ok(id) => id,
405 : Err(_) => {
406 0 : warn!(
407 0 : "Invalid tenant path (garbage in our repo directory?): '{}'",
408 0 : dentry.file_name()
409 : );
410 0 : continue;
411 : }
412 : };
413 :
414 0 : join_set.spawn_blocking(move || {
415 0 : (
416 0 : tenant_shard_id,
417 0 : load_tenant_config(conf, tenant_shard_id, dentry),
418 0 : )
419 0 : });
420 : }
421 :
422 0 : while let Some(r) = join_set.join_next().await {
423 0 : let (tenant_shard_id, tenant_config) = r.expect("Panic in config load task");
424 0 : if let Some(tenant_config) = tenant_config {
425 0 : configs.insert(tenant_shard_id, tenant_config);
426 0 : }
427 : }
428 :
429 0 : configs
430 0 : }
431 :
/// Errors reported when deleting a tenant.
#[derive(Debug, thiserror::Error)]
pub(crate) enum DeleteTenantError {
    /// A tenant map slot error; converted automatically from [`TenantSlotError`].
    #[error("Tenant map slot error {0}")]
    SlotError(#[from] TenantSlotError),

    /// The operation was cancelled.
    #[error("Cancelled")]
    Cancelled,

    /// Any other failure; the inner error's message is displayed as-is.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
443 :
/// Initialize repositories with locally available timelines.
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
/// are scheduled for download and added to the tenant once download is completed.
///
/// The steps performed here, in order:
/// 1. Set the global ephemeral layer size limit from the amount of system memory.
/// 2. Load tenant configs from the local tenants directory.
/// 3. Determine each tenant's startup mode and generation (see `init_load_generations`).
/// 4. Reconcile each loaded config with its startup mode and persist it to disk.
/// 5. Construct a `Tenant` or `SecondaryTenant` for every persisted config.
/// 6. Switch the global tenants map from `Initializing` to `Open`.
///
/// # Errors
///
/// Returns an error if `init_load_generations` fails.
///
/// # Panics
///
/// Panics if the global tenants map is not in the `Initializing` state when the
/// loaded tenants are installed. Failure to write a tenant config is treated as fatal.
#[instrument(skip_all)]
pub async fn init_tenant_mgr(
    conf: &'static PageServerConf,
    resources: TenantSharedResources,
    init_order: InitializationOrder,
    cancel: CancellationToken,
) -> anyhow::Result<TenantManager> {
    let mut tenants = BTreeMap::new();

    let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);

    // Initialize dynamic limits that depend on system resources
    let system_memory =
        sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
            .total_memory();
    let max_ephemeral_layer_bytes =
        conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
    tracing::info!("Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory");
    inmemory_layer::GLOBAL_RESOURCES.max_dirty_bytes.store(
        max_ephemeral_layer_bytes,
        std::sync::atomic::Ordering::Relaxed,
    );

    // Scan local filesystem for attached tenants
    let tenant_configs = init_load_tenant_configs(conf).await;

    // Determine which tenants are to be secondary or attached, and in which generation
    let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;

    tracing::info!(
        "Attaching {} tenants at startup, warming up {} at a time",
        tenant_configs.len(),
        conf.concurrent_tenant_warmup.initial_permits()
    );
    TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);

    // Accumulate futures for writing tenant configs, so that we can execute in parallel
    let mut config_write_futs = Vec::new();

    // Update the location configs according to the re-attach response and persist them to disk
    tracing::info!("Updating {} location configs", tenant_configs.len());
    for (tenant_shard_id, location_conf) in tenant_configs {
        let tenant_dir_path = conf.tenant_path(&tenant_shard_id);

        let mut location_conf = match location_conf {
            Ok(l) => l,
            Err(e) => {
                // This should only happen in the case of a serialization bug or critical local I/O error: we cannot load this tenant
                error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to load tenant config, failed to {e:#}");
                continue;
            }
        };

        // FIXME: if we were attached, and get demoted to secondary on re-attach, we
        // don't have a place to get a config.
        // (https://github.com/neondatabase/neon/issues/5377)
        const DEFAULT_SECONDARY_CONF: SecondaryLocationConfig =
            SecondaryLocationConfig { warm: true };

        if let Some(tenant_modes) = &tenant_modes {
            // We have a generation map: treat it as the authority for whether
            // this tenant is really attached.
            match tenant_modes.get(&tenant_shard_id) {
                None => {
                    info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");

                    // Rename the directory out of the way first; the actual deletion
                    // happens in a background task.
                    match safe_rename_tenant_dir(&tenant_dir_path).await {
                        Ok(tmp_path) => {
                            spawn_background_purge(tmp_path);
                        }
                        Err(e) => {
                            error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
                            "Failed to move detached tenant directory '{tenant_dir_path}': {e:?}");
                        }
                    };

                    // We deleted local content: move on to next tenant, don't try and spawn this one.
                    continue;
                }
                Some(TenantStartupMode::Secondary) => {
                    // Only overwrite the mode if the tenant was not already secondary,
                    // so that an existing secondary config is kept as-is.
                    if !matches!(location_conf.mode, LocationMode::Secondary(_)) {
                        location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
                    }
                }
                Some(TenantStartupMode::Attached((attach_mode, generation))) => {
                    // Detect a locally persisted generation that is newer than the
                    // one we were just given.
                    let old_gen_higher = match &location_conf.mode {
                        LocationMode::Attached(AttachedLocationConfig {
                            generation: old_generation,
                            attach_mode: _attach_mode,
                        }) => {
                            if old_generation > generation {
                                Some(old_generation)
                            } else {
                                None
                            }
                        }
                        _ => None,
                    };
                    if let Some(old_generation) = old_gen_higher {
                        tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
                            "Control plane gave decreasing generation ({generation:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
                            old_generation
                        );

                        // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
                        // local disk content: demote to secondary rather than detaching.
                        location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
                    } else {
                        location_conf.attach_in_generation(*attach_mode, *generation);
                    }
                }
            }
        } else {
            // Legacy mode: no generation information, any tenant present
            // on local disk may activate
            info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
        };

        // Presence of a generation number implies attachment: attach the tenant
        // if it wasn't already, and apply the generation number.
        //
        // The write itself is deferred: the future is only queued here and is driven
        // below with bounded concurrency.
        config_write_futs.push(async move {
            let r = Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
            (tenant_shard_id, location_conf, r)
        });
    }

    // Execute config writes with concurrency, to avoid bottlenecking on local FS write latency
    tracing::info!(
        "Writing {} location config files...",
        config_write_futs.len()
    );
    // At most 16 config writes are in flight at any time.
    let config_write_results = futures::stream::iter(config_write_futs)
        .buffer_unordered(16)
        .collect::<Vec<_>>()
        .await;

    tracing::info!(
        "Spawning {} tenant shard locations...",
        config_write_results.len()
    );
    // For those shards that have live configurations, construct `Tenant` or `SecondaryTenant` objects and start them running
    for (tenant_shard_id, location_conf, config_write_result) in config_write_results {
        // Writing a config to local disk is foundational to starting up tenants: panic if we can't.
        config_write_result.fatal_err("write tenant shard config file");

        let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
        let shard_identity = location_conf.shard;
        let slot = match location_conf.mode {
            LocationMode::Attached(attached_conf) => TenantSlot::Attached(tenant_spawn(
                conf,
                tenant_shard_id,
                &tenant_dir_path,
                resources.clone(),
                AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
                shard_identity,
                Some(init_order.clone()),
                SpawnMode::Lazy,
                &ctx,
            )),
            LocationMode::Secondary(secondary_conf) => {
                info!(
                    tenant_id = %tenant_shard_id.tenant_id,
                    shard_id = %tenant_shard_id.shard_slug(),
                    "Starting secondary tenant"
                );
                TenantSlot::Secondary(SecondaryTenant::new(
                    tenant_shard_id,
                    shard_identity,
                    location_conf.tenant_conf,
                    &secondary_conf,
                ))
            }
        };

        METRICS.slot_inserted(&slot);
        tenants.insert(tenant_shard_id, slot);
    }

    info!("Processed {} local tenants at startup", tenants.len());

    // Publish the loaded tenants: the global map must still be in its initial state.
    let mut tenants_map = TENANTS.write().unwrap();
    assert!(matches!(&*tenants_map, &TenantsMap::Initializing));

    *tenants_map = TenantsMap::Open(tenants);

    // NOTE(review): the manager is given a fresh cancellation token rather than the
    // `cancel` argument, which is only used while loading generations above. This
    // looks intentional (separate startup vs. manager lifetimes) -- confirm.
    Ok(TenantManager {
        conf,
        tenants: &TENANTS,
        resources,
        cancel: CancellationToken::new(),
    })
}
639 :
640 : /// Wrapper for Tenant::spawn that checks invariants before running
641 : #[allow(clippy::too_many_arguments)]
642 0 : fn tenant_spawn(
643 0 : conf: &'static PageServerConf,
644 0 : tenant_shard_id: TenantShardId,
645 0 : tenant_path: &Utf8Path,
646 0 : resources: TenantSharedResources,
647 0 : location_conf: AttachedTenantConf,
648 0 : shard_identity: ShardIdentity,
649 0 : init_order: Option<InitializationOrder>,
650 0 : mode: SpawnMode,
651 0 : ctx: &RequestContext,
652 0 : ) -> Arc<Tenant> {
653 0 : // All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
654 0 : // path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
655 0 : // to avoid impacting prod runtime performance.
656 0 : assert!(!crate::is_temporary(tenant_path));
657 0 : debug_assert!(tenant_path.is_dir());
658 0 : debug_assert!(conf
659 0 : .tenant_location_config_path(&tenant_shard_id)
660 0 : .try_exists()
661 0 : .unwrap());
662 :
663 0 : Tenant::spawn(
664 0 : conf,
665 0 : tenant_shard_id,
666 0 : resources,
667 0 : location_conf,
668 0 : shard_identity,
669 0 : init_order,
670 0 : mode,
671 0 : ctx,
672 0 : )
673 0 : }
674 :
/// Shut down every tenant in `tenants` and wait for the shutdowns to finish.
///
/// Under a single write lock, the map is switched to `ShuttingDown` and one task is
/// spawned per slot that needs waiting on:
/// - `Attached` tenants are shut down with `ShutdownMode::FreezeAndFlush` and stay
///   visible in the `ShuttingDown` map.
/// - `Secondary` tenants have their cancellation token fired and stay visible in the
///   `ShuttingDown` map; they are not waited on individually.
/// - `InProgress` slots are dropped from the map, and their barrier is waited on.
///
/// If the map is still `Initializing` it is replaced by an empty `ShuttingDown` map
/// and the function returns immediately. If it is already `ShuttingDown`, an error is
/// logged and the function returns.
///
/// While waiting, progress is logged at most once per `BUFFER_FOR` interval.
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
    let mut join_set = JoinSet::new();

    #[cfg(all(debug_assertions, not(test)))]
    {
        // Check that our metrics properly tracked the size of the tenants map.  This is a convenient location to check,
        // as it happens implicitly at the end of tests etc.
        let m = tenants.read().unwrap();
        debug_assert_eq!(METRICS.slots_total(), m.len() as u64);
    }

    // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
    let (total_in_progress, total_attached) = {
        let mut m = tenants.write().unwrap();
        match &mut *m {
            TenantsMap::Initializing => {
                *m = TenantsMap::ShuttingDown(BTreeMap::default());
                info!("tenants map is empty");
                return;
            }
            TenantsMap::Open(tenants) => {
                let mut shutdown_state = BTreeMap::new();
                let mut total_in_progress = 0;
                let mut total_attached = 0;

                // Take the whole map out of the `Open` variant and rebuild the subset
                // that remains visible during shutdown.
                for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
                    match v {
                        TenantSlot::Attached(t) => {
                            shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
                            join_set.spawn(
                                async move {
                                    let res = {
                                        let (_guard, shutdown_progress) = completion::channel();
                                        t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
                                    };

                                    if let Err(other_progress) = res {
                                        // join the other shutdown that is already in progress
                                        other_progress.wait().await;
                                    }

                                    // we cannot afford per tenant logging here, because if s3 is degraded, we are
                                    // going to log too many lines
                                    debug!("tenant successfully stopped");
                                }
                                .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
                            );

                            total_attached += 1;
                        }
                        TenantSlot::Secondary(state) => {
                            // We don't need to wait for this individually per-tenant: the
                            // downloader task will be waited on eventually, this cancel
                            // is just to encourage it to drop out if it is doing work
                            // for this tenant right now.
                            state.cancel.cancel();

                            shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
                        }
                        TenantSlot::InProgress(notify) => {
                            // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
                            // wait for their notifications to fire in this function.
                            join_set.spawn(async move {
                                notify.wait().await;
                            });

                            total_in_progress += 1;
                        }
                    }
                }
                *m = TenantsMap::ShuttingDown(shutdown_state);
                (total_in_progress, total_attached)
            }
            TenantsMap::ShuttingDown(_) => {
                error!("already shutting down, this function isn't supposed to be called more than once");
                return;
            }
        }
    };

    let started_at = std::time::Instant::now();

    info!(
        "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
        total_in_progress, total_attached
    );

    let total = join_set.len();
    let mut panicked = 0;
    // `buffering` is true while the progress-log timer is armed.
    let mut buffering = true;
    const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
    let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));

    while !join_set.is_empty() {
        tokio::select! {
            Some(joined) = join_set.join_next() => {
                match joined {
                    Ok(()) => {},
                    Err(join_error) if join_error.is_cancelled() => {
                        unreachable!("we are not cancelling any of the tasks");
                    }
                    Err(join_error) if join_error.is_panic() => {
                        // cannot really do anything, as this panic is likely a bug
                        panicked += 1;
                    }
                    Err(join_error) => {
                        warn!("unknown kind of JoinError: {join_error}");
                    }
                }
                if !buffering {
                    // buffer so that every 500ms since the first update (or starting) we'll log
                    // how far away we are; this is because we will get SIGKILL'd at 10s, and we
                    // are not able to log *then*.
                    buffering = true;
                    buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
                }
            },
            // The timer branch is only polled while armed; once it fires it stays
            // disarmed until another task completes and re-arms it above.
            _ = &mut buffered, if buffering => {
                buffering = false;
                info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
            }
        }
    }

    if panicked > 0 {
        warn!(
            panicked,
            total, "observed panicks while shutting down tenants"
        );
    }

    // caller will log how long we took
}
808 :
/// Errors reported when creating or updating a tenant location's configuration.
#[derive(thiserror::Error, Debug)]
pub(crate) enum UpsertLocationError {
    /// The requested configuration was rejected.
    #[error("Bad config request: {0}")]
    BadRequest(anyhow::Error),

    /// Converted automatically from [`TenantMapError`].
    #[error("Cannot change config in this state: {0}")]
    Unavailable(#[from] TenantMapError),

    /// Another modification of the same tenant is already in progress.
    #[error("Tenant is already being modified")]
    InProgress,

    /// A flush failed.
    #[error("Failed to flush: {0}")]
    Flush(anyhow::Error),

    /// This error variant is for unexpected situations (soft assertions) where the system is in an unexpected state.
    #[error("Internal error: {0}")]
    InternalError(anyhow::Error),
}
827 :
828 : impl TenantManager {
829 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
830 : /// having to pass it around everywhere as a separate object.
831 0 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
832 0 : self.conf
833 0 : }
834 :
835 : /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or currently
836 : /// undergoing a state change (i.e. slot is InProgress).
837 : ///
838 : /// The return Tenant is not guaranteed to be active: check its status after obtaing it, or
839 : /// use [`Tenant::wait_to_become_active`] before using it if you will do I/O on it.
840 0 : pub(crate) fn get_attached_tenant_shard(
841 0 : &self,
842 0 : tenant_shard_id: TenantShardId,
843 0 : ) -> Result<Arc<Tenant>, GetTenantError> {
844 0 : let locked = self.tenants.read().unwrap();
845 :
846 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
847 :
848 0 : match peek_slot {
849 0 : Some(TenantSlot::Attached(tenant)) => Ok(Arc::clone(tenant)),
850 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
851 : None | Some(TenantSlot::Secondary(_)) => {
852 0 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
853 : }
854 : }
855 0 : }
856 :
857 0 : pub(crate) fn get_secondary_tenant_shard(
858 0 : &self,
859 0 : tenant_shard_id: TenantShardId,
860 0 : ) -> Option<Arc<SecondaryTenant>> {
861 0 : let locked = self.tenants.read().unwrap();
862 0 :
863 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
864 0 : .ok()
865 0 : .flatten();
866 :
867 0 : match peek_slot {
868 0 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
869 0 : _ => None,
870 : }
871 0 : }
872 :
873 : /// Whether the `TenantManager` is responsible for the tenant shard
874 0 : pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
875 0 : let locked = self.tenants.read().unwrap();
876 0 :
877 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
878 0 : .ok()
879 0 : .flatten();
880 0 :
881 0 : peek_slot.is_some()
882 0 : }
883 :
884 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
885 : pub(crate) async fn upsert_location(
886 : &self,
887 : tenant_shard_id: TenantShardId,
888 : new_location_config: LocationConf,
889 : flush: Option<Duration>,
890 : mut spawn_mode: SpawnMode,
891 : ctx: &RequestContext,
892 : ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
893 : debug_assert_current_span_has_tenant_id();
894 : info!("configuring tenant location to state {new_location_config:?}");
895 :
896 : enum FastPathModified {
897 : Attached(Arc<Tenant>),
898 : Secondary(Arc<SecondaryTenant>),
899 : }
900 :
901 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
902 : // then we do not need to set the slot to InProgress, we can just call into the
903 : // existng tenant.
904 : let fast_path_taken = {
905 : let locked = self.tenants.read().unwrap();
906 : let peek_slot =
907 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
908 : match (&new_location_config.mode, peek_slot) {
909 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
910 : match attach_conf.generation.cmp(&tenant.generation) {
911 : Ordering::Equal => {
912 : // A transition from Attached to Attached in the same generation, we may
913 : // take our fast path and just provide the updated configuration
914 : // to the tenant.
915 : tenant.set_new_location_config(
916 : AttachedTenantConf::try_from(new_location_config.clone())
917 : .map_err(UpsertLocationError::BadRequest)?,
918 : );
919 :
920 : Some(FastPathModified::Attached(tenant.clone()))
921 : }
922 : Ordering::Less => {
923 : return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
924 : "Generation {:?} is less than existing {:?}",
925 : attach_conf.generation,
926 : tenant.generation
927 : )));
928 : }
929 : Ordering::Greater => {
930 : // Generation advanced, fall through to general case of replacing `Tenant` object
931 : None
932 : }
933 : }
934 : }
935 : (
936 : LocationMode::Secondary(secondary_conf),
937 : Some(TenantSlot::Secondary(secondary_tenant)),
938 : ) => {
939 : secondary_tenant.set_config(secondary_conf);
940 : secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
941 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
942 : }
943 : _ => {
944 : // Not an Attached->Attached transition, fall through to general case
945 : None
946 : }
947 : }
948 : };
949 :
950 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
951 : // phase of writing config and/or waiting for flush, before returning.
952 : match fast_path_taken {
953 : Some(FastPathModified::Attached(tenant)) => {
954 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
955 : .await
956 : .fatal_err("write tenant shard config");
957 :
958 : // Transition to AttachedStale means we may well hold a valid generation
959 : // still, and have been requested to go stale as part of a migration. If
960 : // the caller set `flush`, then flush to remote storage.
961 : if let LocationMode::Attached(AttachedLocationConfig {
962 : generation: _,
963 : attach_mode: AttachmentMode::Stale,
964 : }) = &new_location_config.mode
965 : {
966 : if let Some(flush_timeout) = flush {
967 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
968 : Ok(Err(e)) => {
969 : return Err(UpsertLocationError::Flush(e));
970 : }
971 : Ok(Ok(_)) => return Ok(Some(tenant)),
972 : Err(_) => {
973 : tracing::warn!(
974 : timeout_ms = flush_timeout.as_millis(),
975 : "Timed out waiting for flush to remote storage, proceeding anyway."
976 : )
977 : }
978 : }
979 : }
980 : }
981 :
982 : return Ok(Some(tenant));
983 : }
984 : Some(FastPathModified::Secondary(_secondary_tenant)) => {
985 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
986 : .await
987 : .fatal_err("write tenant shard config");
988 :
989 : return Ok(None);
990 : }
991 : None => {
992 : // Proceed with the general case procedure, where we will shutdown & remove any existing
993 : // slot contents and replace with a fresh one
994 : }
995 : };
996 :
997 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
998 : // InProgress value to the slot while we make whatever changes are required. The state for
999 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
1000 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
1001 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
1002 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
1003 0 : .map_err(|e| match e {
1004 : TenantSlotError::NotFound(_) => {
1005 0 : unreachable!("Called with mode Any")
1006 : }
1007 0 : TenantSlotError::InProgress => UpsertLocationError::InProgress,
1008 0 : TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
1009 0 : })?;
1010 :
1011 : match slot_guard.get_old_value() {
1012 : Some(TenantSlot::Attached(tenant)) => {
1013 : // The case where we keep a Tenant alive was covered above in the special case
1014 : // for Attached->Attached transitions in the same generation. By this point,
1015 : // if we see an attached tenant we know it will be discarded and should be
1016 : // shut down.
1017 : let (_guard, progress) = utils::completion::channel();
1018 :
1019 : match tenant.get_attach_mode() {
1020 : AttachmentMode::Single | AttachmentMode::Multi => {
1021 : // Before we leave our state as the presumed holder of the latest generation,
1022 : // flush any outstanding deletions to reduce the risk of leaking objects.
1023 : self.resources.deletion_queue_client.flush_advisory()
1024 : }
1025 : AttachmentMode::Stale => {
1026 : // If we're stale there's not point trying to flush deletions
1027 : }
1028 : };
1029 :
1030 : info!("Shutting down attached tenant");
1031 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1032 : Ok(()) => {}
1033 : Err(barrier) => {
1034 : info!("Shutdown already in progress, waiting for it to complete");
1035 : barrier.wait().await;
1036 : }
1037 : }
1038 : slot_guard.drop_old_value().expect("We just shut it down");
1039 :
1040 : // Edge case: if we were called with SpawnMode::Create, but a Tenant already existed, then
1041 : // the caller thinks they're creating but the tenant already existed. We must switch to
1042 : // Eager mode so that when starting this Tenant we properly probe remote storage for timelines,
1043 : // rather than assuming it to be empty.
1044 : spawn_mode = SpawnMode::Eager;
1045 : }
1046 : Some(TenantSlot::Secondary(state)) => {
1047 : info!("Shutting down secondary tenant");
1048 : state.shutdown().await;
1049 : }
1050 : Some(TenantSlot::InProgress(_)) => {
1051 : // This should never happen: acquire_slot should error out
1052 : // if the contents of a slot were InProgress.
1053 : return Err(UpsertLocationError::InternalError(anyhow::anyhow!(
1054 : "Acquired an InProgress slot, this is a bug."
1055 : )));
1056 : }
1057 : None => {
1058 : // Slot was vacant, nothing needs shutting down.
1059 : }
1060 : }
1061 :
1062 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1063 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1064 :
1065 : // Directory structure is the same for attached and secondary modes:
1066 : // create it if it doesn't exist. Timeline load/creation expects the
1067 : // timelines/ subdir to already exist.
1068 : //
1069 : // Does not need to be fsync'd because local storage is just a cache.
1070 : tokio::fs::create_dir_all(&timelines_path)
1071 : .await
1072 : .fatal_err("create timelines/ dir");
1073 :
1074 : // Before activating either secondary or attached mode, persist the
1075 : // configuration, so that on restart we will re-attach (or re-start
1076 : // secondary) on the tenant.
1077 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1078 : .await
1079 : .fatal_err("write tenant shard config");
1080 :
1081 : let new_slot = match &new_location_config.mode {
1082 : LocationMode::Secondary(secondary_config) => {
1083 : let shard_identity = new_location_config.shard;
1084 : TenantSlot::Secondary(SecondaryTenant::new(
1085 : tenant_shard_id,
1086 : shard_identity,
1087 : new_location_config.tenant_conf,
1088 : secondary_config,
1089 : ))
1090 : }
1091 : LocationMode::Attached(_attach_config) => {
1092 : let shard_identity = new_location_config.shard;
1093 :
1094 : // Testing hack: if we are configured with no control plane, then drop the generation
1095 : // from upserts. This enables creating generation-less tenants even though neon_local
1096 : // always uses generations when calling the location conf API.
1097 : let attached_conf = if cfg!(feature = "testing") {
1098 : let mut conf = AttachedTenantConf::try_from(new_location_config)
1099 : .map_err(UpsertLocationError::BadRequest)?;
1100 : if self.conf.control_plane_api.is_none() {
1101 : conf.location.generation = Generation::none();
1102 : }
1103 : conf
1104 : } else {
1105 : AttachedTenantConf::try_from(new_location_config)
1106 : .map_err(UpsertLocationError::BadRequest)?
1107 : };
1108 :
1109 : let tenant = tenant_spawn(
1110 : self.conf,
1111 : tenant_shard_id,
1112 : &tenant_path,
1113 : self.resources.clone(),
1114 : attached_conf,
1115 : shard_identity,
1116 : None,
1117 : spawn_mode,
1118 : ctx,
1119 : );
1120 :
1121 : TenantSlot::Attached(tenant)
1122 : }
1123 : };
1124 :
1125 : let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
1126 : Some(tenant.clone())
1127 : } else {
1128 : None
1129 : };
1130 :
1131 : match slot_guard.upsert(new_slot) {
1132 : Err(TenantSlotUpsertError::InternalError(e)) => {
1133 : Err(UpsertLocationError::InternalError(anyhow::anyhow!(e)))
1134 : }
1135 : Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
1136 : Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
1137 : // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
1138 : // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
1139 : // are shutdown.
1140 : //
1141 : // We must shut it down inline here.
1142 : match new_slot {
1143 : TenantSlot::InProgress(_) => {
1144 : // Unreachable because we never insert an InProgress
1145 : unreachable!()
1146 : }
1147 : TenantSlot::Attached(tenant) => {
1148 : let (_guard, progress) = utils::completion::channel();
1149 : info!("Shutting down just-spawned tenant, because tenant manager is shut down");
1150 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1151 : Ok(()) => {
1152 : info!("Finished shutting down just-spawned tenant");
1153 : }
1154 : Err(barrier) => {
1155 : info!("Shutdown already in progress, waiting for it to complete");
1156 : barrier.wait().await;
1157 : }
1158 : }
1159 : }
1160 : TenantSlot::Secondary(secondary_tenant) => {
1161 : secondary_tenant.shutdown().await;
1162 : }
1163 : }
1164 :
1165 : Err(UpsertLocationError::Unavailable(
1166 : TenantMapError::ShuttingDown,
1167 : ))
1168 : }
1169 : Ok(()) => Ok(attached_tenant),
1170 : }
1171 : }
1172 :
1173 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1174 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1175 : /// dropped before re-attaching.
1176 : ///
1177 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1178 : /// where an issue is identified that would go away with a restart of the tenant.
1179 : ///
1180 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1181 : /// to respect the cancellation tokens used in normal shutdown().
1182 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1183 : pub(crate) async fn reset_tenant(
1184 : &self,
1185 : tenant_shard_id: TenantShardId,
1186 : drop_cache: bool,
1187 : ctx: &RequestContext,
1188 : ) -> anyhow::Result<()> {
1189 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1190 : let Some(old_slot) = slot_guard.get_old_value() else {
1191 : anyhow::bail!("Tenant not found when trying to reset");
1192 : };
1193 :
1194 : let Some(tenant) = old_slot.get_attached() else {
1195 : slot_guard.revert();
1196 : anyhow::bail!("Tenant is not in attached state");
1197 : };
1198 :
1199 : let (_guard, progress) = utils::completion::channel();
1200 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1201 : Ok(()) => {
1202 : slot_guard.drop_old_value()?;
1203 : }
1204 : Err(_barrier) => {
1205 : slot_guard.revert();
1206 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1207 : }
1208 : }
1209 :
1210 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1211 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1212 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1213 :
1214 : if drop_cache {
1215 : tracing::info!("Dropping local file cache");
1216 :
1217 : match tokio::fs::read_dir(&timelines_path).await {
1218 : Err(e) => {
1219 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1220 : }
1221 : Ok(mut entries) => {
1222 : while let Some(entry) = entries.next_entry().await? {
1223 : tokio::fs::remove_dir_all(entry.path()).await?;
1224 : }
1225 : }
1226 : }
1227 : }
1228 :
1229 : let shard_identity = config.shard;
1230 : let tenant = tenant_spawn(
1231 : self.conf,
1232 : tenant_shard_id,
1233 : &tenant_path,
1234 : self.resources.clone(),
1235 : AttachedTenantConf::try_from(config)?,
1236 : shard_identity,
1237 : None,
1238 : SpawnMode::Eager,
1239 : ctx,
1240 : );
1241 :
1242 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1243 :
1244 : Ok(())
1245 : }
1246 :
1247 0 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
1248 0 : let locked = self.tenants.read().unwrap();
1249 0 : match &*locked {
1250 0 : TenantsMap::Initializing => Vec::new(),
1251 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1252 0 : .values()
1253 0 : .filter_map(|slot| {
1254 0 : slot.get_attached()
1255 0 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1256 0 : })
1257 0 : .collect(),
1258 : }
1259 0 : }
1260 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1261 : // callback should be small and fast, as it will be called inside the global
1262 : // TenantsMap lock.
1263 0 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1264 0 : where
1265 0 : // TODO: let the callback return a hint to drop out of the loop early
1266 0 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1267 0 : {
1268 0 : let locked = self.tenants.read().unwrap();
1269 :
1270 0 : let map = match &*locked {
1271 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1272 0 : TenantsMap::Open(m) => m,
1273 : };
1274 :
1275 0 : for (tenant_id, slot) in map {
1276 0 : if let TenantSlot::Secondary(state) = slot {
1277 : // Only expose secondary tenants that are not currently shutting down
1278 0 : if !state.cancel.is_cancelled() {
1279 0 : func(tenant_id, state)
1280 0 : }
1281 0 : }
1282 : }
1283 0 : }
1284 :
1285 : /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
1286 0 : pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
1287 0 : let locked = self.tenants.read().unwrap();
1288 0 : match &*locked {
1289 0 : TenantsMap::Initializing => Vec::new(),
1290 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1291 0 : map.iter().map(|(k, v)| (*k, v.clone())).collect()
1292 : }
1293 : }
1294 0 : }
1295 :
1296 0 : pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option<TenantSlot> {
1297 0 : let locked = self.tenants.read().unwrap();
1298 0 : match &*locked {
1299 0 : TenantsMap::Initializing => None,
1300 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1301 0 : map.get(&tenant_shard_id).cloned()
1302 : }
1303 : }
1304 0 : }
1305 :
1306 0 : async fn delete_tenant_remote(
1307 0 : &self,
1308 0 : tenant_shard_id: TenantShardId,
1309 0 : ) -> Result<(), DeleteTenantError> {
1310 0 : let remote_path = remote_tenant_path(&tenant_shard_id);
1311 0 : let keys = match self
1312 0 : .resources
1313 0 : .remote_storage
1314 0 : .list(
1315 0 : Some(&remote_path),
1316 0 : remote_storage::ListingMode::NoDelimiter,
1317 0 : None,
1318 0 : &self.cancel,
1319 0 : )
1320 0 : .await
1321 : {
1322 0 : Ok(listing) => listing.keys,
1323 : Err(remote_storage::DownloadError::Cancelled) => {
1324 0 : return Err(DeleteTenantError::Cancelled)
1325 : }
1326 0 : Err(remote_storage::DownloadError::NotFound) => return Ok(()),
1327 0 : Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
1328 : };
1329 :
1330 0 : if keys.is_empty() {
1331 0 : tracing::info!("Remote storage already deleted");
1332 : } else {
1333 0 : tracing::info!("Deleting {} keys from remote storage", keys.len());
1334 0 : self.resources
1335 0 : .remote_storage
1336 0 : .delete_objects(&keys, &self.cancel)
1337 0 : .await?;
1338 : }
1339 :
1340 0 : Ok(())
1341 0 : }
1342 :
1343 : /// If a tenant is attached, detach it. Then remove its data from remote storage.
1344 : ///
1345 : /// A tenant is considered deleted once it is gone from remote storage. It is the caller's
1346 : /// responsibility to avoid trying to attach the tenant again or use it any way once deletion
1347 : /// has started: this operation is not atomic, and must be retried until it succeeds.
1348 0 : pub(crate) async fn delete_tenant(
1349 0 : &self,
1350 0 : tenant_shard_id: TenantShardId,
1351 0 : ) -> Result<(), DeleteTenantError> {
1352 0 : super::span::debug_assert_current_span_has_tenant_id();
1353 :
1354 0 : async fn delete_local(
1355 0 : conf: &PageServerConf,
1356 0 : tenant_shard_id: &TenantShardId,
1357 0 : ) -> anyhow::Result<()> {
1358 0 : let local_tenant_directory = conf.tenant_path(tenant_shard_id);
1359 0 : let tmp_dir = safe_rename_tenant_dir(&local_tenant_directory)
1360 0 : .await
1361 0 : .with_context(|| {
1362 0 : format!("local tenant directory {local_tenant_directory:?} rename")
1363 0 : })?;
1364 0 : spawn_background_purge(tmp_dir);
1365 0 : Ok(())
1366 0 : }
1367 :
1368 0 : let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1369 0 : match &slot_guard.old_value {
1370 0 : Some(TenantSlot::Attached(tenant)) => {
1371 0 : // Legacy deletion flow: the tenant remains attached, goes to Stopping state, and
1372 0 : // deletion will be resumed across restarts.
1373 0 : let tenant = tenant.clone();
1374 0 : let (_guard, progress) = utils::completion::channel();
1375 0 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1376 0 : Ok(()) => {}
1377 0 : Err(barrier) => {
1378 0 : info!("Shutdown already in progress, waiting for it to complete");
1379 0 : barrier.wait().await;
1380 : }
1381 : }
1382 0 : delete_local(self.conf, &tenant_shard_id).await?;
1383 : }
1384 0 : Some(TenantSlot::Secondary(secondary_tenant)) => {
1385 0 : secondary_tenant.shutdown().await;
1386 :
1387 0 : delete_local(self.conf, &tenant_shard_id).await?;
1388 : }
1389 0 : Some(TenantSlot::InProgress(_)) => unreachable!(),
1390 0 : None => {}
1391 : };
1392 :
1393 : // Fall through: local state for this tenant is no longer present, proceed with remote delete.
1394 : // - We use a retry wrapper here so that common transient S3 errors (e.g. 503, 429) do not result
1395 : // in 500 responses to delete requests.
1396 : // - We keep the `SlotGuard` during this I/O, so that if a concurrent delete request comes in, it will
1397 : // 503/retry, rather than kicking off a wasteful concurrent deletion.
1398 0 : match backoff::retry(
1399 0 : || async move { self.delete_tenant_remote(tenant_shard_id).await },
1400 0 : |e| match e {
1401 0 : DeleteTenantError::Cancelled => true,
1402 : DeleteTenantError::SlotError(_) => {
1403 0 : unreachable!("Remote deletion doesn't touch slots")
1404 : }
1405 0 : _ => false,
1406 0 : },
1407 0 : 1,
1408 0 : 3,
1409 0 : &format!("delete_tenant[tenant_shard_id={tenant_shard_id}]"),
1410 0 : &self.cancel,
1411 0 : )
1412 0 : .await
1413 : {
1414 0 : Some(r) => r,
1415 0 : None => Err(DeleteTenantError::Cancelled),
1416 : }
1417 0 : }
1418 :
1419 0 : #[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
1420 : pub(crate) async fn shard_split(
1421 : &self,
1422 : tenant: Arc<Tenant>,
1423 : new_shard_count: ShardCount,
1424 : new_stripe_size: Option<ShardStripeSize>,
1425 : ctx: &RequestContext,
1426 : ) -> anyhow::Result<Vec<TenantShardId>> {
1427 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1428 : let r = self
1429 : .do_shard_split(tenant, new_shard_count, new_stripe_size, ctx)
1430 : .await;
1431 : if r.is_err() {
1432 : // Shard splitting might have left the original shard in a partially shut down state (it
1433 : // stops the shard's remote timeline client). Reset it to ensure we leave things in
1434 : // a working state.
1435 : if self.get(tenant_shard_id).is_some() {
1436 : tracing::warn!("Resetting after shard split failure");
1437 : if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
1438 : // Log this error because our return value will still be the original error, not this one. This is
1439 : // a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
1440 : // (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
1441 : // setting it broken probably won't help either.
1442 : tracing::error!("Failed to reset: {e}");
1443 : }
1444 : }
1445 : }
1446 :
1447 : r
1448 : }
1449 :
1450 0 : pub(crate) async fn do_shard_split(
1451 0 : &self,
1452 0 : tenant: Arc<Tenant>,
1453 0 : new_shard_count: ShardCount,
1454 0 : new_stripe_size: Option<ShardStripeSize>,
1455 0 : ctx: &RequestContext,
1456 0 : ) -> anyhow::Result<Vec<TenantShardId>> {
1457 0 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1458 0 :
1459 0 : // Validate the incoming request
1460 0 : if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
1461 0 : anyhow::bail!("Requested shard count is not an increase");
1462 0 : }
1463 0 : let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
1464 0 : if !expansion_factor.is_power_of_two() {
1465 0 : anyhow::bail!("Requested split is not a power of two");
1466 0 : }
1467 :
1468 0 : if let Some(new_stripe_size) = new_stripe_size {
1469 0 : if tenant.get_shard_stripe_size() != new_stripe_size
1470 0 : && tenant_shard_id.shard_count.count() > 1
1471 : {
1472 : // This tenant already has multiple shards, it is illegal to try and change its stripe size
1473 0 : anyhow::bail!(
1474 0 : "Shard stripe size may not be modified once tenant has multiple shards"
1475 0 : );
1476 0 : }
1477 0 : }
1478 :
1479 : // Plan: identify what the new child shards will be
1480 0 : let child_shards = tenant_shard_id.split(new_shard_count);
1481 0 : tracing::info!(
1482 0 : "Shard {} splits into: {}",
1483 0 : tenant_shard_id.to_index(),
1484 0 : child_shards
1485 0 : .iter()
1486 0 : .map(|id| format!("{}", id.to_index()))
1487 0 : .join(",")
1488 : );
1489 :
1490 0 : fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
1491 0 : "failpoint"
1492 0 : )));
1493 :
1494 0 : let parent_shard_identity = tenant.shard_identity;
1495 0 : let parent_tenant_conf = tenant.get_tenant_conf();
1496 0 : let parent_generation = tenant.generation;
1497 :
1498 : // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
1499 0 : if let Err(e) = tenant.split_prepare(&child_shards).await {
1500 : // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
1501 : // have been left in a partially-shut-down state.
1502 0 : tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
1503 0 : return Err(e);
1504 0 : }
1505 0 :
1506 0 : fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
1507 0 : "failpoint"
1508 0 : )));
1509 :
1510 0 : self.resources.deletion_queue_client.flush_advisory();
1511 0 :
1512 0 : // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
1513 0 : drop(tenant);
1514 0 : let mut parent_slot_guard =
1515 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1516 0 : let parent = match parent_slot_guard.get_old_value() {
1517 0 : Some(TenantSlot::Attached(t)) => t,
1518 0 : Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
1519 : Some(TenantSlot::InProgress(_)) => {
1520 : // tenant_map_acquire_slot never returns InProgress, if a slot was InProgress
1521 : // it would return an error.
1522 0 : unreachable!()
1523 : }
1524 : None => {
1525 : // We don't actually need the parent shard to still be attached to do our work, but it's
1526 : // a weird enough situation that the caller probably didn't want us to continue working
1527 : // if they had detached the tenant they requested the split on.
1528 0 : anyhow::bail!("Detached parent shard in the middle of split!")
1529 : }
1530 : };
1531 0 : fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
1532 0 : "failpoint"
1533 0 : )));
1534 : // Optimization: hardlink layers from the parent into the children, so that they don't have to
1535 : // re-download & duplicate the data referenced in their initial IndexPart
1536 0 : self.shard_split_hardlink(parent, child_shards.clone())
1537 0 : .await?;
1538 0 : fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
1539 0 : "failpoint"
1540 0 : )));
1541 :
1542 : // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
1543 : // child shards to reach this point.
1544 0 : let mut target_lsns = HashMap::new();
1545 0 : for timeline in parent.timelines.lock().unwrap().clone().values() {
1546 0 : target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
1547 0 : }
1548 :
1549 : // TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
1550 : // and could slow down the children trying to catch up.
1551 :
1552 : // Phase 3: Spawn the child shards
1553 0 : for child_shard in &child_shards {
1554 0 : let mut child_shard_identity = parent_shard_identity;
1555 0 : if let Some(new_stripe_size) = new_stripe_size {
1556 0 : child_shard_identity.stripe_size = new_stripe_size;
1557 0 : }
1558 0 : child_shard_identity.count = child_shard.shard_count;
1559 0 : child_shard_identity.number = child_shard.shard_number;
1560 0 :
1561 0 : let child_location_conf = LocationConf {
1562 0 : mode: LocationMode::Attached(AttachedLocationConfig {
1563 0 : generation: parent_generation,
1564 0 : attach_mode: AttachmentMode::Single,
1565 0 : }),
1566 0 : shard: child_shard_identity,
1567 0 : tenant_conf: parent_tenant_conf.clone(),
1568 0 : };
1569 0 :
1570 0 : self.upsert_location(
1571 0 : *child_shard,
1572 0 : child_location_conf,
1573 0 : None,
1574 0 : SpawnMode::Eager,
1575 0 : ctx,
1576 0 : )
1577 0 : .await?;
1578 : }
1579 :
1580 0 : fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
1581 0 : "failpoint"
1582 0 : )));
1583 :
1584 : // Phase 4: wait for child chards WAL ingest to catch up to target LSN
1585 0 : for child_shard_id in &child_shards {
1586 0 : let child_shard_id = *child_shard_id;
1587 0 : let child_shard = {
1588 0 : let locked = self.tenants.read().unwrap();
1589 0 : let peek_slot =
1590 0 : tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
1591 0 : peek_slot.and_then(|s| s.get_attached()).cloned()
1592 : };
1593 0 : if let Some(t) = child_shard {
1594 : // Wait for the child shard to become active: this should be very quick because it only
1595 : // has to download the index_part that we just uploaded when creating it.
1596 0 : if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
1597 : // This is not fatal: we have durably created the child shard. It just makes the
1598 : // split operation less seamless for clients, as we will may detach the parent
1599 : // shard before the child shards are fully ready to serve requests.
1600 0 : tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
1601 0 : continue;
1602 0 : }
1603 0 :
1604 0 : let timelines = t.timelines.lock().unwrap().clone();
1605 0 : for timeline in timelines.values() {
1606 0 : let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
1607 0 : continue;
1608 : };
1609 :
1610 0 : tracing::info!(
1611 0 : "Waiting for child shard {}/{} to reach target lsn {}...",
1612 0 : child_shard_id,
1613 0 : timeline.timeline_id,
1614 : target_lsn
1615 : );
1616 :
1617 0 : fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
1618 0 : "failpoint"
1619 0 : )));
1620 0 : if let Err(e) = timeline
1621 0 : .wait_lsn(
1622 0 : *target_lsn,
1623 0 : crate::tenant::timeline::WaitLsnWaiter::Tenant,
1624 0 : ctx,
1625 0 : )
1626 0 : .await
1627 : {
1628 : // Failure here might mean shutdown, in any case this part is an optimization
1629 : // and we shouldn't hold up the split operation.
1630 0 : tracing::warn!(
1631 0 : "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
1632 0 : timeline.timeline_id
1633 : );
1634 : } else {
1635 0 : tracing::info!(
1636 0 : "Child shard {}/{} reached target lsn {}",
1637 0 : child_shard_id,
1638 0 : timeline.timeline_id,
1639 : target_lsn
1640 : );
1641 : }
1642 : }
1643 0 : }
1644 : }
1645 :
1646 : // Phase 5: Shut down the parent shard, and erase it from disk
1647 0 : let (_guard, progress) = completion::channel();
1648 0 : match parent.shutdown(progress, ShutdownMode::Hard).await {
1649 0 : Ok(()) => {}
1650 0 : Err(other) => {
1651 0 : other.wait().await;
1652 : }
1653 : }
1654 0 : let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
1655 0 : let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
1656 0 : .await
1657 0 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
1658 0 : spawn_background_purge(tmp_path);
1659 0 :
1660 0 : fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
1661 0 : "failpoint"
1662 0 : )));
1663 :
1664 0 : parent_slot_guard.drop_old_value()?;
1665 :
1666 : // Phase 6: Release the InProgress on the parent shard
1667 0 : drop(parent_slot_guard);
1668 0 :
1669 0 : Ok(child_shards)
1670 0 : }
1671 :
1672 : /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
1673 : /// to avoid the children downloading them again.
1674 : ///
1675 : /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
1676 0 : async fn shard_split_hardlink(
1677 0 : &self,
1678 0 : parent_shard: &Tenant,
1679 0 : child_shards: Vec<TenantShardId>,
1680 0 : ) -> anyhow::Result<()> {
1681 0 : debug_assert_current_span_has_tenant_id();
1682 0 :
1683 0 : let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
1684 0 : let (parent_timelines, parent_layers) = {
1685 0 : let mut parent_layers = Vec::new();
1686 0 : let timelines = parent_shard.timelines.lock().unwrap().clone();
1687 0 : let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
1688 0 : for timeline in timelines.values() {
1689 0 : tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
1690 0 : let timeline_layers = timeline
1691 0 : .layers
1692 0 : .read()
1693 0 : .await
1694 0 : .likely_resident_layers()
1695 0 : .collect::<Vec<_>>();
1696 :
1697 0 : for layer in timeline_layers {
1698 0 : let relative_path = layer
1699 0 : .local_path()
1700 0 : .strip_prefix(&parent_path)
1701 0 : .context("Removing prefix from parent layer path")?;
1702 0 : parent_layers.push(relative_path.to_owned());
1703 : }
1704 : }
1705 0 : debug_assert!(
1706 0 : !parent_layers.is_empty(),
1707 0 : "shutdown cannot empty the layermap"
1708 : );
1709 0 : (parent_timelines, parent_layers)
1710 0 : };
1711 0 :
1712 0 : let mut child_prefixes = Vec::new();
1713 0 : let mut create_dirs = Vec::new();
1714 :
1715 0 : for child in child_shards {
1716 0 : let child_prefix = self.conf.tenant_path(&child);
1717 0 : create_dirs.push(child_prefix.clone());
1718 0 : create_dirs.extend(
1719 0 : parent_timelines
1720 0 : .iter()
1721 0 : .map(|t| self.conf.timeline_path(&child, t)),
1722 0 : );
1723 0 :
1724 0 : child_prefixes.push(child_prefix);
1725 0 : }
1726 :
1727 : // Since we will do a large number of small filesystem metadata operations, batch them into
1728 : // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
1729 0 : let span = tracing::Span::current();
1730 0 : let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
1731 0 : // Run this synchronous code in the same log context as the outer function that spawned it.
1732 0 : let _span = span.enter();
1733 0 :
1734 0 : tracing::info!("Creating {} directories", create_dirs.len());
1735 0 : for dir in &create_dirs {
1736 0 : if let Err(e) = std::fs::create_dir_all(dir) {
1737 : // Ignore AlreadyExists errors, drop out on all other errors
1738 0 : match e.kind() {
1739 0 : std::io::ErrorKind::AlreadyExists => {}
1740 : _ => {
1741 0 : return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
1742 : }
1743 : }
1744 0 : }
1745 : }
1746 :
1747 0 : for child_prefix in child_prefixes {
1748 0 : tracing::info!(
1749 0 : "Hard-linking {} parent layers into child path {}",
1750 0 : parent_layers.len(),
1751 : child_prefix
1752 : );
1753 0 : for relative_layer in &parent_layers {
1754 0 : let parent_path = parent_path.join(relative_layer);
1755 0 : let child_path = child_prefix.join(relative_layer);
1756 0 : if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
1757 0 : match e.kind() {
1758 0 : std::io::ErrorKind::AlreadyExists => {}
1759 : std::io::ErrorKind::NotFound => {
1760 0 : tracing::info!(
1761 0 : "Layer {} not found during hard-linking, evicted during split?",
1762 : relative_layer
1763 : );
1764 : }
1765 : _ => {
1766 0 : return Err(anyhow::anyhow!(e).context(format!(
1767 0 : "Hard linking {relative_layer} into {child_prefix}"
1768 0 : )))
1769 : }
1770 : }
1771 0 : }
1772 : }
1773 : }
1774 :
1775 : // Durability is not required for correctness, but if we crashed during split and
1776 : // then came restarted with empty timeline dirs, it would be very inefficient to
1777 : // re-populate from remote storage.
1778 0 : tracing::info!("fsyncing {} directories", create_dirs.len());
1779 0 : for dir in create_dirs {
1780 0 : if let Err(e) = crashsafe::fsync(&dir) {
1781 : // Something removed a newly created timeline dir out from underneath us? Extremely
1782 : // unexpected, but not worth panic'ing over as this whole function is just an
1783 : // optimization.
1784 0 : tracing::warn!("Failed to fsync directory {dir}: {e}")
1785 0 : }
1786 : }
1787 :
1788 0 : Ok(parent_layers.len())
1789 0 : });
1790 0 :
1791 0 : match jh.await {
1792 0 : Ok(Ok(layer_count)) => {
1793 0 : tracing::info!(count = layer_count, "Hard linked layers into child shards");
1794 : }
1795 0 : Ok(Err(e)) => {
1796 0 : // This is an optimization, so we tolerate failure.
1797 0 : tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
1798 : }
1799 0 : Err(e) => {
1800 0 : // This is something totally unexpected like a panic, so bail out.
1801 0 : anyhow::bail!("Error joining hard linking task: {e}");
1802 : }
1803 : }
1804 :
1805 0 : Ok(())
1806 0 : }
1807 :
1808 : ///
1809 : /// Shut down all tenants. This runs as part of pageserver shutdown.
1810 : ///
1811 : /// NB: We leave the tenants in the map, so that they remain accessible through
1812 : /// the management API until we shut it down. If we removed the shut-down tenants
1813 : /// from the tenants map, the management API would return 404 for these tenants,
1814 : /// because TenantsMap::get() now returns `None`.
1815 : /// That could be easily misinterpreted by control plane, the consumer of the
1816 : /// management API. For example, it could attach the tenant on a different pageserver.
1817 : /// We would then be in split-brain once this pageserver restarts.
1818 0 : #[instrument(skip_all)]
1819 : pub(crate) async fn shutdown(&self) {
1820 : self.cancel.cancel();
1821 :
1822 : shutdown_all_tenants0(self.tenants).await
1823 : }
1824 :
1825 0 : pub(crate) async fn detach_tenant(
1826 0 : &self,
1827 0 : conf: &'static PageServerConf,
1828 0 : tenant_shard_id: TenantShardId,
1829 0 : deletion_queue_client: &DeletionQueueClient,
1830 0 : ) -> Result<(), TenantStateError> {
1831 0 : let tmp_path = self
1832 0 : .detach_tenant0(conf, tenant_shard_id, deletion_queue_client)
1833 0 : .await?;
1834 0 : spawn_background_purge(tmp_path);
1835 0 :
1836 0 : Ok(())
1837 0 : }
1838 :
1839 0 : async fn detach_tenant0(
1840 0 : &self,
1841 0 : conf: &'static PageServerConf,
1842 0 : tenant_shard_id: TenantShardId,
1843 0 : deletion_queue_client: &DeletionQueueClient,
1844 0 : ) -> Result<Utf8PathBuf, TenantStateError> {
1845 0 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
1846 0 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
1847 0 : safe_rename_tenant_dir(&local_tenant_directory)
1848 0 : .await
1849 0 : .with_context(|| {
1850 0 : format!("local tenant directory {local_tenant_directory:?} rename")
1851 0 : })
1852 0 : };
1853 :
1854 0 : let removal_result = remove_tenant_from_memory(
1855 0 : self.tenants,
1856 0 : tenant_shard_id,
1857 0 : tenant_dir_rename_operation(tenant_shard_id),
1858 0 : )
1859 0 : .await;
1860 :
1861 : // Flush pending deletions, so that they have a good chance of passing validation
1862 : // before this tenant is potentially re-attached elsewhere.
1863 0 : deletion_queue_client.flush_advisory();
1864 0 :
1865 0 : removal_result
1866 0 : }
1867 :
1868 0 : pub(crate) fn list_tenants(
1869 0 : &self,
1870 0 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
1871 0 : let tenants = self.tenants.read().unwrap();
1872 0 : let m = match &*tenants {
1873 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
1874 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
1875 0 : };
1876 0 : Ok(m.iter()
1877 0 : .filter_map(|(id, tenant)| match tenant {
1878 0 : TenantSlot::Attached(tenant) => {
1879 0 : Some((*id, tenant.current_state(), tenant.generation()))
1880 : }
1881 0 : TenantSlot::Secondary(_) => None,
1882 0 : TenantSlot::InProgress(_) => None,
1883 0 : })
1884 0 : .collect())
1885 0 : }
1886 :
1887 : /// Completes an earlier prepared timeline detach ancestor.
1888 0 : pub(crate) async fn complete_detaching_timeline_ancestor(
1889 0 : &self,
1890 0 : tenant_shard_id: TenantShardId,
1891 0 : timeline_id: TimelineId,
1892 0 : prepared: PreparedTimelineDetach,
1893 0 : ctx: &RequestContext,
1894 0 : ) -> Result<Vec<TimelineId>, anyhow::Error> {
1895 : struct RevertOnDropSlot(Option<SlotGuard>);
1896 :
1897 : impl Drop for RevertOnDropSlot {
1898 0 : fn drop(&mut self) {
1899 0 : if let Some(taken) = self.0.take() {
1900 0 : taken.revert();
1901 0 : }
1902 0 : }
1903 : }
1904 :
1905 : impl RevertOnDropSlot {
1906 0 : fn into_inner(mut self) -> SlotGuard {
1907 0 : self.0.take().unwrap()
1908 0 : }
1909 : }
1910 :
1911 : impl std::ops::Deref for RevertOnDropSlot {
1912 : type Target = SlotGuard;
1913 :
1914 0 : fn deref(&self) -> &Self::Target {
1915 0 : self.0.as_ref().unwrap()
1916 0 : }
1917 : }
1918 :
1919 0 : let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1920 0 : let slot_guard = RevertOnDropSlot(Some(slot_guard));
1921 :
1922 0 : let tenant = {
1923 0 : let Some(old_slot) = slot_guard.get_old_value() else {
1924 0 : anyhow::bail!(
1925 0 : "Tenant not found when trying to complete detaching timeline ancestor"
1926 0 : );
1927 : };
1928 :
1929 0 : let Some(tenant) = old_slot.get_attached() else {
1930 0 : anyhow::bail!("Tenant is not in attached state");
1931 : };
1932 :
1933 0 : if !tenant.is_active() {
1934 0 : anyhow::bail!("Tenant is not active");
1935 0 : }
1936 0 :
1937 0 : tenant.clone()
1938 : };
1939 :
1940 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
1941 :
1942 0 : let reparented = timeline
1943 0 : .complete_detaching_timeline_ancestor(&tenant, prepared, ctx)
1944 0 : .await?;
1945 :
1946 0 : let mut slot_guard = slot_guard.into_inner();
1947 0 :
1948 0 : let (_guard, progress) = utils::completion::channel();
1949 0 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1950 : Ok(()) => {
1951 0 : slot_guard.drop_old_value()?;
1952 : }
1953 0 : Err(_barrier) => {
1954 0 : slot_guard.revert();
1955 0 : // this really should not happen, at all, unless shutdown was already going?
1956 0 : anyhow::bail!("Cannot restart Tenant, already shutting down");
1957 : }
1958 : }
1959 :
1960 0 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1961 0 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1962 :
1963 0 : let shard_identity = config.shard;
1964 0 : let tenant = tenant_spawn(
1965 0 : self.conf,
1966 0 : tenant_shard_id,
1967 0 : &tenant_path,
1968 0 : self.resources.clone(),
1969 0 : AttachedTenantConf::try_from(config)?,
1970 0 : shard_identity,
1971 0 : None,
1972 0 : SpawnMode::Eager,
1973 0 : ctx,
1974 0 : );
1975 0 :
1976 0 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1977 :
1978 0 : Ok(reparented)
1979 0 : }
1980 :
1981 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
1982 : /// resolve this to a fully qualified TenantShardId.
1983 : ///
1984 : /// During shard splits: we shall see parent shards in InProgress state and skip them, and
1985 : /// instead match on child shards which should appear in Attached state. Very early in a shard
1986 : /// split, or in other cases where a shard is InProgress, we will return our own InProgress result
1987 : /// to instruct the caller to wait for that to finish before querying again.
1988 0 : pub(crate) fn resolve_attached_shard(
1989 0 : &self,
1990 0 : tenant_id: &TenantId,
1991 0 : selector: ShardSelector,
1992 0 : ) -> ShardResolveResult {
1993 0 : let tenants = self.tenants.read().unwrap();
1994 0 : let mut want_shard = None;
1995 0 : let mut any_in_progress = None;
1996 0 :
1997 0 : match &*tenants {
1998 0 : TenantsMap::Initializing => ShardResolveResult::NotFound,
1999 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
2000 0 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
2001 : // Ignore all slots that don't contain an attached tenant
2002 0 : let tenant = match &slot.1 {
2003 0 : TenantSlot::Attached(t) => t,
2004 0 : TenantSlot::InProgress(barrier) => {
2005 0 : // We might still find a usable shard, but in case we don't, remember that
2006 0 : // we saw at least one InProgress slot, so that we can distinguish this case
2007 0 : // from a simple NotFound in our return value.
2008 0 : any_in_progress = Some(barrier.clone());
2009 0 : continue;
2010 : }
2011 0 : _ => continue,
2012 : };
2013 :
2014 0 : match selector {
2015 0 : ShardSelector::First => return ShardResolveResult::Found(tenant.clone()),
2016 0 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
2017 0 : return ShardResolveResult::Found(tenant.clone())
2018 : }
2019 0 : ShardSelector::Page(key) => {
2020 0 : // First slot we see for this tenant, calculate the expected shard number
2021 0 : // for the key: we will use this for checking if this and subsequent
2022 0 : // slots contain the key, rather than recalculating the hash each time.
2023 0 : if want_shard.is_none() {
2024 0 : want_shard = Some(tenant.shard_identity.get_shard_number(&key));
2025 0 : }
2026 :
2027 0 : if Some(tenant.shard_identity.number) == want_shard {
2028 0 : return ShardResolveResult::Found(tenant.clone());
2029 0 : }
2030 : }
2031 0 : ShardSelector::Known(shard)
2032 0 : if tenant.shard_identity.shard_index() == shard =>
2033 0 : {
2034 0 : return ShardResolveResult::Found(tenant.clone());
2035 : }
2036 0 : _ => continue,
2037 : }
2038 : }
2039 :
2040 : // Fall through: we didn't find a slot that was in Attached state & matched our selector. If
2041 : // we found one or more InProgress slot, indicate to caller that they should retry later. Otherwise
2042 : // this requested shard simply isn't found.
2043 0 : if let Some(barrier) = any_in_progress {
2044 0 : ShardResolveResult::InProgress(barrier)
2045 : } else {
2046 0 : ShardResolveResult::NotFound
2047 : }
2048 : }
2049 : }
2050 0 : }
2051 : }
2052 :
2053 0 : #[derive(Debug, thiserror::Error)]
2054 : pub(crate) enum GetTenantError {
2055 : /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
2056 : /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
2057 : #[error("Tenant {0} not found")]
2058 : NotFound(TenantId),
2059 :
2060 : #[error("Tenant {0} is not active")]
2061 : NotActive(TenantShardId),
2062 :
2063 : // Initializing or shutting down: cannot authoritatively say whether we have this tenant
2064 : #[error("Tenant map is not available: {0}")]
2065 : MapState(#[from] TenantMapError),
2066 : }
2067 :
2068 0 : #[derive(thiserror::Error, Debug)]
2069 : pub(crate) enum GetActiveTenantError {
2070 : /// We may time out either while TenantSlot is InProgress, or while the Tenant
2071 : /// is in a non-Active state
2072 : #[error(
2073 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
2074 : )]
2075 : WaitForActiveTimeout {
2076 : latest_state: Option<TenantState>,
2077 : wait_time: Duration,
2078 : },
2079 :
2080 : /// The TenantSlot is absent, or in secondary mode
2081 : #[error(transparent)]
2082 : NotFound(#[from] GetTenantError),
2083 :
2084 : /// Cancellation token fired while we were waiting
2085 : #[error("cancelled")]
2086 : Cancelled,
2087 :
2088 : /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
2089 : #[error("will not become active. Current state: {0}")]
2090 : WillNotBecomeActive(TenantState),
2091 :
2092 : /// Broken is logically a subset of WillNotBecomeActive, but a distinct error is useful as
2093 : /// WillNotBecomeActive is a permitted error under some circumstances, whereas broken should
2094 : /// never happen.
2095 : #[error("Tenant is broken: {0}")]
2096 : Broken(String),
2097 : }
2098 :
2099 0 : #[derive(Debug, thiserror::Error)]
2100 : pub(crate) enum DeleteTimelineError {
2101 : #[error("Tenant {0}")]
2102 : Tenant(#[from] GetTenantError),
2103 :
2104 : #[error("Timeline {0}")]
2105 : Timeline(#[from] crate::tenant::DeleteTimelineError),
2106 : }
2107 :
2108 0 : #[derive(Debug, thiserror::Error)]
2109 : pub(crate) enum TenantStateError {
2110 : #[error("Tenant {0} is stopping")]
2111 : IsStopping(TenantShardId),
2112 : #[error(transparent)]
2113 : SlotError(#[from] TenantSlotError),
2114 : #[error(transparent)]
2115 : SlotUpsertError(#[from] TenantSlotUpsertError),
2116 : #[error(transparent)]
2117 : Other(#[from] anyhow::Error),
2118 : }
2119 :
2120 0 : #[derive(Debug, thiserror::Error)]
2121 : pub(crate) enum TenantMapListError {
2122 : #[error("tenant map is still initiailizing")]
2123 : Initializing,
2124 : }
2125 :
2126 0 : #[derive(Debug, thiserror::Error)]
2127 : pub(crate) enum TenantMapInsertError {
2128 : #[error(transparent)]
2129 : SlotError(#[from] TenantSlotError),
2130 : #[error(transparent)]
2131 : SlotUpsertError(#[from] TenantSlotUpsertError),
2132 : #[error(transparent)]
2133 : Other(#[from] anyhow::Error),
2134 : }
2135 :
2136 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
2137 : /// for a particular tenant ID.
2138 0 : #[derive(Debug, thiserror::Error)]
2139 : pub(crate) enum TenantSlotError {
2140 : /// When acquiring a slot with the expectation that the tenant already exists.
2141 : #[error("Tenant {0} not found")]
2142 : NotFound(TenantShardId),
2143 :
2144 : // Tried to read a slot that is currently being mutated by another administrative
2145 : // operation.
2146 : #[error("tenant has a state change in progress, try again later")]
2147 : InProgress,
2148 :
2149 : #[error(transparent)]
2150 : MapState(#[from] TenantMapError),
2151 : }
2152 :
2153 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
2154 : /// to insert a new value.
2155 0 : #[derive(thiserror::Error)]
2156 : pub(crate) enum TenantSlotUpsertError {
2157 : /// An error where the slot is in an unexpected state, indicating a code bug
2158 : #[error("Internal error updating Tenant")]
2159 : InternalError(Cow<'static, str>),
2160 :
2161 : #[error(transparent)]
2162 : MapState(TenantMapError),
2163 :
2164 : // If we encounter TenantManager shutdown during upsert, we must carry the Completion
2165 : // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
2166 : // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
2167 : // was protected by the SlotGuard.
2168 : #[error("Shutting down")]
2169 : ShuttingDown((TenantSlot, utils::completion::Completion)),
2170 : }
2171 :
2172 : impl std::fmt::Debug for TenantSlotUpsertError {
2173 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2174 0 : match self {
2175 0 : Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
2176 0 : Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
2177 0 : Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
2178 : }
2179 0 : }
2180 : }
2181 :
2182 0 : #[derive(Debug, thiserror::Error)]
2183 : enum TenantSlotDropError {
2184 : /// It is only legal to drop a TenantSlot if its contents are fully shut down
2185 : #[error("Tenant was not shut down")]
2186 : NotShutdown,
2187 : }
2188 :
2189 : /// Errors that can happen any time we are walking the tenant map to try and acquire
2190 : /// the TenantSlot for a particular tenant.
2191 0 : #[derive(Debug, thiserror::Error)]
2192 : pub enum TenantMapError {
2193 : // Tried to read while initializing
2194 : #[error("tenant map is still initializing")]
2195 : StillInitializing,
2196 :
2197 : // Tried to read while shutting down
2198 : #[error("tenant map is shutting down")]
2199 : ShuttingDown,
2200 : }
2201 :
2202 : /// Guards a particular tenant_id's content in the TenantsMap. While this
2203 : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
2204 : /// for this tenant, which acts as a marker for any operations targeting
2205 : /// this tenant to retry later, or wait for the InProgress state to end.
2206 : ///
2207 : /// This structure enforces the important invariant that we do not have overlapping
2208 : /// tasks that will try use local storage for a the same tenant ID: we enforce that
2209 : /// the previous contents of a slot have been shut down before the slot can be
2210 : /// left empty or used for something else
2211 : ///
2212 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
2213 : /// to provide a new value, or `revert` to put the slot back into its initial
2214 : /// state. If the SlotGuard is dropped without calling either of these, then
2215 : /// we will leave the slot empty if our `old_value` is already shut down, else
2216 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
2217 : ///
2218 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
2219 : /// `drop_old_value`. It is an error to call this without shutting down
2220 : /// the conents of `old_value`.
2221 : pub struct SlotGuard {
2222 : tenant_shard_id: TenantShardId,
2223 : old_value: Option<TenantSlot>,
2224 : upserted: bool,
2225 :
2226 : /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
2227 : /// release any waiters as soon as this SlotGuard is dropped.
2228 : completion: utils::completion::Completion,
2229 : }
2230 :
2231 : impl SlotGuard {
2232 2 : fn new(
2233 2 : tenant_shard_id: TenantShardId,
2234 2 : old_value: Option<TenantSlot>,
2235 2 : completion: utils::completion::Completion,
2236 2 : ) -> Self {
2237 2 : Self {
2238 2 : tenant_shard_id,
2239 2 : old_value,
2240 2 : upserted: false,
2241 2 : completion,
2242 2 : }
2243 2 : }
2244 :
2245 : /// Get any value that was present in the slot before we acquired ownership
2246 : /// of it: in state transitions, this will be the old state.
2247 2 : fn get_old_value(&self) -> &Option<TenantSlot> {
2248 2 : &self.old_value
2249 2 : }
2250 :
2251 : /// Emplace a new value in the slot. This consumes the guard, and after
2252 : /// returning, the slot is no longer protected from concurrent changes.
2253 0 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
2254 0 : if !self.old_value_is_shutdown() {
2255 : // This is a bug: callers should never try to drop an old value without
2256 : // shutting it down
2257 0 : return Err(TenantSlotUpsertError::InternalError(
2258 0 : "Old TenantSlot value not shut down".into(),
2259 0 : ));
2260 0 : }
2261 :
2262 0 : let replaced = {
2263 0 : let mut locked = TENANTS.write().unwrap();
2264 0 :
2265 0 : if let TenantSlot::InProgress(_) = new_value {
2266 : // It is never expected to try and upsert InProgress via this path: it should
2267 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
2268 0 : return Err(TenantSlotUpsertError::InternalError(
2269 0 : "Attempt to upsert an InProgress state".into(),
2270 0 : ));
2271 0 : }
2272 :
2273 0 : let m = match &mut *locked {
2274 : TenantsMap::Initializing => {
2275 0 : return Err(TenantSlotUpsertError::MapState(
2276 0 : TenantMapError::StillInitializing,
2277 0 : ))
2278 : }
2279 : TenantsMap::ShuttingDown(_) => {
2280 0 : return Err(TenantSlotUpsertError::ShuttingDown((
2281 0 : new_value,
2282 0 : self.completion.clone(),
2283 0 : )));
2284 : }
2285 0 : TenantsMap::Open(m) => m,
2286 0 : };
2287 0 :
2288 0 : METRICS.slot_inserted(&new_value);
2289 0 :
2290 0 : let replaced = m.insert(self.tenant_shard_id, new_value);
2291 0 : self.upserted = true;
2292 0 : if let Some(replaced) = replaced.as_ref() {
2293 0 : METRICS.slot_removed(replaced);
2294 0 : }
2295 :
2296 0 : replaced
2297 : };
2298 :
2299 : // Sanity check: on an upsert we should always be replacing an InProgress marker
2300 0 : match replaced {
2301 : Some(TenantSlot::InProgress(_)) => {
2302 : // Expected case: we find our InProgress in the map: nothing should have
2303 : // replaced it because the code that acquires slots will not grant another
2304 : // one for the same TenantId.
2305 0 : Ok(())
2306 : }
2307 : None => {
2308 0 : METRICS.unexpected_errors.inc();
2309 0 : error!(
2310 : tenant_shard_id = %self.tenant_shard_id,
2311 0 : "Missing InProgress marker during tenant upsert, this is a bug."
2312 : );
2313 0 : Err(TenantSlotUpsertError::InternalError(
2314 0 : "Missing InProgress marker during tenant upsert".into(),
2315 0 : ))
2316 : }
2317 0 : Some(slot) => {
2318 0 : METRICS.unexpected_errors.inc();
2319 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
2320 0 : Err(TenantSlotUpsertError::InternalError(
2321 0 : "Unexpected contents of TenantSlot".into(),
2322 0 : ))
2323 : }
2324 : }
2325 0 : }
2326 :
2327 : /// Replace the InProgress slot with whatever was in the guard when we started
2328 0 : fn revert(mut self) {
2329 0 : if let Some(value) = self.old_value.take() {
2330 0 : match self.upsert(value) {
2331 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
2332 0 : // We already logged the error, nothing else we can do.
2333 0 : }
2334 : Err(
2335 : TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
2336 0 : ) => {
2337 0 : // If the map is shutting down, we need not replace anything
2338 0 : }
2339 0 : Ok(()) => {}
2340 : }
2341 0 : }
2342 0 : }
2343 :
2344 : /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
2345 : /// rogue background tasks that would write to the local tenant directory that this guard
2346 : /// is responsible for protecting
2347 2 : fn old_value_is_shutdown(&self) -> bool {
2348 2 : match self.old_value.as_ref() {
2349 2 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
2350 0 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
2351 : Some(TenantSlot::InProgress(_)) => {
2352 : // A SlotGuard cannot be constructed for a slot that was already InProgress
2353 0 : unreachable!()
2354 : }
2355 0 : None => true,
2356 : }
2357 2 : }
2358 :
2359 : /// The guard holder is done with the old value of the slot: they are obliged to already
2360 : /// shut it down before we reach this point.
2361 2 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
2362 2 : if !self.old_value_is_shutdown() {
2363 0 : Err(TenantSlotDropError::NotShutdown)
2364 : } else {
2365 2 : self.old_value.take();
2366 2 : Ok(())
2367 : }
2368 2 : }
2369 : }
2370 :
2371 : impl Drop for SlotGuard {
2372 2 : fn drop(&mut self) {
2373 2 : if self.upserted {
2374 0 : return;
2375 2 : }
2376 2 : // Our old value is already shutdown, or it never existed: it is safe
2377 2 : // for us to fully release the TenantSlot back into an empty state
2378 2 :
2379 2 : let mut locked = TENANTS.write().unwrap();
2380 :
2381 2 : let m = match &mut *locked {
2382 : TenantsMap::Initializing => {
2383 : // There is no map, this should never happen.
2384 2 : return;
2385 : }
2386 : TenantsMap::ShuttingDown(_) => {
2387 : // When we transition to shutdown, InProgress elements are removed
2388 : // from the map, so we do not need to clean up our Inprogress marker.
2389 : // See [`shutdown_all_tenants0`]
2390 0 : return;
2391 : }
2392 0 : TenantsMap::Open(m) => m,
2393 0 : };
2394 0 :
2395 0 : use std::collections::btree_map::Entry;
2396 0 : match m.entry(self.tenant_shard_id) {
2397 0 : Entry::Occupied(mut entry) => {
2398 0 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
2399 0 : METRICS.unexpected_errors.inc();
2400 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
2401 0 : }
2402 :
2403 0 : if self.old_value_is_shutdown() {
2404 0 : METRICS.slot_removed(entry.get());
2405 0 : entry.remove();
2406 0 : } else {
2407 0 : let inserting = self.old_value.take().unwrap();
2408 0 : METRICS.slot_inserted(&inserting);
2409 0 : let replaced = entry.insert(inserting);
2410 0 : METRICS.slot_removed(&replaced);
2411 0 : }
2412 : }
2413 : Entry::Vacant(_) => {
2414 0 : METRICS.unexpected_errors.inc();
2415 0 : error!(
2416 : tenant_shard_id = %self.tenant_shard_id,
2417 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
2418 : );
2419 : }
2420 : }
2421 2 : }
2422 : }
2423 :
/// How a peek at a tenant slot behaves while the pageserver is shutting down.
enum TenantSlotPeekMode {
    /// Read mode: peeking at slots stays permitted even while the pageserver is shutting down.
    Read,
    /// Write mode: peeking at a slot while the pageserver is shutting down is an error.
    Write,
}
2430 :
2431 0 : fn tenant_map_peek_slot<'a>(
2432 0 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
2433 0 : tenant_shard_id: &TenantShardId,
2434 0 : mode: TenantSlotPeekMode,
2435 0 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
2436 0 : match tenants.deref() {
2437 0 : TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
2438 0 : TenantsMap::ShuttingDown(m) => match mode {
2439 : TenantSlotPeekMode::Read => Ok(Some(
2440 : // When reading in ShuttingDown state, we must translate None results
2441 : // into a ShuttingDown error, because absence of a tenant shard ID in the map
2442 : // isn't a reliable indicator of the tenant being gone: it might have been
2443 : // InProgress when shutdown started, and cleaned up from that state such
2444 : // that it's now no longer in the map. Callers will have to wait until
2445 : // we next start up to get a proper answer. This avoids incorrect 404 API responses.
2446 0 : m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
2447 : )),
2448 0 : TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
2449 : },
2450 0 : TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
2451 : }
2452 0 : }
2453 :
/// Precondition applied when acquiring a tenant slot.
enum TenantSlotAcquireMode {
    /// Acquire the slot whatever its current state, and whether or not it already exists.
    Any,
    /// Fail with an error if the slot to acquire does not already exist.
    MustExist,
}
2460 :
2461 0 : fn tenant_map_acquire_slot(
2462 0 : tenant_shard_id: &TenantShardId,
2463 0 : mode: TenantSlotAcquireMode,
2464 0 : ) -> Result<SlotGuard, TenantSlotError> {
2465 0 : tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
2466 0 : }
2467 :
2468 2 : fn tenant_map_acquire_slot_impl(
2469 2 : tenant_shard_id: &TenantShardId,
2470 2 : tenants: &std::sync::RwLock<TenantsMap>,
2471 2 : mode: TenantSlotAcquireMode,
2472 2 : ) -> Result<SlotGuard, TenantSlotError> {
2473 2 : use TenantSlotAcquireMode::*;
2474 2 : METRICS.tenant_slot_writes.inc();
2475 2 :
2476 2 : let mut locked = tenants.write().unwrap();
2477 2 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
2478 2 : let _guard = span.enter();
2479 :
2480 2 : let m = match &mut *locked {
2481 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
2482 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
2483 2 : TenantsMap::Open(m) => m,
2484 2 : };
2485 2 :
2486 2 : use std::collections::btree_map::Entry;
2487 2 :
2488 2 : let entry = m.entry(*tenant_shard_id);
2489 2 :
2490 2 : match entry {
2491 0 : Entry::Vacant(v) => match mode {
2492 : MustExist => {
2493 0 : tracing::debug!("Vacant && MustExist: return NotFound");
2494 0 : Err(TenantSlotError::NotFound(*tenant_shard_id))
2495 : }
2496 : _ => {
2497 0 : let (completion, barrier) = utils::completion::channel();
2498 0 : let inserting = TenantSlot::InProgress(barrier);
2499 0 : METRICS.slot_inserted(&inserting);
2500 0 : v.insert(inserting);
2501 0 : tracing::debug!("Vacant, inserted InProgress");
2502 0 : Ok(SlotGuard::new(*tenant_shard_id, None, completion))
2503 : }
2504 : },
2505 2 : Entry::Occupied(mut o) => {
2506 2 : // Apply mode-driven checks
2507 2 : match (o.get(), mode) {
2508 : (TenantSlot::InProgress(_), _) => {
2509 0 : tracing::debug!("Occupied, failing for InProgress");
2510 0 : Err(TenantSlotError::InProgress)
2511 : }
2512 : _ => {
2513 : // Happy case: the slot was not in any state that violated our mode
2514 2 : let (completion, barrier) = utils::completion::channel();
2515 2 : let in_progress = TenantSlot::InProgress(barrier);
2516 2 : METRICS.slot_inserted(&in_progress);
2517 2 : let old_value = o.insert(in_progress);
2518 2 : METRICS.slot_removed(&old_value);
2519 2 : tracing::debug!("Occupied, replaced with InProgress");
2520 2 : Ok(SlotGuard::new(
2521 2 : *tenant_shard_id,
2522 2 : Some(old_value),
2523 2 : completion,
2524 2 : ))
2525 : }
2526 : }
2527 : }
2528 : }
2529 2 : }
2530 :
2531 : /// Stops and removes the tenant from memory, if it's not [`TenantState::Stopping`] already, bails otherwise.
2532 : /// Allows to remove other tenant resources manually, via `tenant_cleanup`.
2533 : /// If the cleanup fails, tenant will stay in memory in [`TenantState::Broken`] state, and another removal
2534 : /// operation would be needed to remove it.
2535 2 : async fn remove_tenant_from_memory<V, F>(
2536 2 : tenants: &std::sync::RwLock<TenantsMap>,
2537 2 : tenant_shard_id: TenantShardId,
2538 2 : tenant_cleanup: F,
2539 2 : ) -> Result<V, TenantStateError>
2540 2 : where
2541 2 : F: std::future::Future<Output = anyhow::Result<V>>,
2542 2 : {
2543 2 : let mut slot_guard =
2544 2 : tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
2545 :
2546 : // allow pageserver shutdown to await for our completion
2547 2 : let (_guard, progress) = completion::channel();
2548 :
2549 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2550 : // concurrent API request doing something else for the same tenant ID.
2551 2 : let attached_tenant = match slot_guard.get_old_value() {
2552 2 : Some(TenantSlot::Attached(tenant)) => {
2553 2 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2554 2 : let shutdown_mode = ShutdownMode::Hard;
2555 2 :
2556 2 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
2557 2 : // that we can continue safely to cleanup.
2558 2 : match tenant.shutdown(progress, shutdown_mode).await {
2559 2 : Ok(()) => {}
2560 0 : Err(_other) => {
2561 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2562 0 : // wait for it but return an error right away because these are distinct requests.
2563 0 : slot_guard.revert();
2564 0 : return Err(TenantStateError::IsStopping(tenant_shard_id));
2565 : }
2566 : }
2567 2 : Some(tenant)
2568 : }
2569 0 : Some(TenantSlot::Secondary(secondary_state)) => {
2570 0 : tracing::info!("Shutting down in secondary mode");
2571 0 : secondary_state.shutdown().await;
2572 0 : None
2573 : }
2574 : Some(TenantSlot::InProgress(_)) => {
2575 : // Acquiring a slot guarantees its old value was not InProgress
2576 0 : unreachable!();
2577 : }
2578 0 : None => None,
2579 : };
2580 :
2581 2 : match tenant_cleanup
2582 2 : .await
2583 2 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2584 : {
2585 2 : Ok(hook_value) => {
2586 2 : // Success: drop the old TenantSlot::Attached.
2587 2 : slot_guard
2588 2 : .drop_old_value()
2589 2 : .expect("We just called shutdown");
2590 2 :
2591 2 : Ok(hook_value)
2592 : }
2593 0 : Err(e) => {
2594 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2595 0 : if let Some(attached_tenant) = attached_tenant {
2596 0 : attached_tenant.set_broken(e.to_string()).await;
2597 0 : }
2598 : // Leave the broken tenant in the map
2599 0 : slot_guard.revert();
2600 0 :
2601 0 : Err(TenantStateError::Other(e))
2602 : }
2603 : }
2604 2 : }
2605 :
2606 : use {
2607 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
2608 : utils::http::error::ApiError,
2609 : };
2610 :
2611 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
2612 : pub(crate) async fn immediate_gc(
2613 : tenant_shard_id: TenantShardId,
2614 : timeline_id: TimelineId,
2615 : gc_req: TimelineGcRequest,
2616 : cancel: CancellationToken,
2617 : ctx: &RequestContext,
2618 : ) -> Result<GcResult, ApiError> {
2619 : let tenant = {
2620 : let guard = TENANTS.read().unwrap();
2621 : guard
2622 : .get(&tenant_shard_id)
2623 : .cloned()
2624 0 : .with_context(|| format!("tenant {tenant_shard_id}"))
2625 0 : .map_err(|e| ApiError::NotFound(e.into()))?
2626 : };
2627 :
2628 0 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
2629 : // Use tenant's pitr setting
2630 : let pitr = tenant.get_pitr_interval();
2631 :
2632 : tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
2633 :
2634 : // Run in task_mgr to avoid race with tenant_detach operation
2635 : let ctx: RequestContext =
2636 : ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
2637 :
2638 0 : let _gate_guard = tenant.gate.enter().map_err(|_| ApiError::ShuttingDown)?;
2639 :
2640 : fail::fail_point!("immediate_gc_task_pre");
2641 :
2642 : #[allow(unused_mut)]
2643 : let mut result = tenant
2644 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
2645 : .await;
2646 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
2647 : // better once the types support it.
2648 :
2649 : #[cfg(feature = "testing")]
2650 : {
2651 : // we need to synchronize with drop completion for python tests without polling for
2652 : // log messages
2653 : if let Ok(result) = result.as_mut() {
2654 : let mut js = tokio::task::JoinSet::new();
2655 : for layer in std::mem::take(&mut result.doomed_layers) {
2656 : js.spawn(layer.wait_drop());
2657 : }
2658 : tracing::info!(
2659 : total = js.len(),
2660 : "starting to wait for the gc'd layers to be dropped"
2661 : );
2662 : while let Some(res) = js.join_next().await {
2663 : res.expect("wait_drop should not panic");
2664 : }
2665 : }
2666 :
2667 : let timeline = tenant.get_timeline(timeline_id, false).ok();
2668 0 : let rtc = timeline.as_ref().map(|x| &x.remote_client);
2669 :
2670 : if let Some(rtc) = rtc {
2671 : // layer drops schedule actions on remote timeline client to actually do the
2672 : // deletions; don't care about the shutdown error, just exit fast
2673 : drop(rtc.wait_completion().await);
2674 : }
2675 : }
2676 :
2677 0 : result.map_err(|e| match e {
2678 0 : GcError::TenantCancelled | GcError::TimelineCancelled => ApiError::ShuttingDown,
2679 : GcError::TimelineNotFound => {
2680 0 : ApiError::NotFound(anyhow::anyhow!("Timeline not found").into())
2681 : }
2682 0 : other => ApiError::InternalServerError(anyhow::anyhow!(other)),
2683 0 : })
2684 : }
2685 :
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::sync::Arc;
    use tracing::Instrument;

    use crate::tenant::mgr::TenantSlot;

    use super::{super::harness::TenantHarness, TenantsMap};

    /// A tenant that is InProgress in the map when shutdown starts must be waited for:
    /// shutdown may only finish once the in-flight removal has completed.
    #[tokio::test(start_paused = true)]
    async fn shutdown_awaits_in_progress_tenant() {
        let harness = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
        // The harness forces the tenant to active; nothing is running on it.
        let (tenant, _ctx) = harness.load().await;

        let shard_id = tenant.tenant_shard_id();

        // The harness sets up logging, so we have to run inside its span.
        let harness_span = harness.span();
        let _entered = harness_span.enter();

        let slots = BTreeMap::from([(shard_id, TenantSlot::Attached(tenant.clone()))]);
        let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(slots)));

        // Start remove_tenant_from_memory with a cleanup hook that blocks until we release it;
        // that leaves the tenant's slot stuck in InProgress.
        let (release_cleanup, cleanup_released) = utils::completion::channel();
        let (cleanup_entered, cleanup_started) = utils::completion::channel();

        let tenants_for_removal = tenants.clone();
        let mut removal_task = tokio::spawn(
            async move {
                let cleanup = async move {
                    drop(cleanup_entered);
                    cleanup_released.wait().await;
                    anyhow::Ok(())
                };
                super::remove_tenant_from_memory(&tenants_for_removal, shard_id, cleanup).await
            }
            .instrument(harness.span()),
        );

        // Once this returns, the blocking cleanup is running and the tenant is stopping.
        cleanup_started.wait().await;

        let mut shutdown_task = {
            let (shutdown_entered, shutdown_started) = utils::completion::channel();

            let handle = tokio::spawn(async move {
                drop(shutdown_entered);
                super::shutdown_all_tenants0(&tenants).await;
            });

            shutdown_started.wait().await;
            handle
        };

        // Neither task may finish while the cleanup is still blocked.
        let long_time = std::time::Duration::from_secs(15);
        tokio::select! {
            _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
            _ = &mut removal_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
            _ = tokio::time::sleep(long_time) => {},
        }

        drop(release_cleanup);

        // With the cleanup released, removal finishes and shutdown follows right away.
        removal_task.await.unwrap().unwrap();
        shutdown_task.await.unwrap();
    }
}
|