Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
5 : use itertools::Itertools;
6 : use pageserver_api::key::Key;
7 : use pageserver_api::models::LocationConfigMode;
8 : use pageserver_api::shard::{
9 : ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
10 : };
11 : use pageserver_api::upcall_api::ReAttachResponseTenant;
12 : use rand::{distributions::Alphanumeric, Rng};
13 : use std::borrow::Cow;
14 : use std::cmp::Ordering;
15 : use std::collections::{BTreeMap, HashMap};
16 : use std::ops::Deref;
17 : use std::sync::Arc;
18 : use std::time::{Duration, Instant};
19 : use sysinfo::SystemExt;
20 : use tokio::fs;
21 : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
22 :
23 : use anyhow::Context;
24 : use once_cell::sync::Lazy;
25 : use tokio::task::JoinSet;
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::*;
28 :
29 : use remote_storage::GenericRemoteStorage;
30 : use utils::{completion, crashsafe};
31 :
32 : use crate::config::PageServerConf;
33 : use crate::context::{DownloadBehavior, RequestContext};
34 : use crate::control_plane_client::{
35 : ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
36 : };
37 : use crate::deletion_queue::DeletionQueueClient;
38 : use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
39 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
40 : use crate::task_mgr::{self, TaskKind};
41 : use crate::tenant::config::{
42 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
43 : };
44 : use crate::tenant::delete::DeleteTenantFlow;
45 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
46 : use crate::tenant::storage_layer::inmemory_layer;
47 : use crate::tenant::timeline::ShutdownMode;
48 : use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
49 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
50 :
51 : use utils::crashsafe::path_with_suffix_extension;
52 : use utils::fs_ext::PathExt;
53 : use utils::generation::Generation;
54 : use utils::id::{TenantId, TimelineId};
55 :
56 : use super::delete::DeleteTenantError;
57 : use super::secondary::SecondaryTenant;
58 : use super::TenantSharedResources;
59 :
60 : /// For a tenant that appears in TenantsMap, it may either be
61 : /// - `Attached`: has a full Tenant object, is eligible to service
62 : /// reads and ingest WAL.
63 : /// - `Secondary`: is only keeping a local cache warm.
64 : ///
65 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
66 : /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
67 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
68 : /// having a properly acquired generation (Secondary doesn't need a generation).
69 : #[derive(Clone)]
70 : pub(crate) enum TenantSlot {
71 : Attached(Arc<Tenant>),
72 : Secondary(Arc<SecondaryTenant>),
73 : /// In this state, other administrative operations acting on the TenantId should
74 : /// block, or return a retry indicator equivalent to HTTP 503.
75 : InProgress(utils::completion::Barrier),
76 : }
77 :
78 : impl std::fmt::Debug for TenantSlot {
79 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 0 : match self {
81 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
82 0 : Self::Secondary(_) => write!(f, "Secondary"),
83 0 : Self::InProgress(_) => write!(f, "InProgress"),
84 : }
85 0 : }
86 : }
87 :
88 : impl TenantSlot {
89 : /// Return the `Tenant` in this slot if attached, else None
90 0 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
91 0 : match self {
92 0 : Self::Attached(t) => Some(t),
93 0 : Self::Secondary(_) => None,
94 0 : Self::InProgress(_) => None,
95 : }
96 0 : }
97 : }
98 :
99 : /// The tenants known to the pageserver.
100 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
101 : pub(crate) enum TenantsMap {
102 : /// [`init_tenant_mgr`] is not done yet.
103 : Initializing,
104 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
105 : /// New tenants can be added using [`tenant_map_acquire_slot`].
106 : Open(BTreeMap<TenantShardId, TenantSlot>),
107 : /// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
108 : /// Existing tenants are still accessible, but no new tenants can be created.
109 : ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
110 : }
111 :
112 : pub(crate) enum TenantsMapRemoveResult {
113 : Occupied(TenantSlot),
114 : Vacant,
115 : InProgress(utils::completion::Barrier),
116 : }
117 :
118 : /// When resolving a TenantId to a shard, we may be looking for the 0th
119 : /// shard, or we might be looking for whichever shard holds a particular page.
120 : pub(crate) enum ShardSelector {
121 : /// Only return the 0th shard, if it is present. If a non-0th shard is present,
122 : /// ignore it.
123 : Zero,
124 : /// Pick the first shard we find for the TenantId
125 : First,
126 : /// Pick the shard that holds this key
127 : Page(Key),
128 : }
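// Illustrative sketch of how these selectors are used (hypothetical caller code; the lookup
// itself is `TenantsMap::resolve_attached_shard` below, and `tenants`, `tenant_id` and `key`
// are assumed to be in scope):
//
//     // Route a page request to whichever shard owns `key`:
//     let owning = tenants.resolve_attached_shard(&tenant_id, ShardSelector::Page(key));
//     // Administrative calls that only ever talk to shard zero:
//     let zeroth = tenants.resolve_attached_shard(&tenant_id, ShardSelector::Zero);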
129 :
130 : /// A convenience for use with the re_attach ControlPlaneClient function: rather
131 : /// than the serializable struct, we build this enum that encapsulates
132 : /// the invariant that attached tenants always have generations.
133 : ///
134 : /// This represents the subset of a LocationConfig that we receive during re-attach.
135 : pub(crate) enum TenantStartupMode {
136 : Attached((AttachmentMode, Generation)),
137 : Secondary,
138 : }
139 :
140 : impl TenantStartupMode {
141 : /// Return the generation & mode that should be used when starting
142 : /// this tenant.
143 : ///
144 : /// If this returns None, the re-attach struct is in an invalid state and
145 : /// should be ignored in the response.
146 0 : fn from_reattach_tenant(rart: ReAttachResponseTenant) -> Option<Self> {
147 0 : match (rart.mode, rart.gen) {
148 0 : (LocationConfigMode::Detached, _) => None,
149 0 : (LocationConfigMode::Secondary, _) => Some(Self::Secondary),
150 0 : (LocationConfigMode::AttachedMulti, Some(g)) => {
151 0 : Some(Self::Attached((AttachmentMode::Multi, Generation::new(g))))
152 : }
153 0 : (LocationConfigMode::AttachedSingle, Some(g)) => {
154 0 : Some(Self::Attached((AttachmentMode::Single, Generation::new(g))))
155 : }
156 0 : (LocationConfigMode::AttachedStale, Some(g)) => {
157 0 : Some(Self::Attached((AttachmentMode::Stale, Generation::new(g))))
158 : }
159 : _ => {
160 0 : tracing::warn!(
161 0 : "Received invalid re-attach state for tenant {}: {rart:?}",
162 0 : rart.id
163 0 : );
164 0 : None
165 : }
166 : }
167 0 : }
168 : }
169 :
170 : impl TenantsMap {
171 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
172 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
173 : /// None is returned.
174 0 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
175 0 : match self {
176 0 : TenantsMap::Initializing => None,
177 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
178 0 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
179 : }
180 : }
181 0 : }
182 :
183 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
184 : /// resolve this to a fully qualified TenantShardId.
185 0 : fn resolve_attached_shard(
186 0 : &self,
187 0 : tenant_id: &TenantId,
188 0 : selector: ShardSelector,
189 0 : ) -> Option<TenantShardId> {
190 0 : let mut want_shard = None;
191 0 : match self {
192 0 : TenantsMap::Initializing => None,
193 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
194 0 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
195 : // Ignore all slots that don't contain an attached tenant
196 0 : let tenant = match &slot.1 {
197 0 : TenantSlot::Attached(t) => t,
198 0 : _ => continue,
199 : };
200 :
201 0 : match selector {
202 0 : ShardSelector::First => return Some(*slot.0),
203 0 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
204 0 : return Some(*slot.0)
205 : }
206 0 : ShardSelector::Page(key) => {
207 0 : // First slot we see for this tenant, calculate the expected shard number
208 0 : // for the key: we will use this for checking if this and subsequent
209 0 : // slots contain the key, rather than recalculating the hash each time.
210 0 : if want_shard.is_none() {
211 0 : want_shard = Some(tenant.shard_identity.get_shard_number(&key));
212 0 : }
213 :
214 0 : if Some(tenant.shard_identity.number) == want_shard {
215 0 : return Some(*slot.0);
216 0 : }
217 : }
218 0 : _ => continue,
219 : }
220 : }
221 :
222 : // Fall through: we didn't find an acceptable shard
223 0 : None
224 : }
225 : }
226 0 : }
227 :
228 : /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
229 : ///
230 : /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
231 : /// slot if the enclosed tenant is shut down.
232 0 : pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
233 0 : use std::collections::btree_map::Entry;
234 0 : match self {
235 0 : TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
236 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
237 0 : Entry::Occupied(entry) => match entry.get() {
238 0 : TenantSlot::InProgress(barrier) => {
239 0 : TenantsMapRemoveResult::InProgress(barrier.clone())
240 : }
241 0 : _ => TenantsMapRemoveResult::Occupied(entry.remove()),
242 : },
243 0 : Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
244 : },
245 : }
246 0 : }
247 :
248 0 : pub(crate) fn len(&self) -> usize {
249 0 : match self {
250 0 : TenantsMap::Initializing => 0,
251 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
252 : }
253 0 : }
254 : }
255 :
256 : /// This is "safe" in that it won't leave behind a partially deleted directory
257 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before we start deleting
258 : /// the contents.
259 : ///
260 : /// This is pageserver-specific, as it relies on future processes after a crash to check
261 : /// for TEMP_FILE_SUFFIX when loading things.
262 0 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
263 0 : let tmp_path = safe_rename_tenant_dir(path).await?;
264 0 : fs::remove_dir_all(tmp_path).await
265 0 : }
266 :
267 0 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
268 0 : let parent = path
269 0 : .as_ref()
270 0 : .parent()
271 0 : // It is invalid to call this function with a relative path. Tenant directories
272 0 : // should always have a parent.
273 0 : .ok_or(std::io::Error::new(
274 0 : std::io::ErrorKind::InvalidInput,
275 0 : "Path must be absolute",
276 0 : ))?;
277 0 : let rand_suffix = rand::thread_rng()
278 0 : .sample_iter(&Alphanumeric)
279 0 : .take(8)
280 0 : .map(char::from)
281 0 : .collect::<String>()
282 0 : + TEMP_FILE_SUFFIX;
283 0 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
284 0 : fs::rename(path.as_ref(), &tmp_path).await?;
285 0 : fs::File::open(parent).await?.sync_all().await?;
286 0 : Ok(tmp_path)
287 0 : }
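// Illustrative sketch of the rename performed above (the exact shape of the suffix comes from
// `path_with_suffix_extension` and `TEMP_FILE_SUFFIX`, both defined elsewhere in the crate):
//
//     let tmp = safe_rename_tenant_dir("tenants/<tenant_shard_id>").await?;
//     // `tmp` now looks something like "tenants/<tenant_shard_id>.aB3xY9Qk<TEMP_FILE_SUFFIX>"
//     // (8 random alphanumerics plus the suffix); if we crash before `remove_dir_all`
//     // completes, the next startup recognizes the suffix via `crate::is_temporary` and
//     // removes the leftover directory in `load_tenant_config`.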
288 :
289 : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
290 2 : Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
291 :
292 : /// The TenantManager is responsible for storing and mutating the collection of all tenants
293 : /// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
294 : /// lives inside the TenantManager.
295 : ///
296 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
297 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
298 : /// and attached modes concurrently.
299 : pub struct TenantManager {
300 : conf: &'static PageServerConf,
301 : // TODO: currently this is a &'static pointing to TENANTS. When we finish refactoring
302 : // out of that static variable, the TenantManager can own this.
303 : // See https://github.com/neondatabase/neon/issues/5796
304 : tenants: &'static std::sync::RwLock<TenantsMap>,
305 : resources: TenantSharedResources,
306 :
307 : // Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
308 : // This is for edge cases like tenant deletion. In normal cases (within a Tenant lifetime),
309 : // tenants have their own cancellation tokens, which we fire individually in [`Self::shutdown`], or
310 : // when the tenant detaches.
311 : cancel: CancellationToken,
312 : }
313 :
314 0 : fn emergency_generations(
315 0 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
316 0 : ) -> HashMap<TenantShardId, TenantStartupMode> {
317 0 : tenant_confs
318 0 : .iter()
319 0 : .filter_map(|(tid, lc)| {
320 0 : let lc = match lc {
321 0 : Ok(lc) => lc,
322 0 : Err(_) => return None,
323 : };
324 : Some((
325 0 : *tid,
326 0 : match &lc.mode {
327 0 : LocationMode::Attached(alc) => {
328 0 : TenantStartupMode::Attached((alc.attach_mode, alc.generation))
329 : }
330 0 : LocationMode::Secondary(_) => TenantStartupMode::Secondary,
331 : },
332 : ))
333 0 : })
334 0 : .collect()
335 0 : }
336 :
337 0 : async fn init_load_generations(
338 0 : conf: &'static PageServerConf,
339 0 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
340 0 : resources: &TenantSharedResources,
341 0 : cancel: &CancellationToken,
342 0 : ) -> anyhow::Result<Option<HashMap<TenantShardId, TenantStartupMode>>> {
343 0 : let generations = if conf.control_plane_emergency_mode {
344 0 : error!(
345 0 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
346 0 : );
347 0 : emergency_generations(tenant_confs)
348 0 : } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
349 0 : info!("Calling control plane API to re-attach tenants");
350 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
351 0 : match client.re_attach(conf).await {
352 0 : Ok(tenants) => tenants
353 0 : .into_iter()
354 0 : .flat_map(|(id, rart)| {
355 0 : TenantStartupMode::from_reattach_tenant(rart).map(|tsm| (id, tsm))
356 0 : })
357 0 : .collect(),
358 : Err(RetryForeverError::ShuttingDown) => {
359 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
360 : }
361 : }
362 : } else {
363 0 : info!("Control plane API not configured, tenant generations are disabled");
364 0 : return Ok(None);
365 : };
366 :
367 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
368 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
369 : // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
370 : // are processed, even though we don't block on recovery completing here.
371 : //
372 : // Must only do this if remote storage is enabled, otherwise deletion queue
373 : // is not running and channel push will fail.
374 0 : if resources.remote_storage.is_some() {
375 0 : let attached_tenants = generations
376 0 : .iter()
377 0 : .flat_map(|(id, start_mode)| {
378 0 : match start_mode {
379 0 : TenantStartupMode::Attached((_mode, generation)) => Some(generation),
380 0 : TenantStartupMode::Secondary => None,
381 : }
382 0 : .map(|gen| (*id, *gen))
383 0 : })
384 0 : .collect();
385 0 : resources.deletion_queue_client.recover(attached_tenants)?;
386 0 : }
387 :
388 0 : Ok(Some(generations))
389 0 : }
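// Summary of the three outcomes of init_load_generations above:
//     control_plane_emergency_mode set  => last known generations taken from local configs
//     control plane client configured   => modes/generations taken from the re-attach response
//     neither                           => Ok(None): legacy mode, generations disabled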
390 :
391 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
392 : /// to load a tenant config from it.
393 : ///
394 : /// Returns `Ok(None)` if the directory should be skipped (temporary, empty, unparsable name, or ignored).
395 0 : fn load_tenant_config(
396 0 : conf: &'static PageServerConf,
397 0 : dentry: Utf8DirEntry,
398 0 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
399 0 : let tenant_dir_path = dentry.path().to_path_buf();
400 0 : if crate::is_temporary(&tenant_dir_path) {
401 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
402 : // No need to use safe_remove_tenant_dir_all because this is already
403 : // a temporary path
404 0 : if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
405 0 : error!(
406 0 : "Failed to remove temporary directory '{}': {:?}",
407 0 : tenant_dir_path, e
408 0 : );
409 0 : }
410 0 : return Ok(None);
411 0 : }
412 :
413 : // This case happens if we crash during attachment before writing a config into the dir
414 0 : let is_empty = tenant_dir_path
415 0 : .is_empty_dir()
416 0 : .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
417 0 : if is_empty {
418 0 : info!("removing empty tenant directory {tenant_dir_path:?}");
419 0 : if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
420 0 : error!(
421 0 : "Failed to remove empty tenant directory '{}': {e:#}",
422 0 : tenant_dir_path
423 0 : )
424 0 : }
425 0 : return Ok(None);
426 0 : }
427 :
428 0 : let tenant_shard_id = match tenant_dir_path
429 0 : .file_name()
430 0 : .unwrap_or_default()
431 0 : .parse::<TenantShardId>()
432 : {
433 0 : Ok(id) => id,
434 : Err(_) => {
435 0 : warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
436 0 : return Ok(None);
437 : }
438 : };
439 :
440 : // Clean up legacy `metadata` files.
441 : // Doing it here because every single tenant directory is visited here.
442 : // In any later code, there's different treatment of tenant dirs
443 : // ... depending on whether the tenant is in the re-attach response or not
444 : // ... depending on whether the tenant is ignored or not
445 0 : assert_eq!(
446 0 : &conf.tenant_path(&tenant_shard_id),
447 0 : &tenant_dir_path,
448 0 : "later use of conf....path() methods would be dubious"
449 : );
450 0 : let timelines: Vec<TimelineId> = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() {
451 0 : Ok(iter) => {
452 0 : let mut timelines = Vec::new();
453 0 : for res in iter {
454 0 : let p = res?;
455 0 : let Some(timeline_id) = p.file_name().parse::<TimelineId>().ok() else {
456 : // skip any entries that aren't TimelineId, such as
457 : // - *.___temp dirs
458 : // - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart)
459 0 : continue;
460 : };
461 0 : timelines.push(timeline_id);
462 : }
463 0 : timelines
464 : }
465 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![],
466 0 : Err(e) => return Err(anyhow::anyhow!(e)),
467 : };
468 0 : for timeline_id in timelines {
469 0 : let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id);
470 0 : let metadata_path = timeline_path.join(METADATA_FILE_NAME);
471 0 : match std::fs::remove_file(&metadata_path) {
472 : Ok(()) => {
473 0 : crashsafe::fsync(timeline_path)
474 0 : .context("fsync timeline dir after removing legacy metadata file")?;
475 0 : info!("removed legacy metadata file at {metadata_path}");
476 : }
477 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
478 0 : // something removed the file earlier, or it was never there
479 0 : // We don't care, this software version doesn't write it again, so, we're good.
480 0 : }
481 0 : Err(e) => {
482 0 : anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}");
483 : }
484 : }
485 : }
486 :
487 0 : let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
488 0 : if tenant_ignore_mark_file.exists() {
489 0 : info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
490 0 : return Ok(None);
491 0 : }
492 0 :
493 0 : Ok(Some((
494 0 : tenant_shard_id,
495 0 : Tenant::load_tenant_config(conf, &tenant_shard_id),
496 0 : )))
497 0 : }
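// Sketch of the on-disk layout this function walks (directory names are those produced by the
// conf.tenant_path()/timelines_path() helpers used above; the tenant config file name is
// whatever Tenant::load_tenant_config expects and is not spelled out here):
//
//     tenants/
//       <tenant_shard_id>/                   <- tenant_dir_path
//         <tenant config file>               <- parsed by Tenant::load_tenant_config
//         <IGNORED_TENANT_FILE_NAME>         <- if present, the tenant is skipped
//         timelines/
//           <timeline_id>/
//             <METADATA_FILE_NAME>           <- legacy file, deleted above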
498 :
499 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
500 : /// and load configurations for the tenants we found.
501 : ///
502 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
503 : /// seconds even on reasonably fast drives.
504 0 : async fn init_load_tenant_configs(
505 0 : conf: &'static PageServerConf,
506 0 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
507 0 : let tenants_dir = conf.tenants_path();
508 :
509 0 : let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
510 0 : let dir_entries = tenants_dir
511 0 : .read_dir_utf8()
512 0 : .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
513 :
514 0 : Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
515 0 : })
516 0 : .await??;
517 :
518 0 : let mut configs = HashMap::new();
519 0 :
520 0 : let mut join_set = JoinSet::new();
521 0 : for dentry in dentries {
522 0 : join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
523 0 : }
524 :
525 0 : while let Some(r) = join_set.join_next().await {
526 0 : if let Some((tenant_id, tenant_config)) = r?? {
527 0 : configs.insert(tenant_id, tenant_config);
528 0 : }
529 : }
530 :
531 0 : Ok(configs)
532 0 : }
533 :
534 : /// Initialize repositories with locally available timelines.
535 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
536 : /// are scheduled for download and added to the tenant once download is completed.
537 0 : #[instrument(skip_all)]
538 : pub async fn init_tenant_mgr(
539 : conf: &'static PageServerConf,
540 : resources: TenantSharedResources,
541 : init_order: InitializationOrder,
542 : cancel: CancellationToken,
543 : ) -> anyhow::Result<TenantManager> {
544 : let mut tenants = BTreeMap::new();
545 :
546 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
547 :
548 : // Initialize dynamic limits that depend on system resources
549 : let system_memory =
550 : sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
551 : .total_memory();
552 : let max_ephemeral_layer_bytes =
553 : conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
554 0 : tracing::info!("Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory");
555 : inmemory_layer::GLOBAL_RESOURCES.max_dirty_bytes.store(
556 : max_ephemeral_layer_bytes,
557 : std::sync::atomic::Ordering::Relaxed,
558 : );
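    // Worked example with illustrative numbers (not defaults from this codebase): with
    // ephemeral_bytes_per_memory_kb = 100 and 16 GiB of RAM (17_179_869_184 bytes),
    // system_memory / 1024 = 16_777_216, so max_ephemeral_layer_bytes =
    // 100 * 16_777_216 = 1_677_721_600 bytes (≈1.56 GiB) of dirty in-memory layers
    // shared across all tenants on this pageserver.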
559 :
560 : // Scan local filesystem for attached tenants
561 : let tenant_configs = init_load_tenant_configs(conf).await?;
562 :
563 : // Determine which tenants are to be secondary or attached, and in which generation
564 : let tenant_modes = init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
565 :
566 0 : tracing::info!(
567 0 : "Attaching {} tenants at startup, warming up {} at a time",
568 0 : tenant_configs.len(),
569 0 : conf.concurrent_tenant_warmup.initial_permits()
570 0 : );
571 : TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
572 :
573 : // Construct `Tenant` objects and start them running
574 : for (tenant_shard_id, location_conf) in tenant_configs {
575 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
576 :
577 : let mut location_conf = match location_conf {
578 : Ok(l) => l,
579 : Err(e) => {
580 0 : warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
581 :
582 : tenants.insert(
583 : tenant_shard_id,
584 : TenantSlot::Attached(Tenant::create_broken_tenant(
585 : conf,
586 : tenant_shard_id,
587 : format!("{}", e),
588 : )),
589 : );
590 : continue;
591 : }
592 : };
593 :
594 : // FIXME: if we were attached, and get demoted to secondary on re-attach, we
595 : // don't have a place to get a config.
596 : // (https://github.com/neondatabase/neon/issues/5377)
597 : const DEFAULT_SECONDARY_CONF: SecondaryLocationConfig =
598 : SecondaryLocationConfig { warm: true };
599 :
600 : // Update the location config according to the re-attach response
601 : if let Some(tenant_modes) = &tenant_modes {
602 : // We have a generation map: treat it as the authority for whether
603 : // this tenant is really attached.
604 : match tenant_modes.get(&tenant_shard_id) {
605 : None => {
606 0 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
607 : if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
608 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
609 0 : "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
610 0 : );
611 : }
612 :
613 : // We deleted local content: move on to next tenant, don't try and spawn this one.
614 : continue;
615 : }
616 : Some(TenantStartupMode::Secondary) => {
617 : if !matches!(location_conf.mode, LocationMode::Secondary(_)) {
618 : location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
619 : }
620 : }
621 : Some(TenantStartupMode::Attached((attach_mode, generation))) => {
622 : let old_gen_higher = match &location_conf.mode {
623 : LocationMode::Attached(AttachedLocationConfig {
624 : generation: old_generation,
625 : attach_mode: _attach_mode,
626 : }) => {
627 : if old_generation > generation {
628 : Some(old_generation)
629 : } else {
630 : None
631 : }
632 : }
633 : _ => None,
634 : };
635 : if let Some(old_generation) = old_gen_higher {
636 0 : tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
637 0 : "Control plane gave decreasing generation ({generation:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
638 0 : old_generation
639 0 : );
640 :
641 : // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
642 : // local disk content: demote to secondary rather than detaching.
643 : location_conf.mode = LocationMode::Secondary(DEFAULT_SECONDARY_CONF);
644 : } else {
645 : location_conf.attach_in_generation(*attach_mode, *generation);
646 : }
647 : }
648 : }
649 : } else {
650 : // Legacy mode: no generation information, any tenant present
651 : // on local disk may activate
652 0 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
653 : };
654 :
655 : // Presence of a generation number implies attachment: attach the tenant
656 : // if it wasn't already, and apply the generation number.
657 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
658 :
659 : let shard_identity = location_conf.shard;
660 : let slot = match location_conf.mode {
661 : LocationMode::Attached(attached_conf) => {
662 : match tenant_spawn(
663 : conf,
664 : tenant_shard_id,
665 : &tenant_dir_path,
666 : resources.clone(),
667 : AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
668 : shard_identity,
669 : Some(init_order.clone()),
670 : &TENANTS,
671 : SpawnMode::Lazy,
672 : &ctx,
673 : ) {
674 : Ok(tenant) => TenantSlot::Attached(tenant),
675 : Err(e) => {
676 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
677 : continue;
678 : }
679 : }
680 : }
681 : LocationMode::Secondary(secondary_conf) => {
682 0 : info!(
683 0 : tenant_id = %tenant_shard_id.tenant_id,
684 0 : shard_id = %tenant_shard_id.shard_slug(),
685 0 : "Starting secondary tenant"
686 0 : );
687 : TenantSlot::Secondary(SecondaryTenant::new(
688 : tenant_shard_id,
689 : shard_identity,
690 : location_conf.tenant_conf,
691 : &secondary_conf,
692 : ))
693 : }
694 : };
695 :
696 : tenants.insert(tenant_shard_id, slot);
697 : }
698 :
699 0 : info!("Processed {} local tenants at startup", tenants.len());
700 :
701 : let mut tenants_map = TENANTS.write().unwrap();
702 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
703 : METRICS.tenant_slots.set(tenants.len() as u64);
704 : *tenants_map = TenantsMap::Open(tenants);
705 :
706 : Ok(TenantManager {
707 : conf,
708 : tenants: &TENANTS,
709 : resources,
710 : cancel: CancellationToken::new(),
711 : })
712 : }
713 :
714 : /// Wrapper for Tenant::spawn that checks invariants before running, and inserts
715 : /// a broken tenant in the map if Tenant::spawn fails.
716 : #[allow(clippy::too_many_arguments)]
717 0 : fn tenant_spawn(
718 0 : conf: &'static PageServerConf,
719 0 : tenant_shard_id: TenantShardId,
720 0 : tenant_path: &Utf8Path,
721 0 : resources: TenantSharedResources,
722 0 : location_conf: AttachedTenantConf,
723 0 : shard_identity: ShardIdentity,
724 0 : init_order: Option<InitializationOrder>,
725 0 : tenants: &'static std::sync::RwLock<TenantsMap>,
726 0 : mode: SpawnMode,
727 0 : ctx: &RequestContext,
728 0 : ) -> anyhow::Result<Arc<Tenant>> {
729 0 : anyhow::ensure!(
730 0 : tenant_path.is_dir(),
731 0 : "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
732 : );
733 0 : anyhow::ensure!(
734 0 : !crate::is_temporary(tenant_path),
735 0 : "Cannot load tenant from temporary path {tenant_path:?}"
736 : );
737 0 : anyhow::ensure!(
738 0 : !tenant_path.is_empty_dir().with_context(|| {
739 0 : format!("Failed to check whether {tenant_path:?} is an empty dir")
740 0 : })?,
741 0 : "Cannot load tenant from empty directory {tenant_path:?}"
742 : );
743 :
744 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
745 0 : anyhow::ensure!(
746 0 : !tenant_ignore_mark.exists(),
747 0 : "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
748 : );
749 :
750 0 : let tenant = match Tenant::spawn(
751 0 : conf,
752 0 : tenant_shard_id,
753 0 : resources,
754 0 : location_conf,
755 0 : shard_identity,
756 0 : init_order,
757 0 : tenants,
758 0 : mode,
759 0 : ctx,
760 0 : ) {
761 0 : Ok(tenant) => tenant,
762 0 : Err(e) => {
763 0 : error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
764 0 : Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
765 : }
766 : };
767 :
768 0 : Ok(tenant)
769 0 : }
770 :
771 2 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
772 2 : let mut join_set = JoinSet::new();
773 :
774 : // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
775 2 : let (total_in_progress, total_attached) = {
776 2 : let mut m = tenants.write().unwrap();
777 2 : match &mut *m {
778 : TenantsMap::Initializing => {
779 0 : *m = TenantsMap::ShuttingDown(BTreeMap::default());
780 0 : info!("tenants map is empty");
781 0 : return;
782 : }
783 2 : TenantsMap::Open(tenants) => {
784 2 : let mut shutdown_state = BTreeMap::new();
785 2 : let mut total_in_progress = 0;
786 2 : let mut total_attached = 0;
787 :
788 2 : for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
789 2 : match v {
790 0 : TenantSlot::Attached(t) => {
791 0 : shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
792 0 : join_set.spawn(
793 0 : async move {
794 0 : let res = {
795 0 : let (_guard, shutdown_progress) = completion::channel();
796 0 : t.shutdown(shutdown_progress, ShutdownMode::FreezeAndFlush).await
797 : };
798 :
799 0 : if let Err(other_progress) = res {
800 : // join the other shutdown already in progress
801 0 : other_progress.wait().await;
802 0 : }
803 :
804 : // we cannot afford per tenant logging here, because if s3 is degraded, we are
805 : // going to log too many lines
806 0 : debug!("tenant successfully stopped");
807 0 : }
808 0 : .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
809 : );
810 :
811 0 : total_attached += 1;
812 : }
813 0 : TenantSlot::Secondary(state) => {
814 0 : // We don't need to wait for this individually per-tenant: the
815 0 : // downloader task will be waited on eventually, this cancel
816 0 : // is just to encourage it to drop out if it is doing work
817 0 : // for this tenant right now.
818 0 : state.cancel.cancel();
819 0 :
820 0 : shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
821 0 : }
822 2 : TenantSlot::InProgress(notify) => {
823 2 : // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
824 2 : // wait for their notifications to fire in this function.
825 2 : join_set.spawn(async move {
826 2 : notify.wait().await;
827 2 : });
828 2 :
829 2 : total_in_progress += 1;
830 2 : }
831 : }
832 : }
833 2 : *m = TenantsMap::ShuttingDown(shutdown_state);
834 2 : (total_in_progress, total_attached)
835 : }
836 : TenantsMap::ShuttingDown(_) => {
837 0 : error!("already shutting down, this function isn't supposed to be called more than once");
838 0 : return;
839 : }
840 : }
841 : };
842 :
843 2 : let started_at = std::time::Instant::now();
844 2 :
845 2 : info!(
846 2 : "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
847 2 : total_in_progress, total_attached
848 2 : );
849 :
850 2 : let total = join_set.len();
851 2 : let mut panicked = 0;
852 2 : let mut buffering = true;
853 2 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
854 2 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
855 :
856 6 : while !join_set.is_empty() {
857 8 : tokio::select! {
858 2 : Some(joined) = join_set.join_next() => {
859 : match joined {
860 : Ok(()) => {},
861 : Err(join_error) if join_error.is_cancelled() => {
862 : unreachable!("we are not cancelling any of the tasks");
863 : }
864 : Err(join_error) if join_error.is_panic() => {
865 : // cannot really do anything, as this panic is likely a bug
866 : panicked += 1;
867 : }
868 : Err(join_error) => {
869 0 : warn!("unknown kind of JoinError: {join_error}");
870 : }
871 : }
872 : if !buffering {
873 : // buffer so that every 500ms since the first update (or starting) we'll log
874 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
875 : // are not able to log *then*.
876 : buffering = true;
877 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
878 : }
879 : },
880 : _ = &mut buffered, if buffering => {
881 : buffering = false;
882 2 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
883 : }
884 : }
885 : }
886 :
887 2 : if panicked > 0 {
888 0 : warn!(
889 0 : panicked,
890 0 : total, "observed panicks while shutting down tenants"
891 0 : );
892 2 : }
893 :
894 : // caller will log how long we took
895 2 : }
896 :
897 0 : #[derive(thiserror::Error, Debug)]
898 : pub(crate) enum UpsertLocationError {
899 : #[error("Bad config request: {0}")]
900 : BadRequest(anyhow::Error),
901 :
902 : #[error("Cannot change config in this state: {0}")]
903 : Unavailable(#[from] TenantMapError),
904 :
905 : #[error("Tenant is already being modified")]
906 : InProgress,
907 :
908 : #[error("Failed to flush: {0}")]
909 : Flush(anyhow::Error),
910 :
911 : #[error("Internal error: {0}")]
912 : Other(#[from] anyhow::Error),
913 : }
914 :
915 : impl TenantManager {
916 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
917 : /// having to pass it around everywhere as a separate object.
918 0 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
919 0 : self.conf
920 0 : }
921 :
922 : /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or currently
923 : /// undergoing a state change (i.e. slot is InProgress).
924 : ///
925 : /// The returned Tenant is not guaranteed to be active: check its status after obtaining it, or
926 : /// use [`Tenant::wait_to_become_active`] before using it if you will do I/O on it.
927 0 : pub(crate) fn get_attached_tenant_shard(
928 0 : &self,
929 0 : tenant_shard_id: TenantShardId,
930 0 : ) -> Result<Arc<Tenant>, GetTenantError> {
931 0 : let locked = self.tenants.read().unwrap();
932 :
933 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
934 :
935 0 : match peek_slot {
936 0 : Some(TenantSlot::Attached(tenant)) => Ok(Arc::clone(tenant)),
937 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
938 : None | Some(TenantSlot::Secondary(_)) => {
939 0 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
940 : }
941 : }
942 0 : }
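    // Illustrative caller pattern (hypothetical `mgr`, `tenant_shard_id`, `timeout` in scope),
    // following the doc comment above: look the shard up, then wait for it to activate before
    // doing any I/O on it.
    //
    //     let tenant = mgr.get_attached_tenant_shard(tenant_shard_id)?;
    //     tenant.wait_to_become_active(timeout).await?;
    //     // `tenant` is now Active and safe to serve reads / run timeline operations.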
943 :
944 0 : pub(crate) fn get_secondary_tenant_shard(
945 0 : &self,
946 0 : tenant_shard_id: TenantShardId,
947 0 : ) -> Option<Arc<SecondaryTenant>> {
948 0 : let locked = self.tenants.read().unwrap();
949 0 :
950 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
951 0 : .ok()
952 0 : .flatten();
953 :
954 0 : match peek_slot {
955 0 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
956 0 : _ => None,
957 : }
958 0 : }
959 :
960 : /// Whether the `TenantManager` is responsible for the tenant shard
961 0 : pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
962 0 : let locked = self.tenants.read().unwrap();
963 0 :
964 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
965 0 : .ok()
966 0 : .flatten();
967 0 :
968 0 : peek_slot.is_some()
969 0 : }
970 :
971 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
972 : pub(crate) async fn upsert_location(
973 : &self,
974 : tenant_shard_id: TenantShardId,
975 : new_location_config: LocationConf,
976 : flush: Option<Duration>,
977 : mut spawn_mode: SpawnMode,
978 : ctx: &RequestContext,
979 : ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
980 : debug_assert_current_span_has_tenant_id();
981 0 : info!("configuring tenant location to state {new_location_config:?}");
982 :
983 : enum FastPathModified {
984 : Attached(Arc<Tenant>),
985 : Secondary(Arc<SecondaryTenant>),
986 : }
987 :
988 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
989 : // then we do not need to set the slot to InProgress, we can just call into the
990 : // existing tenant.
991 : let fast_path_taken = {
992 : let locked = self.tenants.read().unwrap();
993 : let peek_slot =
994 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
995 : match (&new_location_config.mode, peek_slot) {
996 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
997 : match attach_conf.generation.cmp(&tenant.generation) {
998 : Ordering::Equal => {
999 : // A transition from Attached to Attached in the same generation, we may
1000 : // take our fast path and just provide the updated configuration
1001 : // to the tenant.
1002 : tenant.set_new_location_config(
1003 : AttachedTenantConf::try_from(new_location_config.clone())
1004 : .map_err(UpsertLocationError::BadRequest)?,
1005 : );
1006 :
1007 : Some(FastPathModified::Attached(tenant.clone()))
1008 : }
1009 : Ordering::Less => {
1010 : return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
1011 : "Generation {:?} is less than existing {:?}",
1012 : attach_conf.generation,
1013 : tenant.generation
1014 : )));
1015 : }
1016 : Ordering::Greater => {
1017 : // Generation advanced, fall through to general case of replacing `Tenant` object
1018 : None
1019 : }
1020 : }
1021 : }
1022 : (
1023 : LocationMode::Secondary(secondary_conf),
1024 : Some(TenantSlot::Secondary(secondary_tenant)),
1025 : ) => {
1026 : secondary_tenant.set_config(secondary_conf);
1027 : secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
1028 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
1029 : }
1030 : _ => {
1031 : // Not an Attached->Attached transition, fall through to general case
1032 : None
1033 : }
1034 : }
1035 : };
1036 :
1037 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
1038 : // phase of writing config and/or waiting for flush, before returning.
1039 : match fast_path_taken {
1040 : Some(FastPathModified::Attached(tenant)) => {
1041 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1042 : .await?;
1043 :
1044 : // Transition to AttachedStale means we may well hold a valid generation
1045 : // still, and have been requested to go stale as part of a migration. If
1046 : // the caller set `flush`, then flush to remote storage.
1047 : if let LocationMode::Attached(AttachedLocationConfig {
1048 : generation: _,
1049 : attach_mode: AttachmentMode::Stale,
1050 : }) = &new_location_config.mode
1051 : {
1052 : if let Some(flush_timeout) = flush {
1053 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
1054 : Ok(Err(e)) => {
1055 : return Err(UpsertLocationError::Flush(e));
1056 : }
1057 : Ok(Ok(_)) => return Ok(Some(tenant)),
1058 : Err(_) => {
1059 0 : tracing::warn!(
1060 0 : timeout_ms = flush_timeout.as_millis(),
1061 0 : "Timed out waiting for flush to remote storage, proceeding anyway."
1062 0 : )
1063 : }
1064 : }
1065 : }
1066 : }
1067 :
1068 : return Ok(Some(tenant));
1069 : }
1070 : Some(FastPathModified::Secondary(_secondary_tenant)) => {
1071 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1072 : .await?;
1073 :
1074 : return Ok(None);
1075 : }
1076 : None => {
1077 : // Proceed with the general case procedure, where we will shutdown & remove any existing
1078 : // slot contents and replace with a fresh one
1079 : }
1080 : };
1081 :
1082 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
1083 : // InProgress value to the slot while we make whatever changes are required. The state for
1084 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
1085 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
1086 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
1087 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
1088 0 : .map_err(|e| match e {
1089 : TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => {
1090 0 : unreachable!("Called with mode Any")
1091 : }
1092 0 : TenantSlotError::InProgress => UpsertLocationError::InProgress,
1093 0 : TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
1094 0 : })?;
1095 :
1096 : match slot_guard.get_old_value() {
1097 : Some(TenantSlot::Attached(tenant)) => {
1098 : // The case where we keep a Tenant alive was covered above in the special case
1099 : // for Attached->Attached transitions in the same generation. By this point,
1100 : // if we see an attached tenant we know it will be discarded and should be
1101 : // shut down.
1102 : let (_guard, progress) = utils::completion::channel();
1103 :
1104 : match tenant.get_attach_mode() {
1105 : AttachmentMode::Single | AttachmentMode::Multi => {
1106 : // Before we leave our state as the presumed holder of the latest generation,
1107 : // flush any outstanding deletions to reduce the risk of leaking objects.
1108 : self.resources.deletion_queue_client.flush_advisory()
1109 : }
1110 : AttachmentMode::Stale => {
1111 : // If we're stale there's no point trying to flush deletions
1112 : }
1113 : };
1114 :
1115 0 : info!("Shutting down attached tenant");
1116 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1117 : Ok(()) => {}
1118 : Err(barrier) => {
1119 0 : info!("Shutdown already in progress, waiting for it to complete");
1120 : barrier.wait().await;
1121 : }
1122 : }
1123 : slot_guard.drop_old_value().expect("We just shut it down");
1124 :
1125 : // Edge case: we were called with SpawnMode::Create, but a Tenant already existed: the
1126 : // caller thought they were creating a new tenant over an existing one. We must switch to
1127 : // Eager mode so that when starting this Tenant we properly probe remote storage for timelines,
1128 : // rather than assuming it to be empty.
1129 : spawn_mode = SpawnMode::Eager;
1130 : }
1131 : Some(TenantSlot::Secondary(state)) => {
1132 0 : info!("Shutting down secondary tenant");
1133 : state.shutdown().await;
1134 : }
1135 : Some(TenantSlot::InProgress(_)) => {
1136 : // This should never happen: acquire_slot should error out
1137 : // if the contents of a slot were InProgress.
1138 : return Err(UpsertLocationError::Other(anyhow::anyhow!(
1139 : "Acquired an InProgress slot, this is a bug."
1140 : )));
1141 : }
1142 : None => {
1143 : // Slot was vacant, nothing needs shutting down.
1144 : }
1145 : }
1146 :
1147 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1148 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1149 :
1150 : // Directory structure is the same for attached and secondary modes:
1151 : // create it if it doesn't exist. Timeline load/creation expects the
1152 : // timelines/ subdir to already exist.
1153 : //
1154 : // Does not need to be fsync'd because local storage is just a cache.
1155 : tokio::fs::create_dir_all(&timelines_path)
1156 : .await
1157 0 : .with_context(|| format!("Creating {timelines_path}"))?;
1158 :
1159 : // Before activating either secondary or attached mode, persist the
1160 : // configuration, so that on restart we will re-attach (or re-start
1161 : // secondary) on the tenant.
1162 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
1163 :
1164 : let new_slot = match &new_location_config.mode {
1165 : LocationMode::Secondary(secondary_config) => {
1166 : let shard_identity = new_location_config.shard;
1167 : TenantSlot::Secondary(SecondaryTenant::new(
1168 : tenant_shard_id,
1169 : shard_identity,
1170 : new_location_config.tenant_conf,
1171 : secondary_config,
1172 : ))
1173 : }
1174 : LocationMode::Attached(_attach_config) => {
1175 : let shard_identity = new_location_config.shard;
1176 :
1177 : // Testing hack: if we are configured with no control plane, then drop the generation
1178 : // from upserts. This enables creating generation-less tenants even though neon_local
1179 : // always uses generations when calling the location conf API.
1180 : let attached_conf = if cfg!(feature = "testing") {
1181 : let mut conf = AttachedTenantConf::try_from(new_location_config)?;
1182 : if self.conf.control_plane_api.is_none() {
1183 : conf.location.generation = Generation::none();
1184 : }
1185 : conf
1186 : } else {
1187 : AttachedTenantConf::try_from(new_location_config)?
1188 : };
1189 :
1190 : let tenant = tenant_spawn(
1191 : self.conf,
1192 : tenant_shard_id,
1193 : &tenant_path,
1194 : self.resources.clone(),
1195 : attached_conf,
1196 : shard_identity,
1197 : None,
1198 : self.tenants,
1199 : spawn_mode,
1200 : ctx,
1201 : )?;
1202 :
1203 : TenantSlot::Attached(tenant)
1204 : }
1205 : };
1206 :
1207 : let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
1208 : Some(tenant.clone())
1209 : } else {
1210 : None
1211 : };
1212 :
1213 : match slot_guard.upsert(new_slot) {
1214 : Err(TenantSlotUpsertError::InternalError(e)) => {
1215 : Err(UpsertLocationError::Other(anyhow::anyhow!(e)))
1216 : }
1217 : Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
1218 : Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
1219 : // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
1220 : // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
1221 : // are shut down.
1222 : //
1223 : // We must shut it down inline here.
1224 : match new_slot {
1225 : TenantSlot::InProgress(_) => {
1226 : // Unreachable because we never insert an InProgress
1227 : unreachable!()
1228 : }
1229 : TenantSlot::Attached(tenant) => {
1230 : let (_guard, progress) = utils::completion::channel();
1231 0 : info!("Shutting down just-spawned tenant, because tenant manager is shut down");
1232 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1233 : Ok(()) => {
1234 0 : info!("Finished shutting down just-spawned tenant");
1235 : }
1236 : Err(barrier) => {
1237 0 : info!("Shutdown already in progress, waiting for it to complete");
1238 : barrier.wait().await;
1239 : }
1240 : }
1241 : }
1242 : TenantSlot::Secondary(secondary_tenant) => {
1243 : secondary_tenant.shutdown().await;
1244 : }
1245 : }
1246 :
1247 : Err(UpsertLocationError::Unavailable(
1248 : TenantMapError::ShuttingDown,
1249 : ))
1250 : }
1251 : Ok(()) => Ok(attached_tenant),
1252 : }
1253 : }
1254 :
1255 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1256 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1257 : /// dropped before re-attaching.
1258 : ///
1259 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1260 : /// where an issue is identified that would go away with a restart of the tenant.
1261 : ///
1262 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1263 : /// to respect the cancellation tokens used in normal shutdown().
1264 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1265 : pub(crate) async fn reset_tenant(
1266 : &self,
1267 : tenant_shard_id: TenantShardId,
1268 : drop_cache: bool,
1269 : ctx: &RequestContext,
1270 : ) -> anyhow::Result<()> {
1271 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1272 : let Some(old_slot) = slot_guard.get_old_value() else {
1273 : anyhow::bail!("Tenant not found when trying to reset");
1274 : };
1275 :
1276 : let Some(tenant) = old_slot.get_attached() else {
1277 : slot_guard.revert();
1278 : anyhow::bail!("Tenant is not in attached state");
1279 : };
1280 :
1281 : let (_guard, progress) = utils::completion::channel();
1282 : match tenant.shutdown(progress, ShutdownMode::Hard).await {
1283 : Ok(()) => {
1284 : slot_guard.drop_old_value()?;
1285 : }
1286 : Err(_barrier) => {
1287 : slot_guard.revert();
1288 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1289 : }
1290 : }
1291 :
1292 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1293 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1294 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1295 :
1296 : if drop_cache {
1297 0 : tracing::info!("Dropping local file cache");
1298 :
1299 : match tokio::fs::read_dir(&timelines_path).await {
1300 : Err(e) => {
1301 0 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1302 : }
1303 : Ok(mut entries) => {
1304 : while let Some(entry) = entries.next_entry().await? {
1305 : tokio::fs::remove_dir_all(entry.path()).await?;
1306 : }
1307 : }
1308 : }
1309 : }
1310 :
1311 : let shard_identity = config.shard;
1312 : let tenant = tenant_spawn(
1313 : self.conf,
1314 : tenant_shard_id,
1315 : &tenant_path,
1316 : self.resources.clone(),
1317 : AttachedTenantConf::try_from(config)?,
1318 : shard_identity,
1319 : None,
1320 : self.tenants,
1321 : SpawnMode::Eager,
1322 : ctx,
1323 : )?;
1324 :
1325 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1326 :
1327 : Ok(())
1328 : }
1329 :
1330 0 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
1331 0 : let locked = self.tenants.read().unwrap();
1332 0 : match &*locked {
1333 0 : TenantsMap::Initializing => Vec::new(),
1334 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1335 0 : .values()
1336 0 : .filter_map(|slot| {
1337 0 : slot.get_attached()
1338 0 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1339 0 : })
1340 0 : .collect(),
1341 : }
1342 0 : }
1343 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1344 : // callback should be small and fast, as it will be called inside the global
1345 : // TenantsMap lock.
1346 0 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1347 0 : where
1348 0 : // TODO: let the callback return a hint to drop out of the loop early
1349 0 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1350 0 : {
1351 0 : let locked = self.tenants.read().unwrap();
1352 :
1353 0 : let map = match &*locked {
1354 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1355 0 : TenantsMap::Open(m) => m,
1356 : };
1357 :
1358 0 : for (tenant_id, slot) in map {
1359 0 : if let TenantSlot::Secondary(state) = slot {
1360 : // Only expose secondary tenants that are not currently shutting down
1361 0 : if !state.cancel.is_cancelled() {
1362 0 : func(tenant_id, state)
1363 0 : }
1364 0 : }
1365 : }
1366 0 : }
1367 :
1368 : /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
1369 0 : pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
1370 0 : let locked = self.tenants.read().unwrap();
1371 0 : match &*locked {
1372 0 : TenantsMap::Initializing => Vec::new(),
1373 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1374 0 : map.iter().map(|(k, v)| (*k, v.clone())).collect()
1375 : }
1376 : }
1377 0 : }
1378 :
1379 0 : pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option<TenantSlot> {
1380 0 : let locked = self.tenants.read().unwrap();
1381 0 : match &*locked {
1382 0 : TenantsMap::Initializing => None,
1383 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1384 0 : map.get(&tenant_shard_id).cloned()
1385 : }
1386 : }
1387 0 : }
1388 :
1389 0 : pub(crate) async fn delete_tenant(
1390 0 : &self,
1391 0 : tenant_shard_id: TenantShardId,
1392 0 : activation_timeout: Duration,
1393 0 : ) -> Result<(), DeleteTenantError> {
1394 0 : super::span::debug_assert_current_span_has_tenant_id();
1395 : // We acquire a SlotGuard during this function to protect against concurrent
1396 : // changes while the ::prepare phase of DeleteTenantFlow executes, but then
1397 : // have to return the Tenant to the map while the background deletion runs.
1398 : //
1399 : // TODO: refactor deletion to happen outside the lifetime of a Tenant.
1400 : // Currently, deletion requires a reference to the tenants map in order to
1401 : // keep the Tenant in the map until deletion is complete, and then remove
1402 : // it at the end.
1403 : //
1404 : // See https://github.com/neondatabase/neon/issues/5080
1405 :
1406 0 : let slot_guard =
1407 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
1408 :
1409 : // unwrap is safe because we used MustExist mode when acquiring
1410 0 : let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
1411 0 : TenantSlot::Attached(tenant) => tenant.clone(),
1412 : _ => {
1413 : // Express "not attached" as equivalent to "not found"
1414 0 : return Err(DeleteTenantError::NotAttached);
1415 : }
1416 : };
1417 :
1418 0 : match tenant.current_state() {
1419 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1420 : // If deletion is already in progress, return success (the semantics of this
1421 : // function are to return success after deletion is spawned in the background).
1422 : // Otherwise fall through and let [`DeleteTenantFlow`] handle this state.
1423 0 : if DeleteTenantFlow::is_in_progress(&tenant) {
1424 : // The `delete_progress` lock is held: deletion is already happening
1425 : // in the background
1426 0 : slot_guard.revert();
1427 0 : return Ok(());
1428 0 : }
1429 : }
1430 : _ => {
1431 0 : tenant
1432 0 : .wait_to_become_active(activation_timeout)
1433 0 : .await
1434 0 : .map_err(|e| match e {
1435 : GetActiveTenantError::WillNotBecomeActive(_)
1436 : | GetActiveTenantError::Broken(_) => {
1437 0 : DeleteTenantError::InvalidState(tenant.current_state())
1438 : }
1439 0 : GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
1440 0 : GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
1441 : GetActiveTenantError::WaitForActiveTimeout {
1442 0 : latest_state: _latest_state,
1443 0 : wait_time: _wait_time,
1444 0 : } => DeleteTenantError::InvalidState(tenant.current_state()),
1445 0 : })?;
1446 : }
1447 : }
1448 :
1449 0 : let result = DeleteTenantFlow::run(
1450 0 : self.conf,
1451 0 : self.resources.remote_storage.clone(),
1452 0 : &TENANTS,
1453 0 : tenant,
1454 0 : &self.cancel,
1455 0 : )
1456 0 : .await;
1457 :
1458 : // The Tenant goes back into the map in Stopping state; it will eventually be removed by DeleteTenantFlow
1459 0 : slot_guard.revert();
1460 0 : result
1461 0 : }
1462 :
1463 0 : #[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
1464 : pub(crate) async fn shard_split(
1465 : &self,
1466 : tenant: Arc<Tenant>,
1467 : new_shard_count: ShardCount,
1468 : new_stripe_size: Option<ShardStripeSize>,
1469 : ctx: &RequestContext,
1470 : ) -> anyhow::Result<Vec<TenantShardId>> {
1471 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1472 : let r = self
1473 : .do_shard_split(tenant, new_shard_count, new_stripe_size, ctx)
1474 : .await;
1475 : if r.is_err() {
1476 : // Shard splitting might have left the original shard in a partially shut down state (it
1477 : // stops the shard's remote timeline client). Reset it to ensure we leave things in
1478 : // a working state.
1479 : if self.get(tenant_shard_id).is_some() {
1480 0 : tracing::warn!("Resetting after shard split failure");
1481 : if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
1482 : // Log this error because our return value will still be the original error, not this one. This is
1483 : // a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
1484 : // (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
1485 : // setting it broken probably won't help either.
1486 0 : tracing::error!("Failed to reset: {e}");
1487 : }
1488 : }
1489 : }
1490 :
1491 : r
1492 : }
1493 :
1494 0 : pub(crate) async fn do_shard_split(
1495 0 : &self,
1496 0 : tenant: Arc<Tenant>,
1497 0 : new_shard_count: ShardCount,
1498 0 : new_stripe_size: Option<ShardStripeSize>,
1499 0 : ctx: &RequestContext,
1500 0 : ) -> anyhow::Result<Vec<TenantShardId>> {
1501 0 : let tenant_shard_id = *tenant.get_tenant_shard_id();
1502 0 :
1503 0 : // Validate the incoming request
1504 0 : if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
1505 0 : anyhow::bail!("Requested shard count is not an increase");
1506 0 : }
1507 0 : let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
1508 0 : if !expansion_factor.is_power_of_two() {
1509 0 : anyhow::bail!("Requested split is not a power of two");
1510 0 : }
1511 :
1512 0 : if let Some(new_stripe_size) = new_stripe_size {
1513 0 : if tenant.get_shard_stripe_size() != new_stripe_size
1514 0 : && tenant_shard_id.shard_count.count() > 1
1515 : {
1516 : // This tenant already has multiple shards, it is illegal to try and change its stripe size
1517 0 : anyhow::bail!(
1518 0 : "Shard stripe size may not be modified once tenant has multiple shards"
1519 0 : );
1520 0 : }
1521 0 : }
1522 :
1523 : // Plan: identify what the new child shards will be
1524 0 : let child_shards = tenant_shard_id.split(new_shard_count);
1525 0 : tracing::info!(
1526 0 : "Shard {} splits into: {}",
1527 0 : tenant_shard_id.to_index(),
1528 0 : child_shards
1529 0 : .iter()
1530 0 : .map(|id| format!("{}", id.to_index()))
1531 0 : .join(",")
1532 0 : );
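                 :         // (For instance, splitting an unsharded parent to new_shard_count=4 plans four
                 :         // children, shard numbers 0..=3 under the new count; the exact identifiers come
                 :         // from `TenantShardId::split` above.)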
1533 :
1534 0 : fail::fail_point!("shard-split-pre-prepare", |_| Err(anyhow::anyhow!(
1535 0 : "failpoint"
1536 0 : )));
1537 :
1538 0 : let parent_shard_identity = tenant.shard_identity;
1539 0 : let parent_tenant_conf = tenant.get_tenant_conf();
1540 0 : let parent_generation = tenant.generation;
1541 :
1542 : // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
1543 0 : if let Err(e) = tenant.split_prepare(&child_shards).await {
1544 : // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
1545 : // have been left in a partially-shut-down state.
1546 0 : tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
1547 0 : return Err(e);
1548 0 : }
1549 0 :
1550 0 : fail::fail_point!("shard-split-post-prepare", |_| Err(anyhow::anyhow!(
1551 0 : "failpoint"
1552 0 : )));
1553 :
1554 0 : self.resources.deletion_queue_client.flush_advisory();
1555 0 :
1556 0 : // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
1557 0 : drop(tenant);
1558 0 : let mut parent_slot_guard =
1559 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1560 0 : let parent = match parent_slot_guard.get_old_value() {
1561 0 : Some(TenantSlot::Attached(t)) => t,
1562 0 : Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
1563 : Some(TenantSlot::InProgress(_)) => {
1564 : // tenant_map_acquire_slot never returns InProgress, if a slot was InProgress
1565 : // it would return an error.
1566 0 : unreachable!()
1567 : }
1568 : None => {
1569 : // We don't actually need the parent shard to still be attached to do our work, but it's
1570 : // a weird enough situation that the caller probably didn't want us to continue working
1571 : // if they had detached the tenant they requested the split on.
1572 0 : anyhow::bail!("Detached parent shard in the middle of split!")
1573 : }
1574 : };
1575 0 : fail::fail_point!("shard-split-pre-hardlink", |_| Err(anyhow::anyhow!(
1576 0 : "failpoint"
1577 0 : )));
1578 : // Optimization: hardlink layers from the parent into the children, so that they don't have to
1579 : // re-download & duplicate the data referenced in their initial IndexPart
1580 0 : self.shard_split_hardlink(parent, child_shards.clone())
1581 0 : .await?;
1582 0 : fail::fail_point!("shard-split-post-hardlink", |_| Err(anyhow::anyhow!(
1583 0 : "failpoint"
1584 0 : )));
1585 :
1586 : // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
1587 : // child shards to reach this point.
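                 :         // (`get_last_record_lsn` is the parent's per-timeline ingest high-water mark; Phase 4
                 :         // below waits for the children to replay up to these LSNs before the parent is shut
                 :         // down in Phase 5. That wait is best-effort: failures there are only logged.)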
1588 0 : let mut target_lsns = HashMap::new();
1589 0 : for timeline in parent.timelines.lock().unwrap().clone().values() {
1590 0 : target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
1591 0 : }
1592 :
1593 : // TODO: we should have the parent shard stop its WAL ingest here, it's a waste of resources
1594 : // and could slow down the children trying to catch up.
1595 :
1596 : // Phase 3: Spawn the child shards
1597 0 : for child_shard in &child_shards {
1598 0 : let mut child_shard_identity = parent_shard_identity;
1599 0 : if let Some(new_stripe_size) = new_stripe_size {
1600 0 : child_shard_identity.stripe_size = new_stripe_size;
1601 0 : }
1602 0 : child_shard_identity.count = child_shard.shard_count;
1603 0 : child_shard_identity.number = child_shard.shard_number;
1604 0 :
1605 0 : let child_location_conf = LocationConf {
1606 0 : mode: LocationMode::Attached(AttachedLocationConfig {
1607 0 : generation: parent_generation,
1608 0 : attach_mode: AttachmentMode::Single,
1609 0 : }),
1610 0 : shard: child_shard_identity,
1611 0 : tenant_conf: parent_tenant_conf.clone(),
1612 0 : };
1613 0 :
1614 0 : self.upsert_location(
1615 0 : *child_shard,
1616 0 : child_location_conf,
1617 0 : None,
1618 0 : SpawnMode::Eager,
1619 0 : ctx,
1620 0 : )
1621 0 : .await?;
1622 : }
1623 :
1624 0 : fail::fail_point!("shard-split-post-child-conf", |_| Err(anyhow::anyhow!(
1625 0 : "failpoint"
1626 0 : )));
1627 :
1628             :         // Phase 4: wait for the child shards' WAL ingest to catch up to the target LSNs
1629 0 : for child_shard_id in &child_shards {
1630 0 : let child_shard_id = *child_shard_id;
1631 0 : let child_shard = {
1632 0 : let locked = TENANTS.read().unwrap();
1633 0 : let peek_slot =
1634 0 : tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
1635 0 : peek_slot.and_then(|s| s.get_attached()).cloned()
1636 : };
1637 0 : if let Some(t) = child_shard {
1638 : // Wait for the child shard to become active: this should be very quick because it only
1639 : // has to download the index_part that we just uploaded when creating it.
1640 0 : if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
1641 : // This is not fatal: we have durably created the child shard. It just makes the
1642             :                     // split operation less seamless for clients, as we may detach the parent
1643 : // shard before the child shards are fully ready to serve requests.
1644 0 : tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
1645 0 : continue;
1646 0 : }
1647 0 :
1648 0 : let timelines = t.timelines.lock().unwrap().clone();
1649 0 : for timeline in timelines.values() {
1650 0 : let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
1651 0 : continue;
1652 : };
1653 :
1654 0 : tracing::info!(
1655 0 : "Waiting for child shard {}/{} to reach target lsn {}...",
1656 0 : child_shard_id,
1657 0 : timeline.timeline_id,
1658 0 : target_lsn
1659 0 : );
1660 :
1661 0 : fail::fail_point!("shard-split-lsn-wait", |_| Err(anyhow::anyhow!(
1662 0 : "failpoint"
1663 0 : )));
1664 0 : if let Err(e) = timeline
1665 0 : .wait_lsn(
1666 0 : *target_lsn,
1667 0 : crate::tenant::timeline::WaitLsnWaiter::Tenant,
1668 0 : ctx,
1669 0 : )
1670 0 : .await
1671 : {
1672 : // Failure here might mean shutdown, in any case this part is an optimization
1673 : // and we shouldn't hold up the split operation.
1674 0 : tracing::warn!(
1675 0 : "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
1676 0 : timeline.timeline_id
1677 0 : );
1678 : } else {
1679 0 : tracing::info!(
1680 0 : "Child shard {}/{} reached target lsn {}",
1681 0 : child_shard_id,
1682 0 : timeline.timeline_id,
1683 0 : target_lsn
1684 0 : );
1685 : }
1686 : }
1687 0 : }
1688 : }
1689 :
1690 : // Phase 5: Shut down the parent shard, and erase it from disk
1691 0 : let (_guard, progress) = completion::channel();
1692 0 : match parent.shutdown(progress, ShutdownMode::Hard).await {
1693 0 : Ok(()) => {}
1694 0 : Err(other) => {
1695 0 : other.wait().await;
1696 : }
1697 : }
1698 0 : let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
1699 0 : let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
1700 0 : .await
1701 0 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
1702 0 : self.spawn_background_purge(tmp_path);
1703 0 :
1704 0 : fail::fail_point!("shard-split-pre-finish", |_| Err(anyhow::anyhow!(
1705 0 : "failpoint"
1706 0 : )));
1707 :
1708 0 : parent_slot_guard.drop_old_value()?;
1709 :
1710 : // Phase 6: Release the InProgress on the parent shard
1711 0 : drop(parent_slot_guard);
1712 0 :
1713 0 : Ok(child_shards)
1714 0 : }
1715 :
1716 : /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
1717 : /// to avoid the children downloading them again.
1718 : ///
1719 : /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
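                 :     ///
                 :     /// A minimal sketch of the per-layer path arithmetic this relies on (the paths are
                 :     /// illustrative, not real layer file names):
                 :     ///
                 :     /// ```
                 :     /// use camino::Utf8Path;
                 :     ///
                 :     /// let parent = Utf8Path::new("/data/tenants/parent-shard");
                 :     /// let child = Utf8Path::new("/data/tenants/child-shard");
                 :     /// let layer = Utf8Path::new("/data/tenants/parent-shard/timelines/t1/layer-file");
                 :     ///
                 :     /// // Layer paths are re-rooted from the parent onto each child before hard-linking.
                 :     /// let relative = layer.strip_prefix(parent).unwrap();
                 :     /// let target = child.join(relative);
                 :     /// assert_eq!(target.as_str(), "/data/tenants/child-shard/timelines/t1/layer-file");
                 :     /// // std::fs::hard_link(layer, &target) is then what links it into the child.
                 :     /// ```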
1720 0 : async fn shard_split_hardlink(
1721 0 : &self,
1722 0 : parent_shard: &Tenant,
1723 0 : child_shards: Vec<TenantShardId>,
1724 0 : ) -> anyhow::Result<()> {
1725 0 : debug_assert_current_span_has_tenant_id();
1726 0 :
1727 0 : let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
1728 0 : let (parent_timelines, parent_layers) = {
1729 0 : let mut parent_layers = Vec::new();
1730 0 : let timelines = parent_shard.timelines.lock().unwrap().clone();
1731 0 : let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
1732 0 : for timeline in timelines.values() {
1733 0 : let timeline_layers = timeline
1734 0 : .layers
1735 0 : .read()
1736 0 : .await
1737 0 : .likely_resident_layers()
1738 0 : .collect::<Vec<_>>();
1739 :
1740 0 : for layer in timeline_layers {
1741 0 : let relative_path = layer
1742 0 : .local_path()
1743 0 : .strip_prefix(&parent_path)
1744 0 : .context("Removing prefix from parent layer path")?;
1745 0 : parent_layers.push(relative_path.to_owned());
1746 : }
1747 : }
1748 0 : debug_assert!(
1749 0 : !parent_layers.is_empty(),
1750 0 : "shutdown cannot empty the layermap"
1751 : );
1752 0 : (parent_timelines, parent_layers)
1753 0 : };
1754 0 :
1755 0 : let mut child_prefixes = Vec::new();
1756 0 : let mut create_dirs = Vec::new();
1757 :
1758 0 : for child in child_shards {
1759 0 : let child_prefix = self.conf.tenant_path(&child);
1760 0 : create_dirs.push(child_prefix.clone());
1761 0 : create_dirs.extend(
1762 0 : parent_timelines
1763 0 : .iter()
1764 0 : .map(|t| self.conf.timeline_path(&child, t)),
1765 0 : );
1766 0 :
1767 0 : child_prefixes.push(child_prefix);
1768 0 : }
1769 :
1770 : // Since we will do a large number of small filesystem metadata operations, batch them into
1771 : // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
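                 :         // (tokio::fs operations are themselves spawn_blocking round-trips internally, so
                 :         // batching all of the create_dir/hard_link/fsync work into one blocking task avoids
                 :         // a scheduler hop per filesystem call.)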
1772 0 : let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
1773 0 : for dir in &create_dirs {
1774 0 : if let Err(e) = std::fs::create_dir_all(dir) {
1775 : // Ignore AlreadyExists errors, drop out on all other errors
1776 0 : match e.kind() {
1777 0 : std::io::ErrorKind::AlreadyExists => {}
1778 : _ => {
1779 0 : return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
1780 : }
1781 : }
1782 0 : }
1783 : }
1784 :
1785 0 : for child_prefix in child_prefixes {
1786 0 : for relative_layer in &parent_layers {
1787 0 : let parent_path = parent_path.join(relative_layer);
1788 0 : let child_path = child_prefix.join(relative_layer);
1789 0 : if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
1790 0 : match e.kind() {
1791 0 : std::io::ErrorKind::AlreadyExists => {}
1792 : std::io::ErrorKind::NotFound => {
1793 0 : tracing::info!(
1794 0 : "Layer {} not found during hard-linking, evicted during split?",
1795 0 : relative_layer
1796 0 : );
1797 : }
1798 : _ => {
1799 0 : return Err(anyhow::anyhow!(e).context(format!(
1800 0 : "Hard linking {relative_layer} into {child_prefix}"
1801 0 : )))
1802 : }
1803 : }
1804 0 : }
1805 : }
1806 : }
1807 :
1808 : // Durability is not required for correctness, but if we crashed during split and
1809             :             // then restarted with empty timeline dirs, it would be very inefficient to
1810 : // re-populate from remote storage.
1811 0 : for dir in create_dirs {
1812 0 : if let Err(e) = crashsafe::fsync(&dir) {
1813 : // Something removed a newly created timeline dir out from underneath us? Extremely
1814 : // unexpected, but not worth panic'ing over as this whole function is just an
1815 : // optimization.
1816 0 : tracing::warn!("Failed to fsync directory {dir}: {e}")
1817 0 : }
1818 : }
1819 :
1820 0 : Ok(parent_layers.len())
1821 0 : });
1822 0 :
1823 0 : match jh.await {
1824 0 : Ok(Ok(layer_count)) => {
1825 0 : tracing::info!(count = layer_count, "Hard linked layers into child shards");
1826 : }
1827 0 : Ok(Err(e)) => {
1828 0 : // This is an optimization, so we tolerate failure.
1829 0 : tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
1830 : }
1831 0 : Err(e) => {
1832 0 : // This is something totally unexpected like a panic, so bail out.
1833 0 : anyhow::bail!("Error joining hard linking task: {e}");
1834 : }
1835 : }
1836 :
1837 0 : Ok(())
1838 0 : }
1839 :
1840 : ///
1841 : /// Shut down all tenants. This runs as part of pageserver shutdown.
1842 : ///
1843 : /// NB: We leave the tenants in the map, so that they remain accessible through
1844 : /// the management API until we shut it down. If we removed the shut-down tenants
1845 : /// from the tenants map, the management API would return 404 for these tenants,
1846             :     /// because TenantsMap::get() would then return `None`.
1847 : /// That could be easily misinterpreted by control plane, the consumer of the
1848 : /// management API. For example, it could attach the tenant on a different pageserver.
1849 : /// We would then be in split-brain once this pageserver restarts.
1850 0 : #[instrument(skip_all)]
1851 : pub(crate) async fn shutdown(&self) {
1852 : self.cancel.cancel();
1853 :
1854 : shutdown_all_tenants0(self.tenants).await
1855 : }
1856 :
1857 : /// When we have moved a tenant's content to a temporary directory, we may delete it lazily in
1858 : /// the background, and thereby avoid blocking any API requests on this deletion completing.
1859 0 : fn spawn_background_purge(&self, tmp_path: Utf8PathBuf) {
1860 0 : // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
1861 0 : // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
1862 0 : let task_tenant_id = None;
1863 0 :
1864 0 : task_mgr::spawn(
1865 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
1866 0 : TaskKind::MgmtRequest,
1867 0 : task_tenant_id,
1868 0 : None,
1869 0 : "tenant_files_delete",
1870 0 : false,
1871 0 : async move {
1872 0 : fs::remove_dir_all(tmp_path.as_path())
1873 0 : .await
1874 0 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
1875 0 : },
1876 0 : );
1877 0 : }
1878 :
1879 0 : pub(crate) async fn detach_tenant(
1880 0 : &self,
1881 0 : conf: &'static PageServerConf,
1882 0 : tenant_shard_id: TenantShardId,
1883 0 : detach_ignored: bool,
1884 0 : deletion_queue_client: &DeletionQueueClient,
1885 0 : ) -> Result<(), TenantStateError> {
1886 0 : let tmp_path = self
1887 0 : .detach_tenant0(
1888 0 : conf,
1889 0 : &TENANTS,
1890 0 : tenant_shard_id,
1891 0 : detach_ignored,
1892 0 : deletion_queue_client,
1893 0 : )
1894 0 : .await?;
1895 0 : self.spawn_background_purge(tmp_path);
1896 0 :
1897 0 : Ok(())
1898 0 : }
1899 :
1900 0 : async fn detach_tenant0(
1901 0 : &self,
1902 0 : conf: &'static PageServerConf,
1903 0 : tenants: &std::sync::RwLock<TenantsMap>,
1904 0 : tenant_shard_id: TenantShardId,
1905 0 : detach_ignored: bool,
1906 0 : deletion_queue_client: &DeletionQueueClient,
1907 0 : ) -> Result<Utf8PathBuf, TenantStateError> {
1908 0 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
1909 0 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
1910 0 : safe_rename_tenant_dir(&local_tenant_directory)
1911 0 : .await
1912 0 : .with_context(|| {
1913 0 : format!("local tenant directory {local_tenant_directory:?} rename")
1914 0 : })
1915 0 : };
1916 :
1917 0 : let removal_result = remove_tenant_from_memory(
1918 0 : tenants,
1919 0 : tenant_shard_id,
1920 0 : tenant_dir_rename_operation(tenant_shard_id),
1921 0 : )
1922 0 : .await;
1923 :
1924 : // Flush pending deletions, so that they have a good chance of passing validation
1925 : // before this tenant is potentially re-attached elsewhere.
1926 0 : deletion_queue_client.flush_advisory();
1927 0 :
1928            0 :         // Ignored tenants are not present in memory, so the removal-from-memory operation above bails for them.
1929            0 :         // Before returning that error, check for the ignored-tenant case: then we only need to clean up its local files.
1930 0 : if detach_ignored
1931 0 : && matches!(
1932 0 : removal_result,
1933 : Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
1934 : )
1935 : {
1936 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1937 0 : if tenant_ignore_mark.exists() {
1938 0 : info!("Detaching an ignored tenant");
1939 0 : let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
1940 0 : .await
1941 0 : .with_context(|| {
1942 0 : format!("Ignored tenant {tenant_shard_id} local directory rename")
1943 0 : })?;
1944 0 : return Ok(tmp_path);
1945 0 : }
1946 0 : }
1947 :
1948 0 : removal_result
1949 0 : }
1950 :
1951 0 : pub(crate) fn list_tenants(
1952 0 : &self,
1953 0 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
1954 0 : let tenants = TENANTS.read().unwrap();
1955 0 : let m = match &*tenants {
1956 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
1957 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
1958 0 : };
1959 0 : Ok(m.iter()
1960 0 : .filter_map(|(id, tenant)| match tenant {
1961 0 : TenantSlot::Attached(tenant) => {
1962 0 : Some((*id, tenant.current_state(), tenant.generation()))
1963 : }
1964 0 : TenantSlot::Secondary(_) => None,
1965 0 : TenantSlot::InProgress(_) => None,
1966 0 : })
1967 0 : .collect())
1968 0 : }
1969 : }
1970 :
1971 0 : #[derive(Debug, thiserror::Error)]
1972 : pub(crate) enum GetTenantError {
1973 : /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
1974 : /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
1975 : #[error("Tenant {0} not found")]
1976 : NotFound(TenantId),
1977 :
1978 : #[error("Tenant {0} is not active")]
1979 : NotActive(TenantShardId),
1980 :
1981 : // Initializing or shutting down: cannot authoritatively say whether we have this tenant
1982 : #[error("Tenant map is not available: {0}")]
1983 : MapState(#[from] TenantMapError),
1984 : }
1985 :
1986 0 : #[derive(thiserror::Error, Debug)]
1987 : pub(crate) enum GetActiveTenantError {
1988 : /// We may time out either while TenantSlot is InProgress, or while the Tenant
1989 : /// is in a non-Active state
1990 : #[error(
1991 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
1992 : )]
1993 : WaitForActiveTimeout {
1994 : latest_state: Option<TenantState>,
1995 : wait_time: Duration,
1996 : },
1997 :
1998 : /// The TenantSlot is absent, or in secondary mode
1999 : #[error(transparent)]
2000 : NotFound(#[from] GetTenantError),
2001 :
2002 : /// Cancellation token fired while we were waiting
2003 : #[error("cancelled")]
2004 : Cancelled,
2005 :
2006 : /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
2007 : #[error("will not become active. Current state: {0}")]
2008 : WillNotBecomeActive(TenantState),
2009 :
2010 : /// Broken is logically a subset of WillNotBecomeActive, but a distinct error is useful as
2011 : /// WillNotBecomeActive is a permitted error under some circumstances, whereas broken should
2012 : /// never happen.
2013 : #[error("Tenant is broken: {0}")]
2014 : Broken(String),
2015 : }
2016 :
2017 : /// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
2018 : /// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
2019 : /// then wait for up to `timeout` (minus however long we waited for the slot).
2020 0 : pub(crate) async fn get_active_tenant_with_timeout(
2021 0 : tenant_id: TenantId,
2022 0 : shard_selector: ShardSelector,
2023 0 : timeout: Duration,
2024 0 : cancel: &CancellationToken,
2025 0 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
2026 0 : enum WaitFor {
2027 0 : Barrier(utils::completion::Barrier),
2028 0 : Tenant(Arc<Tenant>),
2029 0 : }
2030 0 :
2031 0 : let wait_start = Instant::now();
2032 0 : let deadline = wait_start + timeout;
2033 :
2034 0 : let (wait_for, tenant_shard_id) = {
2035 0 : let locked = TENANTS.read().unwrap();
2036 :
2037 : // Resolve TenantId to TenantShardId
2038 0 : let tenant_shard_id = locked
2039 0 : .resolve_attached_shard(&tenant_id, shard_selector)
2040 0 : .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
2041 0 : tenant_id,
2042 0 : )))?;
2043 :
2044 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
2045 0 : .map_err(GetTenantError::MapState)?;
2046 0 : match peek_slot {
2047 0 : Some(TenantSlot::Attached(tenant)) => {
2048 0 : match tenant.current_state() {
2049 : TenantState::Active => {
2050 : // Fast path: we don't need to do any async waiting.
2051 0 : return Ok(tenant.clone());
2052 : }
2053 : _ => {
2054 0 : tenant.activate_now();
2055 0 : (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
2056 : }
2057 : }
2058 : }
2059 : Some(TenantSlot::Secondary(_)) => {
2060 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
2061 0 : tenant_shard_id,
2062 0 : )))
2063 : }
2064 0 : Some(TenantSlot::InProgress(barrier)) => {
2065 0 : (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
2066 : }
2067 : None => {
2068 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
2069 0 : tenant_id,
2070 0 : )))
2071 : }
2072 : }
2073 : };
2074 :
2075 0 : let tenant = match wait_for {
2076 0 : WaitFor::Barrier(barrier) => {
2077 0 : tracing::debug!("Waiting for tenant InProgress state to pass...");
2078 0 : timeout_cancellable(
2079 0 : deadline.duration_since(Instant::now()),
2080 0 : cancel,
2081 0 : barrier.wait(),
2082 0 : )
2083 0 : .await
2084 0 : .map_err(|e| match e {
2085 0 : TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
2086 0 : latest_state: None,
2087 0 : wait_time: wait_start.elapsed(),
2088 0 : },
2089 0 : TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
2090 0 : })?;
2091 : {
2092 0 : let locked = TENANTS.read().unwrap();
2093 0 : let peek_slot =
2094 0 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
2095 0 : .map_err(GetTenantError::MapState)?;
2096 0 : match peek_slot {
2097 0 : Some(TenantSlot::Attached(tenant)) => tenant.clone(),
2098 : _ => {
2099 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
2100 0 : tenant_shard_id,
2101 0 : )))
2102 : }
2103 : }
2104 : }
2105 : }
2106 0 : WaitFor::Tenant(tenant) => tenant,
2107 : };
2108 :
2109 0 : tracing::debug!("Waiting for tenant to enter active state...");
2110 0 : tenant
2111 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
2112 0 : .await?;
2113 0 : Ok(tenant)
2114 0 : }
2115 :
2116 0 : #[derive(Debug, thiserror::Error)]
2117 : pub(crate) enum DeleteTimelineError {
2118 : #[error("Tenant {0}")]
2119 : Tenant(#[from] GetTenantError),
2120 :
2121 : #[error("Timeline {0}")]
2122 : Timeline(#[from] crate::tenant::DeleteTimelineError),
2123 : }
2124 :
2125 0 : #[derive(Debug, thiserror::Error)]
2126 : pub(crate) enum TenantStateError {
2127 : #[error("Tenant {0} is stopping")]
2128 : IsStopping(TenantShardId),
2129 : #[error(transparent)]
2130 : SlotError(#[from] TenantSlotError),
2131 : #[error(transparent)]
2132 : SlotUpsertError(#[from] TenantSlotUpsertError),
2133 : #[error(transparent)]
2134 : Other(#[from] anyhow::Error),
2135 : }
2136 :
2137 0 : pub(crate) async fn load_tenant(
2138 0 : conf: &'static PageServerConf,
2139 0 : tenant_id: TenantId,
2140 0 : generation: Generation,
2141 0 : broker_client: storage_broker::BrokerClientChannel,
2142 0 : remote_storage: Option<GenericRemoteStorage>,
2143 0 : deletion_queue_client: DeletionQueueClient,
2144 0 : ctx: &RequestContext,
2145 0 : ) -> Result<(), TenantMapInsertError> {
2146 0 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
2147 0 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2148 :
2149 0 : let slot_guard =
2150 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
2151 0 : let tenant_path = conf.tenant_path(&tenant_shard_id);
2152 0 :
2153 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
2154 0 : if tenant_ignore_mark.exists() {
2155 0 : std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
2156 0 : format!(
2157 0 : "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
2158 0 : )
2159 0 : })?;
2160 0 : }
2161 :
2162 0 : let resources = TenantSharedResources {
2163 0 : broker_client,
2164 0 : remote_storage,
2165 0 : deletion_queue_client,
2166 0 : };
2167 :
2168 0 : let mut location_conf =
2169 0 : Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
2170 0 : location_conf.attach_in_generation(AttachmentMode::Single, generation);
2171 0 :
2172 0 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
2173 :
2174 0 : let shard_identity = location_conf.shard;
2175 0 : let new_tenant = tenant_spawn(
2176 0 : conf,
2177 0 : tenant_shard_id,
2178 0 : &tenant_path,
2179 0 : resources,
2180 0 : AttachedTenantConf::try_from(location_conf)?,
2181 0 : shard_identity,
2182 0 : None,
2183 0 : &TENANTS,
2184 0 : SpawnMode::Eager,
2185 0 : ctx,
2186 0 : )
2187 0 : .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
2188 :
2189 0 : slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
2190 0 : Ok(())
2191 0 : }
2192 :
2193 0 : pub(crate) async fn ignore_tenant(
2194 0 : conf: &'static PageServerConf,
2195 0 : tenant_id: TenantId,
2196 0 : ) -> Result<(), TenantStateError> {
2197 0 : ignore_tenant0(conf, &TENANTS, tenant_id).await
2198 0 : }
2199 :
2200 0 : #[instrument(skip_all, fields(shard_id))]
2201 : async fn ignore_tenant0(
2202 : conf: &'static PageServerConf,
2203 : tenants: &std::sync::RwLock<TenantsMap>,
2204 : tenant_id: TenantId,
2205 : ) -> Result<(), TenantStateError> {
2206 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
2207 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2208 : tracing::Span::current().record(
2209 : "shard_id",
2210 : tracing::field::display(tenant_shard_id.shard_slug()),
2211 : );
2212 :
2213 0 : remove_tenant_from_memory(tenants, tenant_shard_id, async {
2214 0 : let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
2215 0 : fs::File::create(&ignore_mark_file)
2216 0 : .await
2217 0 : .context("Failed to create ignore mark file")
2218 0 : .and_then(|_| {
2219 0 : crashsafe::fsync_file_and_parent(&ignore_mark_file)
2220 0 : .context("Failed to fsync ignore mark file")
2221 0 : })
2222 0 : .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?;
2223 0 : Ok(())
2224 0 : })
2225 : .await
2226 : }
2227 :
2228 0 : #[derive(Debug, thiserror::Error)]
2229 : pub(crate) enum TenantMapListError {
2230 : #[error("tenant map is still initiailizing")]
2231 : Initializing,
2232 : }
2233 :
2234 0 : #[derive(Debug, thiserror::Error)]
2235 : pub(crate) enum TenantMapInsertError {
2236 : #[error(transparent)]
2237 : SlotError(#[from] TenantSlotError),
2238 : #[error(transparent)]
2239 : SlotUpsertError(#[from] TenantSlotUpsertError),
2240 : #[error(transparent)]
2241 : Other(#[from] anyhow::Error),
2242 : }
2243 :
2244 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
2245 : /// for a particular tenant ID.
2246 0 : #[derive(Debug, thiserror::Error)]
2247 : pub(crate) enum TenantSlotError {
2248 : /// When acquiring a slot with the expectation that the tenant already exists.
2249 : #[error("Tenant {0} not found")]
2250 : NotFound(TenantShardId),
2251 :
2252 : /// When acquiring a slot with the expectation that the tenant does not already exist.
2253 : #[error("tenant {0} already exists, state: {1:?}")]
2254 : AlreadyExists(TenantShardId, TenantState),
2255 :
2256 : // Tried to read a slot that is currently being mutated by another administrative
2257 : // operation.
2258 : #[error("tenant has a state change in progress, try again later")]
2259 : InProgress,
2260 :
2261 : #[error(transparent)]
2262 : MapState(#[from] TenantMapError),
2263 : }
2264 :
2265 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
2266 : /// to insert a new value.
2267 0 : #[derive(thiserror::Error)]
2268 : pub(crate) enum TenantSlotUpsertError {
2269 : /// An error where the slot is in an unexpected state, indicating a code bug
2270 : #[error("Internal error updating Tenant")]
2271 : InternalError(Cow<'static, str>),
2272 :
2273 : #[error(transparent)]
2274 : MapState(TenantMapError),
2275 :
2276 : // If we encounter TenantManager shutdown during upsert, we must carry the Completion
2277 : // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
2278 : // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
2279 : // was protected by the SlotGuard.
2280 : #[error("Shutting down")]
2281 : ShuttingDown((TenantSlot, utils::completion::Completion)),
2282 : }
2283 :
2284 : impl std::fmt::Debug for TenantSlotUpsertError {
2285 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2286 0 : match self {
2287 0 : Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
2288 0 : Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
2289 0 : Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
2290 : }
2291 0 : }
2292 : }
2293 :
2294 0 : #[derive(Debug, thiserror::Error)]
2295 : enum TenantSlotDropError {
2296 : /// It is only legal to drop a TenantSlot if its contents are fully shut down
2297 : #[error("Tenant was not shut down")]
2298 : NotShutdown,
2299 : }
2300 :
2301 : /// Errors that can happen any time we are walking the tenant map to try and acquire
2302 : /// the TenantSlot for a particular tenant.
2303 0 : #[derive(Debug, thiserror::Error)]
2304 : pub enum TenantMapError {
2305 : // Tried to read while initializing
2306 : #[error("tenant map is still initializing")]
2307 : StillInitializing,
2308 :
2309 : // Tried to read while shutting down
2310 : #[error("tenant map is shutting down")]
2311 : ShuttingDown,
2312 : }
2313 :
2314 : /// Guards a particular tenant_id's content in the TenantsMap. While this
2315 : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
2316 : /// for this tenant, which acts as a marker for any operations targeting
2317 : /// this tenant to retry later, or wait for the InProgress state to end.
2318 : ///
2319 : /// This structure enforces the important invariant that we do not have overlapping
2320             : /// tasks that will try to use local storage for the same tenant ID: we enforce that
2321             : /// the previous contents of a slot have been shut down before the slot can be
2322             : /// left empty or used for something else.
2323 : ///
2324 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
2325 : /// to provide a new value, or `revert` to put the slot back into its initial
2326 : /// state. If the SlotGuard is dropped without calling either of these, then
2327 : /// we will leave the slot empty if our `old_value` is already shut down, else
2328 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
2329 : ///
2330 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
2331 : /// `drop_old_value`. It is an error to call this without shutting down
2332             : /// the contents of `old_value`.
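                 : ///
                 : /// A hedged sketch of the intended protocol (error handling and the new slot value are
                 : /// elided; see `remove_tenant_from_memory` below for a real caller):
                 : ///
                 : /// ```ignore
                 : /// let mut slot_guard =
                 : ///     tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
                 : /// if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value().clone() {
                 : ///     // The old contents must be fully shut down before they may be dropped.
                 : ///     let (_guard, progress) = completion::channel();
                 : ///     let _ = tenant.shutdown(progress, ShutdownMode::Hard).await;
                 : ///     slot_guard.drop_old_value()?;
                 : /// }
                 : /// // Either install a new value, or call `slot_guard.revert()` to restore the old one.
                 : /// slot_guard.upsert(new_slot)?;
                 : /// ```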
2333 : pub struct SlotGuard {
2334 : tenant_shard_id: TenantShardId,
2335 : old_value: Option<TenantSlot>,
2336 : upserted: bool,
2337 :
2338 : /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
2339 : /// release any waiters as soon as this SlotGuard is dropped.
2340 : completion: utils::completion::Completion,
2341 : }
2342 :
2343 : impl SlotGuard {
2344 2 : fn new(
2345 2 : tenant_shard_id: TenantShardId,
2346 2 : old_value: Option<TenantSlot>,
2347 2 : completion: utils::completion::Completion,
2348 2 : ) -> Self {
2349 2 : Self {
2350 2 : tenant_shard_id,
2351 2 : old_value,
2352 2 : upserted: false,
2353 2 : completion,
2354 2 : }
2355 2 : }
2356 :
2357 : /// Get any value that was present in the slot before we acquired ownership
2358 : /// of it: in state transitions, this will be the old state.
2359 2 : fn get_old_value(&self) -> &Option<TenantSlot> {
2360 2 : &self.old_value
2361 2 : }
2362 :
2363 : /// Emplace a new value in the slot. This consumes the guard, and after
2364 : /// returning, the slot is no longer protected from concurrent changes.
2365 0 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
2366 0 : if !self.old_value_is_shutdown() {
2367 : // This is a bug: callers should never try to drop an old value without
2368 : // shutting it down
2369 0 : return Err(TenantSlotUpsertError::InternalError(
2370 0 : "Old TenantSlot value not shut down".into(),
2371 0 : ));
2372 0 : }
2373 :
2374 0 : let replaced = {
2375 0 : let mut locked = TENANTS.write().unwrap();
2376 0 :
2377 0 : if let TenantSlot::InProgress(_) = new_value {
2378 : // It is never expected to try and upsert InProgress via this path: it should
2379 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
2380 0 : return Err(TenantSlotUpsertError::InternalError(
2381 0 : "Attempt to upsert an InProgress state".into(),
2382 0 : ));
2383 0 : }
2384 :
2385 0 : let m = match &mut *locked {
2386 : TenantsMap::Initializing => {
2387 0 : return Err(TenantSlotUpsertError::MapState(
2388 0 : TenantMapError::StillInitializing,
2389 0 : ))
2390 : }
2391 : TenantsMap::ShuttingDown(_) => {
2392 0 : return Err(TenantSlotUpsertError::ShuttingDown((
2393 0 : new_value,
2394 0 : self.completion.clone(),
2395 0 : )));
2396 : }
2397 0 : TenantsMap::Open(m) => m,
2398 0 : };
2399 0 :
2400 0 : let replaced = m.insert(self.tenant_shard_id, new_value);
2401 0 : self.upserted = true;
2402 0 :
2403 0 : METRICS.tenant_slots.set(m.len() as u64);
2404 0 :
2405 0 : replaced
2406 : };
2407 :
2408 : // Sanity check: on an upsert we should always be replacing an InProgress marker
2409 0 : match replaced {
2410 : Some(TenantSlot::InProgress(_)) => {
2411 : // Expected case: we find our InProgress in the map: nothing should have
2412 : // replaced it because the code that acquires slots will not grant another
2413 : // one for the same TenantId.
2414 0 : Ok(())
2415 : }
2416 : None => {
2417 0 : METRICS.unexpected_errors.inc();
2418 0 : error!(
2419 0 : tenant_shard_id = %self.tenant_shard_id,
2420 0 : "Missing InProgress marker during tenant upsert, this is a bug."
2421 0 : );
2422 0 : Err(TenantSlotUpsertError::InternalError(
2423 0 : "Missing InProgress marker during tenant upsert".into(),
2424 0 : ))
2425 : }
2426 0 : Some(slot) => {
2427 0 : METRICS.unexpected_errors.inc();
2428 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
2429 0 : Err(TenantSlotUpsertError::InternalError(
2430 0 : "Unexpected contents of TenantSlot".into(),
2431 0 : ))
2432 : }
2433 : }
2434 0 : }
2435 :
2436 : /// Replace the InProgress slot with whatever was in the guard when we started
2437 0 : fn revert(mut self) {
2438 0 : if let Some(value) = self.old_value.take() {
2439 0 : match self.upsert(value) {
2440 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
2441 0 : // We already logged the error, nothing else we can do.
2442 0 : }
2443 : Err(
2444 : TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
2445 0 : ) => {
2446 0 : // If the map is shutting down, we need not replace anything
2447 0 : }
2448 0 : Ok(()) => {}
2449 : }
2450 0 : }
2451 0 : }
2452 :
2453 : /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
2454 : /// rogue background tasks that would write to the local tenant directory that this guard
2455 : /// is responsible for protecting
2456 2 : fn old_value_is_shutdown(&self) -> bool {
2457 2 : match self.old_value.as_ref() {
2458 2 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
2459 0 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
2460 : Some(TenantSlot::InProgress(_)) => {
2461 : // A SlotGuard cannot be constructed for a slot that was already InProgress
2462 0 : unreachable!()
2463 : }
2464 0 : None => true,
2465 : }
2466 2 : }
2467 :
2468 : /// The guard holder is done with the old value of the slot: they are obliged to already
2469 : /// shut it down before we reach this point.
2470 2 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
2471 2 : if !self.old_value_is_shutdown() {
2472 0 : Err(TenantSlotDropError::NotShutdown)
2473 : } else {
2474 2 : self.old_value.take();
2475 2 : Ok(())
2476 : }
2477 2 : }
2478 : }
2479 :
2480 : impl Drop for SlotGuard {
2481 2 : fn drop(&mut self) {
2482 2 : if self.upserted {
2483 0 : return;
2484 2 : }
2485 2 : // Our old value is already shutdown, or it never existed: it is safe
2486 2 : // for us to fully release the TenantSlot back into an empty state
2487 2 :
2488 2 : let mut locked = TENANTS.write().unwrap();
2489 :
2490 2 : let m = match &mut *locked {
2491 : TenantsMap::Initializing => {
2492 : // There is no map, this should never happen.
2493 2 : return;
2494 : }
2495 : TenantsMap::ShuttingDown(_) => {
2496 : // When we transition to shutdown, InProgress elements are removed
2497 : // from the map, so we do not need to clean up our Inprogress marker.
2498 : // See [`shutdown_all_tenants0`]
2499 0 : return;
2500 : }
2501 0 : TenantsMap::Open(m) => m,
2502 0 : };
2503 0 :
2504 0 : use std::collections::btree_map::Entry;
2505 0 : match m.entry(self.tenant_shard_id) {
2506 0 : Entry::Occupied(mut entry) => {
2507 0 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
2508 0 : METRICS.unexpected_errors.inc();
2509 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
2510 0 : }
2511 :
2512 0 : if self.old_value_is_shutdown() {
2513 0 : entry.remove();
2514 0 : } else {
2515 0 : entry.insert(self.old_value.take().unwrap());
2516 0 : }
2517 : }
2518 : Entry::Vacant(_) => {
2519 0 : METRICS.unexpected_errors.inc();
2520 0 : error!(
2521 0 : tenant_shard_id = %self.tenant_shard_id,
2522 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
2523 0 : );
2524 : }
2525 : }
2526 :
2527 0 : METRICS.tenant_slots.set(m.len() as u64);
2528 2 : }
2529 : }
2530 :
2531 : enum TenantSlotPeekMode {
2532 : /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
2533 : Read,
2534 : /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
2535 : Write,
2536 : }
2537 :
2538 0 : fn tenant_map_peek_slot<'a>(
2539 0 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
2540 0 : tenant_shard_id: &TenantShardId,
2541 0 : mode: TenantSlotPeekMode,
2542 0 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
2543 0 : match tenants.deref() {
2544 0 : TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
2545 0 : TenantsMap::ShuttingDown(m) => match mode {
2546 : TenantSlotPeekMode::Read => Ok(Some(
2547 : // When reading in ShuttingDown state, we must translate None results
2548 : // into a ShuttingDown error, because absence of a tenant shard ID in the map
2549 : // isn't a reliable indicator of the tenant being gone: it might have been
2550 : // InProgress when shutdown started, and cleaned up from that state such
2551 : // that it's now no longer in the map. Callers will have to wait until
2552 : // we next start up to get a proper answer. This avoids incorrect 404 API responses.
2553 0 : m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
2554 : )),
2555 0 : TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
2556 : },
2557 0 : TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
2558 : }
2559 0 : }
2560 :
2561 : enum TenantSlotAcquireMode {
2562 : /// Acquire the slot irrespective of current state, or whether it already exists
2563 : Any,
2564 : /// Return an error if trying to acquire a slot and it doesn't already exist
2565 : MustExist,
2566 : /// Return an error if trying to acquire a slot and it already exists
2567 : MustNotExist,
2568 : }
2569 :
2570 0 : fn tenant_map_acquire_slot(
2571 0 : tenant_shard_id: &TenantShardId,
2572 0 : mode: TenantSlotAcquireMode,
2573 0 : ) -> Result<SlotGuard, TenantSlotError> {
2574 0 : tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
2575 0 : }
2576 :
2577 2 : fn tenant_map_acquire_slot_impl(
2578 2 : tenant_shard_id: &TenantShardId,
2579 2 : tenants: &std::sync::RwLock<TenantsMap>,
2580 2 : mode: TenantSlotAcquireMode,
2581 2 : ) -> Result<SlotGuard, TenantSlotError> {
2582 2 : use TenantSlotAcquireMode::*;
2583 2 : METRICS.tenant_slot_writes.inc();
2584 2 :
2585 2 : let mut locked = tenants.write().unwrap();
2586 2 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
2587 2 : let _guard = span.enter();
2588 :
2589 2 : let m = match &mut *locked {
2590 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
2591 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
2592 2 : TenantsMap::Open(m) => m,
2593 2 : };
2594 2 :
2595 2 : use std::collections::btree_map::Entry;
2596 2 :
2597 2 : let entry = m.entry(*tenant_shard_id);
2598 2 :
2599 2 : match entry {
2600 0 : Entry::Vacant(v) => match mode {
2601 : MustExist => {
2602 0 : tracing::debug!("Vacant && MustExist: return NotFound");
2603 0 : Err(TenantSlotError::NotFound(*tenant_shard_id))
2604 : }
2605 : _ => {
2606 0 : let (completion, barrier) = utils::completion::channel();
2607 0 : v.insert(TenantSlot::InProgress(barrier));
2608 0 : tracing::debug!("Vacant, inserted InProgress");
2609 0 : Ok(SlotGuard::new(*tenant_shard_id, None, completion))
2610 : }
2611 : },
2612 2 : Entry::Occupied(mut o) => {
2613 2 : // Apply mode-driven checks
2614 2 : match (o.get(), mode) {
2615 : (TenantSlot::InProgress(_), _) => {
2616 0 : tracing::debug!("Occupied, failing for InProgress");
2617 0 : Err(TenantSlotError::InProgress)
2618 : }
2619 0 : (slot, MustNotExist) => match slot {
2620 0 : TenantSlot::Attached(tenant) => {
2621 0 : tracing::debug!("Attached && MustNotExist, return AlreadyExists");
2622 0 : Err(TenantSlotError::AlreadyExists(
2623 0 : *tenant_shard_id,
2624 0 : tenant.current_state(),
2625 0 : ))
2626 : }
2627 : _ => {
2628 : // FIXME: the AlreadyExists error assumes that we have a Tenant
2629 : // to get the state from
2630 0 : tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
2631 0 : Err(TenantSlotError::AlreadyExists(
2632 0 : *tenant_shard_id,
2633 0 : TenantState::Broken {
2634 0 : reason: "Present but not attached".to_string(),
2635 0 : backtrace: "".to_string(),
2636 0 : },
2637 0 : ))
2638 : }
2639 : },
2640 : _ => {
2641 : // Happy case: the slot was not in any state that violated our mode
2642 2 : let (completion, barrier) = utils::completion::channel();
2643 2 : let old_value = o.insert(TenantSlot::InProgress(barrier));
2644 2 : tracing::debug!("Occupied, replaced with InProgress");
2645 2 : Ok(SlotGuard::new(
2646 2 : *tenant_shard_id,
2647 2 : Some(old_value),
2648 2 : completion,
2649 2 : ))
2650 : }
2651 : }
2652 : }
2653 : }
2654 2 : }
2655 :
2656             : /// Stops the tenant and removes it from memory, if it is not already [`TenantState::Stopping`]; bails otherwise.
2657             : /// Allows removing other tenant resources manually via `tenant_cleanup`.
2658             : /// If the cleanup fails, the tenant stays in memory in [`TenantState::Broken`] state, and another removal
2659             : /// operation would be needed to remove it.
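                 : ///
                 : /// A hedged invocation sketch (mirrors `detach_tenant0` above: the cleanup future here
                 : /// just renames the local tenant directory):
                 : ///
                 : /// ```ignore
                 : /// let tmp_path = remove_tenant_from_memory(&TENANTS, tenant_shard_id, async {
                 : ///     safe_rename_tenant_dir(&conf.tenant_path(&tenant_shard_id))
                 : ///         .await
                 : ///         .context("rename local tenant directory")
                 : /// })
                 : /// .await?;
                 : /// ```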
2660 2 : async fn remove_tenant_from_memory<V, F>(
2661 2 : tenants: &std::sync::RwLock<TenantsMap>,
2662 2 : tenant_shard_id: TenantShardId,
2663 2 : tenant_cleanup: F,
2664 2 : ) -> Result<V, TenantStateError>
2665 2 : where
2666 2 : F: std::future::Future<Output = anyhow::Result<V>>,
2667 2 : {
2668 2 : let mut slot_guard =
2669 2 : tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
2670 :
2671 : // allow pageserver shutdown to await for our completion
2672 2 : let (_guard, progress) = completion::channel();
2673 :
2674 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2675 : // concurrent API request doing something else for the same tenant ID.
2676 2 : let attached_tenant = match slot_guard.get_old_value() {
2677 2 : Some(TenantSlot::Attached(tenant)) => {
2678 2 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2679 2 : let shutdown_mode = ShutdownMode::Hard;
2680 2 :
2681 2 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
2682 2 : // that we can continue safely to cleanup.
2683 2 : match tenant.shutdown(progress, shutdown_mode).await {
2684 2 : Ok(()) => {}
2685 0 : Err(_other) => {
2686 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2687 0 : // wait for it but return an error right away because these are distinct requests.
2688 0 : slot_guard.revert();
2689 0 : return Err(TenantStateError::IsStopping(tenant_shard_id));
2690 : }
2691 : }
2692 2 : Some(tenant)
2693 : }
2694 0 : Some(TenantSlot::Secondary(secondary_state)) => {
2695 0 : tracing::info!("Shutting down in secondary mode");
2696 0 : secondary_state.shutdown().await;
2697 0 : None
2698 : }
2699 : Some(TenantSlot::InProgress(_)) => {
2700 : // Acquiring a slot guarantees its old value was not InProgress
2701 0 : unreachable!();
2702 : }
2703 0 : None => None,
2704 : };
2705 :
2706 2 : match tenant_cleanup
2707 2 : .await
2708 2 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2709 : {
2710 2 : Ok(hook_value) => {
2711 2 : // Success: drop the old TenantSlot::Attached.
2712 2 : slot_guard
2713 2 : .drop_old_value()
2714 2 : .expect("We just called shutdown");
2715 2 :
2716 2 : Ok(hook_value)
2717 : }
2718 0 : Err(e) => {
2719 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2720 0 : if let Some(attached_tenant) = attached_tenant {
2721 0 : attached_tenant.set_broken(e.to_string()).await;
2722 0 : }
2723 : // Leave the broken tenant in the map
2724 0 : slot_guard.revert();
2725 0 :
2726 0 : Err(TenantStateError::Other(e))
2727 : }
2728 : }
2729 2 : }
2730 :
2731 : use {
2732 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
2733 : utils::http::error::ApiError,
2734 : };
2735 :
2736 0 : pub(crate) fn immediate_gc(
2737 0 : tenant_shard_id: TenantShardId,
2738 0 : timeline_id: TimelineId,
2739 0 : gc_req: TimelineGcRequest,
2740 0 : cancel: CancellationToken,
2741 0 : ctx: &RequestContext,
2742 0 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
2743 0 : let guard = TENANTS.read().unwrap();
2744 :
2745 0 : let tenant = guard
2746 0 : .get(&tenant_shard_id)
2747 0 : .cloned()
2748 0 : .with_context(|| format!("tenant {tenant_shard_id}"))
2749 0 : .map_err(|e| ApiError::NotFound(e.into()))?;
2750 :
2751 0 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
2752 0 : // Use tenant's pitr setting
2753 0 : let pitr = tenant.get_pitr_interval();
2754 0 :
2755 0 : // Run in task_mgr to avoid race with tenant_detach operation
2756 0 : let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
2757 0 : let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
2758 0 : let span = info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id);
2759 :
2760 : // TODO: spawning is redundant now, need to hold the gate
2761 0 : task_mgr::spawn(
2762 0 : &tokio::runtime::Handle::current(),
2763 0 : TaskKind::GarbageCollector,
2764 0 : Some(tenant_shard_id),
2765 0 : Some(timeline_id),
2766 0 : &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
2767 0 : false,
2768 0 : async move {
2769 0 : fail::fail_point!("immediate_gc_task_pre");
2770 :
2771 : #[allow(unused_mut)]
2772 0 : let mut result = tenant
2773 0 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
2774 0 : .await;
2775 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
2776 : // better once the types support it.
2777 :
2778 : #[cfg(feature = "testing")]
2779 : {
2780 : // we need to synchronize with drop completion for python tests without polling for
2781 : // log messages
2782 0 : if let Ok(result) = result.as_mut() {
2783 0 : let mut js = tokio::task::JoinSet::new();
2784 0 : for layer in std::mem::take(&mut result.doomed_layers) {
2785 0 : js.spawn(layer.wait_drop());
2786 0 : }
2787 0 : tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
2788 0 : while let Some(res) = js.join_next().await {
2789 0 : res.expect("wait_drop should not panic");
2790 0 : }
2791 0 : }
2792 :
2793 0 : let timeline = tenant.get_timeline(timeline_id, false).ok();
2794 0 : let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
2795 :
2796 0 : if let Some(rtc) = rtc {
2797 : // layer drops schedule actions on remote timeline client to actually do the
2798 : // deletions; don't care about the shutdown error, just exit fast
2799 0 : drop(rtc.wait_completion().await);
2800 0 : }
2801 : }
2802 :
2803 0 : match task_done.send(result) {
2804 0 : Ok(_) => (),
2805 0 : Err(result) => error!("failed to send gc result: {result:?}"),
2806 : }
2807 0 : Ok(())
2808 0 : }
2809 0 : .instrument(span)
2810 0 : );
2811 0 :
2812 0 : // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
2813 0 : drop(guard);
2814 0 :
2815 0 : Ok(wait_task_done)
2816 0 : }
2817 :
2818 : #[cfg(test)]
2819 : mod tests {
2820 : use std::collections::BTreeMap;
2821 : use std::sync::Arc;
2822 : use tracing::Instrument;
2823 :
2824 : use crate::tenant::mgr::TenantSlot;
2825 :
2826 : use super::{super::harness::TenantHarness, TenantsMap};
2827 :
2828 : #[tokio::test(start_paused = true)]
2829 2 : async fn shutdown_awaits_in_progress_tenant() {
2830 2 : // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
2831 2 : // wait for it to complete before proceeding.
2832 2 :
2833 2 : let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
2834 2 : let (t, _ctx) = h.load().await;
2835 2 :
2836 2 : // harness loads it to active, which is forced and nothing is running on the tenant
2837 2 :
2838 2 : let id = t.tenant_shard_id();
2839 2 :
2840 2 : // tenant harness configures the logging and we cannot escape it
2841 2 : let span = h.span();
2842 2 : let _e = span.enter();
2843 2 :
2844 2 : let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
2845 2 : let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
2846 2 :
2847 2 : // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
2848 2 : // permit it to proceed: that will stick the tenant in InProgress
2849 2 :
2850 2 : let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
2851 2 : let (until_cleanup_started, cleanup_started) = utils::completion::channel();
2852 2 : let mut remove_tenant_from_memory_task = {
2853 2 : let jh = tokio::spawn({
2854 2 : let tenants = tenants.clone();
2855 2 : async move {
2856 2 : let cleanup = async move {
2857 2 : drop(until_cleanup_started);
2858 2 : can_complete_cleanup.wait().await;
2859 2 : anyhow::Ok(())
2860 2 : };
2861 2 : super::remove_tenant_from_memory(&tenants, id, cleanup).await
2862 2 : }
2863 2 : .instrument(h.span())
2864 2 : });
2865 2 :
2866 2 : // now the long cleanup should be in place, with the stopping state
2867 2 : cleanup_started.wait().await;
2868 2 : jh
2869 2 : };
2870 2 :
2871 2 : let mut shutdown_task = {
2872 2 : let (until_shutdown_started, shutdown_started) = utils::completion::channel();
2873 2 :
2874 2 : let shutdown_task = tokio::spawn(async move {
2875 2 : drop(until_shutdown_started);
2876 4 : super::shutdown_all_tenants0(&tenants).await;
2877 2 : });
2878 2 :
2879 2 : shutdown_started.wait().await;
2880 2 : shutdown_task
2881 2 : };
2882 2 :
2883 2 : let long_time = std::time::Duration::from_secs(15);
2884 4 : tokio::select! {
2885 4 : _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
2886 4 : _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
2887 4 : _ = tokio::time::sleep(long_time) => {},
2888 4 : }
2889 2 :
2890 2 : drop(until_cleanup_completed);
2891 2 :
2892 2 : // Now that we allow it to proceed, shutdown should complete immediately
2893 2 : remove_tenant_from_memory_task.await.unwrap().unwrap();
2894 2 : shutdown_task.await.unwrap();
2895 2 : }
2896 : }
|