Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
5 : use futures::stream::StreamExt;
6 : use itertools::Itertools;
7 : use pageserver_api::key::Key;
8 : use pageserver_api::models::ShardParameters;
9 : use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
10 : use rand::{distributions::Alphanumeric, Rng};
11 : use std::borrow::Cow;
12 : use std::cmp::Ordering;
13 : use std::collections::{BTreeMap, HashMap};
14 : use std::ops::Deref;
15 : use std::sync::Arc;
16 : use std::time::{Duration, Instant};
17 : use tokio::fs;
18 : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
19 :
20 : use anyhow::Context;
21 : use once_cell::sync::Lazy;
22 : use tokio::task::JoinSet;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::*;
25 :
26 : use remote_storage::GenericRemoteStorage;
27 : use utils::{completion, crashsafe};
28 :
29 : use crate::config::PageServerConf;
30 : use crate::context::{DownloadBehavior, RequestContext};
31 : use crate::control_plane_client::{
32 : ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
33 : };
34 : use crate::deletion_queue::DeletionQueueClient;
35 : use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
36 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
37 : use crate::task_mgr::{self, TaskKind};
38 : use crate::tenant::config::{
39 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
40 : TenantConfOpt,
41 : };
42 : use crate::tenant::delete::DeleteTenantFlow;
43 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
44 : use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
45 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
46 :
47 : use utils::crashsafe::path_with_suffix_extension;
48 : use utils::fs_ext::PathExt;
49 : use utils::generation::Generation;
50 : use utils::id::{TenantId, TimelineId};
51 :
52 : use super::delete::DeleteTenantError;
53 : use super::secondary::SecondaryTenant;
54 : use super::TenantSharedResources;
55 :
56 : /// A tenant that appears in TenantsMap may be either:
57 : /// - `Attached`: has a full Tenant object and is eligible to service
58 : ///   reads and ingest WAL.
59 : /// - `Secondary`: is only keeping a local cache warm.
60 : ///
61 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
62 : /// that way we avoid having to carefully switch a tenant's ingestion etc. on and off during
63 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
64 : /// having a properly acquired generation (Secondary doesn't need a generation).
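///
/// For illustration only, a hedged sketch of how callers typically branch on a slot
/// (`slot` is a hypothetical `&TenantSlot` obtained from the `TenantsMap`, inside an async context):
///
/// ```ignore
/// match slot {
///     TenantSlot::Attached(tenant) => { /* full Tenant: may service reads and ingest WAL */ }
///     TenantSlot::Secondary(_) => { /* only keeps the local on-disk cache warm */ }
///     TenantSlot::InProgress(barrier) => {
///         // Administrative operations should back off (e.g. HTTP 503) or wait:
///         barrier.wait().await;
///     }
/// }
/// ```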
65 0 : #[derive(Clone)]
66 : pub(crate) enum TenantSlot {
67 : Attached(Arc<Tenant>),
68 : Secondary(Arc<SecondaryTenant>),
69 : /// In this state, other administrative operations acting on the TenantId should
70 : /// block, or return a retry indicator equivalent to HTTP 503.
71 : InProgress(utils::completion::Barrier),
72 : }
73 :
74 : impl std::fmt::Debug for TenantSlot {
75 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 0 : match self {
77 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
78 0 : Self::Secondary(_) => write!(f, "Secondary"),
79 0 : Self::InProgress(_) => write!(f, "InProgress"),
80 : }
81 0 : }
82 : }
83 :
84 : impl TenantSlot {
85 : /// Return the `Tenant` in this slot if attached, else None
86 0 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
87 0 : match self {
88 0 : Self::Attached(t) => Some(t),
89 0 : Self::Secondary(_) => None,
90 0 : Self::InProgress(_) => None,
91 : }
92 0 : }
93 : }
94 :
95 : /// The tenants known to the pageserver.
96 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
97 : pub(crate) enum TenantsMap {
98 : /// [`init_tenant_mgr`] is not done yet.
99 : Initializing,
100 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
101 : /// New tenants can be added using [`tenant_map_acquire_slot`].
102 : Open(BTreeMap<TenantShardId, TenantSlot>),
103 : /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
104 : /// Existing tenants are still accessible, but no new tenants can be created.
105 : ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
106 : }
107 :
108 : pub(crate) enum TenantsMapRemoveResult {
109 : Occupied(TenantSlot),
110 : Vacant,
111 : InProgress(utils::completion::Barrier),
112 : }
113 :
114 : /// When resolving a TenantId to a shard, we may be looking for the 0th
115 : /// shard, or we might be looking for whichever shard holds a particular page.
116 : pub(crate) enum ShardSelector {
117 : /// Only return the 0th shard, if it is present. If a non-0th shard is present,
118 : /// ignore it.
119 : Zero,
120 : /// Pick the first shard we find for the TenantId
121 : First,
122 : /// Pick the shard that holds this key
123 : Page(Key),
124 : }
125 :
126 : impl TenantsMap {
127 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
128 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
129 : /// None is returned.
130 0 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
131 0 : match self {
132 0 : TenantsMap::Initializing => None,
133 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
134 0 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
135 : }
136 : }
137 0 : }
138 :
139 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
140 : /// resolve this to a fully qualified TenantShardId.
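///
/// A hedged usage sketch (assumes a `map: &TenantsMap` plus a request-supplied `tenant_id`
/// and `key`; the returned id is then looked up again under the same read lock):
///
/// ```ignore
/// // Route a page request to whichever attached shard owns this key:
/// if let Some(tenant_shard_id) = map.resolve_attached_shard(&tenant_id, ShardSelector::Page(key)) {
///     let tenant: Option<&Arc<Tenant>> = map.get(&tenant_shard_id);
/// }
/// // Administrative calls that only need shard 0 can use ShardSelector::Zero instead.
/// ```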
141 0 : fn resolve_attached_shard(
142 0 : &self,
143 0 : tenant_id: &TenantId,
144 0 : selector: ShardSelector,
145 0 : ) -> Option<TenantShardId> {
146 0 : let mut want_shard = None;
147 0 : match self {
148 0 : TenantsMap::Initializing => None,
149 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
150 0 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
151 : // Ignore all slots that don't contain an attached tenant
152 0 : let tenant = match &slot.1 {
153 0 : TenantSlot::Attached(t) => t,
154 0 : _ => continue,
155 : };
156 :
157 0 : match selector {
158 0 : ShardSelector::First => return Some(*slot.0),
159 0 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
160 0 : return Some(*slot.0)
161 : }
162 0 : ShardSelector::Page(key) => {
163 0 : // First slot we see for this tenant, calculate the expected shard number
164 0 : // for the key: we will use this for checking if this and subsequent
165 0 : // slots contain the key, rather than recalculating the hash each time.
166 0 : if want_shard.is_none() {
167 0 : want_shard = Some(tenant.shard_identity.get_shard_number(&key));
168 0 : }
169 :
170 0 : if Some(tenant.shard_identity.number) == want_shard {
171 0 : return Some(*slot.0);
172 0 : }
173 : }
174 0 : _ => continue,
175 : }
176 : }
177 :
178 : // Fall through: we didn't find an acceptable shard
179 0 : None
180 : }
181 : }
182 0 : }
183 :
184 : /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
185 : ///
186 : /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
187 : /// slot if the enclosed tenant is shut down.
188 0 : pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
189 0 : use std::collections::btree_map::Entry;
190 0 : match self {
191 0 : TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
192 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
193 0 : Entry::Occupied(entry) => match entry.get() {
194 0 : TenantSlot::InProgress(barrier) => {
195 0 : TenantsMapRemoveResult::InProgress(barrier.clone())
196 : }
197 0 : _ => TenantsMapRemoveResult::Occupied(entry.remove()),
198 : },
199 0 : Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
200 : },
201 : }
202 0 : }
203 :
204 0 : pub(crate) fn len(&self) -> usize {
205 0 : match self {
206 0 : TenantsMap::Initializing => 0,
207 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
208 : }
209 0 : }
210 : }
211 :
212 : /// This is "safe" in that it won't leave behind a partially deleted directory
213 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting to delete
214 : /// the contents.
215 : ///
216 : /// This is pageserver-specific, as it relies on future processes after a crash to check
217 : /// for TEMP_FILE_SUFFIX when loading things.
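///
/// For illustration, the rename target is the original path with an extension made of eight
/// random alphanumeric characters plus `TEMP_FILE_SUFFIX`, so a later startup scan can
/// recognise and discard it. A hedged sketch of that cleanup (roughly what
/// `load_tenant_config` below does via `crate::is_temporary`):
///
/// ```ignore
/// if crate::is_temporary(&tenant_dir_path) {
///     // Already renamed away from its real path, so plain removal is safe.
///     std::fs::remove_dir_all(&tenant_dir_path)?;
/// }
/// ```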
218 0 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
219 0 : let tmp_path = safe_rename_tenant_dir(path).await?;
220 0 : fs::remove_dir_all(tmp_path).await
221 0 : }
222 :
223 0 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
224 0 : let parent = path
225 0 : .as_ref()
226 0 : .parent()
227 0 : // It is invalid to call this function with a relative path. Tenant directories
228 0 : // should always have a parent.
229 0 : .ok_or(std::io::Error::new(
230 0 : std::io::ErrorKind::InvalidInput,
231 0 : "Path must be absolute",
232 0 : ))?;
233 0 : let rand_suffix = rand::thread_rng()
234 0 : .sample_iter(&Alphanumeric)
235 0 : .take(8)
236 0 : .map(char::from)
237 0 : .collect::<String>()
238 0 : + TEMP_FILE_SUFFIX;
239 0 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
240 0 : fs::rename(path.as_ref(), &tmp_path).await?;
241 0 : fs::File::open(parent).await?.sync_all().await?;
242 0 : Ok(tmp_path)
243 0 : }
244 :
245 : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
246 2 : Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
247 :
248 : /// The TenantManager is responsible for storing and mutating the collection of all tenants
249 : /// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
250 : /// lives inside the TenantManager.
251 : ///
252 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
253 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
254 : /// and attached modes concurrently.
255 : pub struct TenantManager {
256 : conf: &'static PageServerConf,
257 : // TODO: currently this is a &'static pointing to TENANTS. When we finish refactoring
258 : // out of that static variable, the TenantManager can own this.
259 : // See https://github.com/neondatabase/neon/issues/5796
260 : tenants: &'static std::sync::RwLock<TenantsMap>,
261 : resources: TenantSharedResources,
262 : }
263 :
264 0 : fn emergency_generations(
265 0 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
266 0 : ) -> HashMap<TenantShardId, Generation> {
267 0 : tenant_confs
268 0 : .iter()
269 0 : .filter_map(|(tid, lc)| {
270 0 : let lc = match lc {
271 0 : Ok(lc) => lc,
272 0 : Err(_) => return None,
273 : };
274 0 : let gen = match &lc.mode {
275 0 : LocationMode::Attached(alc) => Some(alc.generation),
276 0 : LocationMode::Secondary(_) => None,
277 : };
278 :
279 0 : gen.map(|g| (*tid, g))
280 0 : })
281 0 : .collect()
282 0 : }
283 :
284 0 : async fn init_load_generations(
285 0 : conf: &'static PageServerConf,
286 0 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
287 0 : resources: &TenantSharedResources,
288 0 : cancel: &CancellationToken,
289 0 : ) -> anyhow::Result<Option<HashMap<TenantShardId, Generation>>> {
290 0 : let generations = if conf.control_plane_emergency_mode {
291 0 : error!(
292 0 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
293 0 : );
294 0 : emergency_generations(tenant_confs)
295 0 : } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
296 0 : info!("Calling control plane API to re-attach tenants");
297 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
298 0 : match client.re_attach().await {
299 0 : Ok(tenants) => tenants,
300 : Err(RetryForeverError::ShuttingDown) => {
301 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
302 : }
303 : }
304 : } else {
305 0 : info!("Control plane API not configured, tenant generations are disabled");
306 0 : return Ok(None);
307 : };
308 :
309 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
310 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
311 : // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
312 : // are processed, even though we don't block on recovery completing here.
313 : //
314 : // Must only do this if remote storage is enabled, otherwise deletion queue
315 : // is not running and channel push will fail.
316 0 : if resources.remote_storage.is_some() {
317 0 : resources
318 0 : .deletion_queue_client
319 0 : .recover(generations.clone())?;
320 0 : }
321 :
322 0 : Ok(Some(generations))
323 0 : }
324 :
325 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
326 : /// to load a tenant config from it.
327 : ///
328 : /// If the config file is missing, return Ok(None)
329 0 : fn load_tenant_config(
330 0 : conf: &'static PageServerConf,
331 0 : dentry: Utf8DirEntry,
332 0 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
333 0 : let tenant_dir_path = dentry.path().to_path_buf();
334 0 : if crate::is_temporary(&tenant_dir_path) {
335 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
336 : // No need to use safe_remove_tenant_dir_all because this is already
337 : // a temporary path
338 0 : if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
339 0 : error!(
340 0 : "Failed to remove temporary directory '{}': {:?}",
341 0 : tenant_dir_path, e
342 0 : );
343 0 : }
344 0 : return Ok(None);
345 0 : }
346 :
347 : // This case happens if we crash during attachment before writing a config into the dir
348 0 : let is_empty = tenant_dir_path
349 0 : .is_empty_dir()
350 0 : .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
351 0 : if is_empty {
352 0 : info!("removing empty tenant directory {tenant_dir_path:?}");
353 0 : if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
354 0 : error!(
355 0 : "Failed to remove empty tenant directory '{}': {e:#}",
356 0 : tenant_dir_path
357 0 : )
358 0 : }
359 0 : return Ok(None);
360 0 : }
361 :
362 0 : let tenant_shard_id = match tenant_dir_path
363 0 : .file_name()
364 0 : .unwrap_or_default()
365 0 : .parse::<TenantShardId>()
366 : {
367 0 : Ok(id) => id,
368 : Err(_) => {
369 0 : warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
370 0 : return Ok(None);
371 : }
372 : };
373 :
374 : // Clean up legacy `metadata` files.
375 : // Doing it here because every single tenant directory is visited here.
376 : // In any later code, there's different treatment of tenant dirs
377 : // ... depending on whether the tenant is in the re-attach response or not
378 : // ... depending on whether the tenant is ignored or not
379 0 : assert_eq!(
380 0 : &conf.tenant_path(&tenant_shard_id),
381 0 : &tenant_dir_path,
382 0 : "later use of conf....path() methods would be dubious"
383 : );
384 0 : let timelines: Vec<TimelineId> = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() {
385 0 : Ok(iter) => {
386 0 : let mut timelines = Vec::new();
387 0 : for res in iter {
388 0 : let p = res?;
389 0 : let Some(timeline_id) = p.file_name().parse::<TimelineId>().ok() else {
390 : // skip any entries that aren't TimelineId, such as
391 : // - *.___temp dirs
392 : // - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart)
393 0 : continue;
394 : };
395 0 : timelines.push(timeline_id);
396 : }
397 0 : timelines
398 : }
399 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![],
400 0 : Err(e) => return Err(anyhow::anyhow!(e)),
401 : };
402 0 : for timeline_id in timelines {
403 0 : let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id);
404 0 : let metadata_path = timeline_path.join(METADATA_FILE_NAME);
405 0 : match std::fs::remove_file(&metadata_path) {
406 : Ok(()) => {
407 0 : crashsafe::fsync(timeline_path)
408 0 : .context("fsync timeline dir after removing legacy metadata file")?;
409 0 : info!("removed legacy metadata file at {metadata_path}");
410 : }
411 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
412 0 : // something removed the file earlier, or it was never there
413 0 : // We don't care, this software version doesn't write it again, so, we're good.
414 0 : }
415 0 : Err(e) => {
416 0 : anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}");
417 : }
418 : }
419 : }
420 :
421 0 : let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
422 0 : if tenant_ignore_mark_file.exists() {
423 0 : info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
424 0 : return Ok(None);
425 0 : }
426 0 :
427 0 : Ok(Some((
428 0 : tenant_shard_id,
429 0 : Tenant::load_tenant_config(conf, &tenant_shard_id),
430 0 : )))
431 0 : }
432 :
433 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
434 : /// and load configurations for the tenants we found.
435 : ///
436 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
437 : /// seconds even on reasonably fast drives.
438 0 : async fn init_load_tenant_configs(
439 0 : conf: &'static PageServerConf,
440 0 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
441 0 : let tenants_dir = conf.tenants_path();
442 :
443 0 : let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
444 0 : let dir_entries = tenants_dir
445 0 : .read_dir_utf8()
446 0 : .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
447 :
448 0 : Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
449 0 : })
450 0 : .await??;
451 :
452 0 : let mut configs = HashMap::new();
453 0 :
454 0 : let mut join_set = JoinSet::new();
455 0 : for dentry in dentries {
456 0 : join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
457 0 : }
458 :
459 0 : while let Some(r) = join_set.join_next().await {
460 0 : if let Some((tenant_id, tenant_config)) = r?? {
461 0 : configs.insert(tenant_id, tenant_config);
462 0 : }
463 : }
464 :
465 0 : Ok(configs)
466 0 : }
467 :
468 : /// Initialize repositories with locally available timelines.
469 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
470 : /// are scheduled for download and added to the tenant once download is completed.
471 0 : #[instrument(skip_all)]
472 : pub async fn init_tenant_mgr(
473 : conf: &'static PageServerConf,
474 : resources: TenantSharedResources,
475 : init_order: InitializationOrder,
476 : cancel: CancellationToken,
477 : ) -> anyhow::Result<TenantManager> {
478 : let mut tenants = BTreeMap::new();
479 :
480 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
481 :
482 : // Scan local filesystem for attached tenants
483 : let tenant_configs = init_load_tenant_configs(conf).await?;
484 :
485 : // Determine which tenants are to be attached
486 : let tenant_generations =
487 : init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
488 :
489 0 : tracing::info!(
490 0 : "Attaching {} tenants at startup, warming up {} at a time",
491 0 : tenant_configs.len(),
492 0 : conf.concurrent_tenant_warmup.initial_permits()
493 0 : );
494 : TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
495 :
496 : // Construct `Tenant` objects and start them running
497 : for (tenant_shard_id, location_conf) in tenant_configs {
498 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
499 :
500 : let mut location_conf = match location_conf {
501 : Ok(l) => l,
502 : Err(e) => {
503 0 : warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
504 :
505 : tenants.insert(
506 : tenant_shard_id,
507 : TenantSlot::Attached(Tenant::create_broken_tenant(
508 : conf,
509 : tenant_shard_id,
510 : format!("{}", e),
511 : )),
512 : );
513 : continue;
514 : }
515 : };
516 :
517 : let generation = if let Some(generations) = &tenant_generations {
518 : // We have a generation map: treat it as the authority for whether
519 : // this tenant is really attached.
520 : if let Some(gen) = generations.get(&tenant_shard_id) {
521 : if let LocationMode::Attached(attached) = &location_conf.mode {
522 : if attached.generation > *gen {
523 0 : tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
524 0 : "Control plane gave decreasing generation ({gen:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
525 0 : attached.generation
526 0 : );
527 :
528 : // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
529 : // local disk content: demote to secondary rather than detaching.
530 : tenants.insert(
531 : tenant_shard_id,
532 : TenantSlot::Secondary(SecondaryTenant::new(
533 : tenant_shard_id,
534 : location_conf.shard,
535 : location_conf.tenant_conf.clone(),
536 : &SecondaryLocationConfig { warm: false },
537 : )),
538 : );
539 : }
540 : }
541 : *gen
542 : } else {
543 : match &location_conf.mode {
544 : LocationMode::Secondary(secondary_config) => {
545 : // We do not require the control plane's permission for secondary mode
546 : // tenants, because they do no remote writes and hence require no
547 : // generation number
548 0 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
549 : tenants.insert(
550 : tenant_shard_id,
551 : TenantSlot::Secondary(SecondaryTenant::new(
552 : tenant_shard_id,
553 : location_conf.shard,
554 : location_conf.tenant_conf,
555 : secondary_config,
556 : )),
557 : );
558 : }
559 : LocationMode::Attached(_) => {
560 : // TODO: augment re-attach API to enable the control plane to
561 : // instruct us about secondary attachments. That way, instead of throwing
562 : // away local state, we can gracefully fall back to secondary here, if the control
563 : // plane tells us so.
564 : // (https://github.com/neondatabase/neon/issues/5377)
565 0 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
566 : if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
567 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
568 0 : "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
569 0 : );
570 : }
571 : }
572 : };
573 :
574 : continue;
575 : }
576 : } else {
577 : // Legacy mode: no generation information, any tenant present
578 : // on local disk may activate
579 0 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
580 : Generation::none()
581 : };
582 :
583 : // Presence of a generation number implies attachment: attach the tenant
584 : // if it wasn't already, and apply the generation number.
585 : location_conf.attach_in_generation(generation);
586 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
587 :
588 : let shard_identity = location_conf.shard;
589 : match tenant_spawn(
590 : conf,
591 : tenant_shard_id,
592 : &tenant_dir_path,
593 : resources.clone(),
594 : AttachedTenantConf::try_from(location_conf)?,
595 : shard_identity,
596 : Some(init_order.clone()),
597 : &TENANTS,
598 : SpawnMode::Normal,
599 : &ctx,
600 : ) {
601 : Ok(tenant) => {
602 : tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
603 : }
604 : Err(e) => {
605 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
606 : }
607 : }
608 : }
609 :
610 0 : info!("Processed {} local tenants at startup", tenants.len());
611 :
612 : let mut tenants_map = TENANTS.write().unwrap();
613 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
614 : METRICS.tenant_slots.set(tenants.len() as u64);
615 : *tenants_map = TenantsMap::Open(tenants);
616 :
617 : Ok(TenantManager {
618 : conf,
619 : tenants: &TENANTS,
620 : resources,
621 : })
622 : }
623 :
624 : /// Wrapper for Tenant::spawn that checks invariants before running, and returns
625 : /// a broken tenant placeholder if Tenant::spawn fails.
626 : #[allow(clippy::too_many_arguments)]
627 0 : pub(crate) fn tenant_spawn(
628 0 : conf: &'static PageServerConf,
629 0 : tenant_shard_id: TenantShardId,
630 0 : tenant_path: &Utf8Path,
631 0 : resources: TenantSharedResources,
632 0 : location_conf: AttachedTenantConf,
633 0 : shard_identity: ShardIdentity,
634 0 : init_order: Option<InitializationOrder>,
635 0 : tenants: &'static std::sync::RwLock<TenantsMap>,
636 0 : mode: SpawnMode,
637 0 : ctx: &RequestContext,
638 0 : ) -> anyhow::Result<Arc<Tenant>> {
639 0 : anyhow::ensure!(
640 0 : tenant_path.is_dir(),
641 0 : "Cannot load tenant from path {tenant_path:?}, it either does not exist or is not a directory"
642 : );
643 0 : anyhow::ensure!(
644 0 : !crate::is_temporary(tenant_path),
645 0 : "Cannot load tenant from temporary path {tenant_path:?}"
646 : );
647 : anyhow::ensure!(
648 0 : !tenant_path.is_empty_dir().with_context(|| {
649 0 : format!("Failed to check whether {tenant_path:?} is an empty dir")
650 0 : })?,
651 0 : "Cannot load tenant from empty directory {tenant_path:?}"
652 : );
653 :
654 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
655 0 : anyhow::ensure!(
656 0 : !conf.tenant_ignore_mark_file_path(&tenant_shard_id).exists(),
657 0 : "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
658 : );
659 :
660 0 : let tenant = match Tenant::spawn(
661 0 : conf,
662 0 : tenant_shard_id,
663 0 : resources,
664 0 : location_conf,
665 0 : shard_identity,
666 0 : init_order,
667 0 : tenants,
668 0 : mode,
669 0 : ctx,
670 0 : ) {
671 0 : Ok(tenant) => tenant,
672 0 : Err(e) => {
673 0 : error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
674 0 : Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
675 : }
676 : };
677 :
678 0 : Ok(tenant)
679 0 : }
680 :
681 : ///
682 : /// Shut down all tenants. This runs as part of pageserver shutdown.
683 : ///
684 : /// NB: We leave the tenants in the map, so that they remain accessible through
685 : /// the management API until we shut it down. If we removed the shut-down tenants
686 : /// from the tenants map, the management API would return 404 for these tenants,
687 : /// because TenantsMap::get() now returns `None`.
688 : /// That could be easily misinterpreted by control plane, the consumer of the
689 : /// management API. For example, it could attach the tenant on a different pageserver.
690 : /// We would then be in split-brain once this pageserver restarts.
691 0 : #[instrument(skip_all)]
692 : pub(crate) async fn shutdown_all_tenants() {
693 : shutdown_all_tenants0(&TENANTS).await
694 : }
695 :
696 2 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
697 2 : let mut join_set = JoinSet::new();
698 :
699 : // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
700 2 : let (total_in_progress, total_attached) = {
701 2 : let mut m = tenants.write().unwrap();
702 2 : match &mut *m {
703 : TenantsMap::Initializing => {
704 0 : *m = TenantsMap::ShuttingDown(BTreeMap::default());
705 0 : info!("tenants map is empty");
706 0 : return;
707 : }
708 2 : TenantsMap::Open(tenants) => {
709 2 : let mut shutdown_state = BTreeMap::new();
710 2 : let mut total_in_progress = 0;
711 2 : let mut total_attached = 0;
712 :
713 2 : for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
714 2 : match v {
715 0 : TenantSlot::Attached(t) => {
716 0 : shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
717 0 : join_set.spawn(
718 0 : async move {
719 0 : let freeze_and_flush = true;
720 :
721 0 : let res = {
722 0 : let (_guard, shutdown_progress) = completion::channel();
723 0 : t.shutdown(shutdown_progress, freeze_and_flush).await
724 : };
725 :
726 0 : if let Err(other_progress) = res {
727 : // join the other shutdown already in progress
728 0 : other_progress.wait().await;
729 0 : }
730 :
731 : // we cannot afford per-tenant logging here, because if S3 is degraded, we are
732 : // going to log too many lines
733 0 : debug!("tenant successfully stopped");
734 0 : }
735 0 : .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
736 : );
737 :
738 0 : total_attached += 1;
739 : }
740 0 : TenantSlot::Secondary(state) => {
741 0 : // We don't need to wait for this individually per-tenant: the
742 0 : // downloader task will be waited on eventually, this cancel
743 0 : // is just to encourage it to drop out if it is doing work
744 0 : // for this tenant right now.
745 0 : state.cancel.cancel();
746 0 :
747 0 : shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
748 0 : }
749 2 : TenantSlot::InProgress(notify) => {
750 2 : // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
751 2 : // wait for their notifications to fire in this function.
752 2 : join_set.spawn(async move {
753 2 : notify.wait().await;
754 2 : });
755 2 :
756 2 : total_in_progress += 1;
757 2 : }
758 : }
759 : }
760 2 : *m = TenantsMap::ShuttingDown(shutdown_state);
761 2 : (total_in_progress, total_attached)
762 : }
763 : TenantsMap::ShuttingDown(_) => {
764 0 : error!("already shutting down, this function isn't supposed to be called more than once");
765 0 : return;
766 : }
767 : }
768 : };
769 :
770 2 : let started_at = std::time::Instant::now();
771 :
772 2 : info!(
773 2 : "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
774 2 : total_in_progress, total_attached
775 2 : );
776 :
777 2 : let total = join_set.len();
778 2 : let mut panicked = 0;
779 2 : let mut buffering = true;
780 2 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
781 2 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
782 :
783 6 : while !join_set.is_empty() {
784 4 : tokio::select! {
785 2 : Some(joined) = join_set.join_next() => {
786 : match joined {
787 : Ok(()) => {},
788 : Err(join_error) if join_error.is_cancelled() => {
789 : unreachable!("we are not cancelling any of the tasks");
790 : }
791 : Err(join_error) if join_error.is_panic() => {
792 : // cannot really do anything, as this panic is likely a bug
793 : panicked += 1;
794 : }
795 : Err(join_error) => {
796 0 : warn!("unknown kind of JoinError: {join_error}");
797 : }
798 : }
799 : if !buffering {
800 : // buffer so that every 500ms since the first update (or starting) we'll log
801 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
802 : // are not able to log *then*.
803 : buffering = true;
804 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
805 : }
806 : },
807 : _ = &mut buffered, if buffering => {
808 : buffering = false;
809 2 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
810 : }
811 : }
812 : }
813 :
814 2 : if panicked > 0 {
815 0 : warn!(
816 0 : panicked,
817 0 : total, "observed panics while shutting down tenants"
818 0 : );
819 2 : }
820 :
821 : // caller will log how long we took
822 2 : }
823 :
824 0 : #[derive(Debug, thiserror::Error)]
825 : pub(crate) enum SetNewTenantConfigError {
826 : #[error(transparent)]
827 : GetTenant(#[from] GetTenantError),
828 : #[error(transparent)]
829 : Persist(anyhow::Error),
830 : #[error(transparent)]
831 : Other(anyhow::Error),
832 : }
833 :
834 0 : pub(crate) async fn set_new_tenant_config(
835 0 : conf: &'static PageServerConf,
836 0 : new_tenant_conf: TenantConfOpt,
837 0 : tenant_id: TenantId,
838 0 : ) -> Result<(), SetNewTenantConfigError> {
839 0 : // Legacy API: does not support sharding
840 0 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
841 :
842 0 : info!("configuring tenant {tenant_id}");
843 0 : let tenant = get_tenant(tenant_shard_id, true)?;
844 :
845 0 : if !tenant.tenant_shard_id().shard_count.is_unsharded() {
846 : // Note that we use ShardParameters::default below.
847 0 : return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
848 0 : "This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
849 0 : )));
850 0 : }
851 0 :
852 0 : // This is a legacy API that only operates on attached tenants: the preferred
853 0 : // API to use is the location_config/ endpoint, which lets the caller provide
854 0 : // the full LocationConf.
855 0 : let location_conf = LocationConf::attached_single(
856 0 : new_tenant_conf.clone(),
857 0 : tenant.generation,
858 0 : &ShardParameters::default(),
859 0 : );
860 0 :
861 0 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
862 0 : .await
863 0 : .map_err(SetNewTenantConfigError::Persist)?;
864 0 : tenant.set_new_tenant_config(new_tenant_conf);
865 0 : Ok(())
866 0 : }
867 :
868 0 : #[derive(thiserror::Error, Debug)]
869 : pub(crate) enum UpsertLocationError {
870 : #[error("Bad config request: {0}")]
871 : BadRequest(anyhow::Error),
872 :
873 : #[error("Cannot change config in this state: {0}")]
874 : Unavailable(#[from] TenantMapError),
875 :
876 : #[error("Tenant is already being modified")]
877 : InProgress,
878 :
879 : #[error("Failed to flush: {0}")]
880 : Flush(anyhow::Error),
881 :
882 : #[error("Internal error: {0}")]
883 : Other(#[from] anyhow::Error),
884 : }
885 :
886 : impl TenantManager {
887 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
888 : /// having to pass it around everywhere as a separate object.
889 0 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
890 0 : self.conf
891 0 : }
892 :
893 : /// Gets the attached tenant from the in-memory data, returning an error if it is absent, in secondary mode, or does not match the query.
894 : /// `active_only = true` restricts the lookup to tenants that are ready for operations, returning an error for any other tenant state.
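///
/// A hedged usage sketch (`tenant_manager` and `tenant_shard_id` are assumed to be in scope):
///
/// ```ignore
/// // Only accept a tenant that is fully Active and ready to serve requests:
/// let tenant = tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?;
/// // Pass `false` to accept an attached tenant regardless of its current state.
/// ```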
895 0 : pub(crate) fn get_attached_tenant_shard(
896 0 : &self,
897 0 : tenant_shard_id: TenantShardId,
898 0 : active_only: bool,
899 0 : ) -> Result<Arc<Tenant>, GetTenantError> {
900 0 : let locked = self.tenants.read().unwrap();
901 :
902 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
903 :
904 0 : match peek_slot {
905 0 : Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
906 : TenantState::Broken {
907 0 : reason,
908 0 : backtrace: _,
909 0 : } if active_only => Err(GetTenantError::Broken(reason)),
910 0 : TenantState::Active => Ok(Arc::clone(tenant)),
911 : _ => {
912 0 : if active_only {
913 0 : Err(GetTenantError::NotActive(tenant_shard_id))
914 : } else {
915 0 : Ok(Arc::clone(tenant))
916 : }
917 : }
918 : },
919 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
920 : None | Some(TenantSlot::Secondary(_)) => {
921 0 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
922 : }
923 : }
924 0 : }
925 :
926 0 : pub(crate) fn get_secondary_tenant_shard(
927 0 : &self,
928 0 : tenant_shard_id: TenantShardId,
929 0 : ) -> Option<Arc<SecondaryTenant>> {
930 0 : let locked = self.tenants.read().unwrap();
931 0 :
932 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
933 0 : .ok()
934 0 : .flatten();
935 :
936 0 : match peek_slot {
937 0 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
938 0 : _ => None,
939 : }
940 0 : }
941 :
942 : /// Whether the `TenantManager` is responsible for the tenant shard
943 0 : pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
944 0 : let locked = self.tenants.read().unwrap();
945 0 :
946 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
947 0 : .ok()
948 0 : .flatten();
949 0 :
950 0 : peek_slot.is_some()
951 0 : }
952 :
953 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
954 : pub(crate) async fn upsert_location(
955 : &self,
956 : tenant_shard_id: TenantShardId,
957 : new_location_config: LocationConf,
958 : flush: Option<Duration>,
959 : mut spawn_mode: SpawnMode,
960 : ctx: &RequestContext,
961 : ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
962 : debug_assert_current_span_has_tenant_id();
963 0 : info!("configuring tenant location to state {new_location_config:?}");
964 :
965 : enum FastPathModified {
966 : Attached(Arc<Tenant>),
967 : Secondary(Arc<SecondaryTenant>),
968 : }
969 :
970 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
971 : // then we do not need to set the slot to InProgress, we can just call into the
972 : // existng tenant.
973 : let fast_path_taken = {
974 : let locked = self.tenants.read().unwrap();
975 : let peek_slot =
976 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
977 : match (&new_location_config.mode, peek_slot) {
978 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
979 : match attach_conf.generation.cmp(&tenant.generation) {
980 : Ordering::Equal => {
981 : // A transition from Attached to Attached in the same generation: we may
982 : // take our fast path and just provide the updated configuration
983 : // to the tenant.
984 : tenant.set_new_location_config(
985 : AttachedTenantConf::try_from(new_location_config.clone())
986 : .map_err(UpsertLocationError::BadRequest)?,
987 : );
988 :
989 : Some(FastPathModified::Attached(tenant.clone()))
990 : }
991 : Ordering::Less => {
992 : return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
993 : "Generation {:?} is less than existing {:?}",
994 : attach_conf.generation,
995 : tenant.generation
996 : )));
997 : }
998 : Ordering::Greater => {
999 : // Generation advanced, fall through to general case of replacing `Tenant` object
1000 : None
1001 : }
1002 : }
1003 : }
1004 : (
1005 : LocationMode::Secondary(secondary_conf),
1006 : Some(TenantSlot::Secondary(secondary_tenant)),
1007 : ) => {
1008 : secondary_tenant.set_config(secondary_conf);
1009 : secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
1010 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
1011 : }
1012 : _ => {
1013 : // Not an Attached->Attached transition, fall through to general case
1014 : None
1015 : }
1016 : }
1017 : };
1018 :
1019 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
1020 : // phase of writing config and/or waiting for flush, before returning.
1021 : match fast_path_taken {
1022 : Some(FastPathModified::Attached(tenant)) => {
1023 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1024 : .await?;
1025 :
1026 : // Transition to AttachedStale means we may well hold a valid generation
1027 : // still, and have been requested to go stale as part of a migration. If
1028 : // the caller set `flush`, then flush to remote storage.
1029 : if let LocationMode::Attached(AttachedLocationConfig {
1030 : generation: _,
1031 : attach_mode: AttachmentMode::Stale,
1032 : }) = &new_location_config.mode
1033 : {
1034 : if let Some(flush_timeout) = flush {
1035 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
1036 : Ok(Err(e)) => {
1037 : return Err(UpsertLocationError::Flush(e));
1038 : }
1039 : Ok(Ok(_)) => return Ok(Some(tenant)),
1040 : Err(_) => {
1041 0 : tracing::warn!(
1042 0 : timeout_ms = flush_timeout.as_millis(),
1043 0 : "Timed out waiting for flush to remote storage, proceeding anyway."
1044 0 : )
1045 : }
1046 : }
1047 : }
1048 : }
1049 :
1050 : return Ok(Some(tenant));
1051 : }
1052 : Some(FastPathModified::Secondary(_secondary_tenant)) => {
1053 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1054 : .await?;
1055 :
1056 : return Ok(None);
1057 : }
1058 : None => {
1059 : // Proceed with the general case procedure, where we will shutdown & remove any existing
1060 : // slot contents and replace with a fresh one
1061 : }
1062 : };
1063 :
1064 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
1065 : // InProgress value to the slot while we make whatever changes are required. The state for
1066 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
1067 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
1068 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
1069 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
1070 0 : .map_err(|e| match e {
1071 : TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => {
1072 0 : unreachable!("Called with mode Any")
1073 : }
1074 0 : TenantSlotError::InProgress => UpsertLocationError::InProgress,
1075 0 : TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
1076 0 : })?;
1077 :
1078 : match slot_guard.get_old_value() {
1079 : Some(TenantSlot::Attached(tenant)) => {
1080 : // The case where we keep a Tenant alive was covered above in the special case
1081 : // for Attached->Attached transitions in the same generation. By this point,
1082 : // if we see an attached tenant we know it will be discarded and should be
1083 : // shut down.
1084 : let (_guard, progress) = utils::completion::channel();
1085 :
1086 : match tenant.get_attach_mode() {
1087 : AttachmentMode::Single | AttachmentMode::Multi => {
1088 : // Before we leave our state as the presumed holder of the latest generation,
1089 : // flush any outstanding deletions to reduce the risk of leaking objects.
1090 : self.resources.deletion_queue_client.flush_advisory()
1091 : }
1092 : AttachmentMode::Stale => {
1093 : // If we're stale there's not point trying to flush deletions
1094 : }
1095 : };
1096 :
1097 0 : info!("Shutting down attached tenant");
1098 : match tenant.shutdown(progress, false).await {
1099 : Ok(()) => {}
1100 : Err(barrier) => {
1101 0 : info!("Shutdown already in progress, waiting for it to complete");
1102 : barrier.wait().await;
1103 : }
1104 : }
1105 : slot_guard.drop_old_value().expect("We just shut it down");
1106 :
1107 : // Edge case: we were called with SpawnMode::Create, but a Tenant already existed: the caller
1108 : // thought they were creating a brand-new tenant. We must switch to
1109 : // Normal mode so that when starting this Tenant we properly probe remote storage for timelines,
1110 : // rather than assuming it to be empty.
1111 : spawn_mode = SpawnMode::Normal;
1112 : }
1113 : Some(TenantSlot::Secondary(state)) => {
1114 0 : info!("Shutting down secondary tenant");
1115 : state.shutdown().await;
1116 : }
1117 : Some(TenantSlot::InProgress(_)) => {
1118 : // This should never happen: acquire_slot should error out
1119 : // if the contents of a slot were InProgress.
1120 : return Err(UpsertLocationError::Other(anyhow::anyhow!(
1121 : "Acquired an InProgress slot, this is a bug."
1122 : )));
1123 : }
1124 : None => {
1125 : // Slot was vacant, nothing needs shutting down.
1126 : }
1127 : }
1128 :
1129 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1130 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1131 :
1132 : // Directory structure is the same for attached and secondary modes:
1133 : // create it if it doesn't exist. Timeline load/creation expects the
1134 : // timelines/ subdir to already exist.
1135 : //
1136 : // Does not need to be fsync'd because local storage is just a cache.
1137 : tokio::fs::create_dir_all(&timelines_path)
1138 : .await
1139 0 : .with_context(|| format!("Creating {timelines_path}"))?;
1140 :
1141 : // Before activating either secondary or attached mode, persist the
1142 : // configuration, so that on restart we will re-attach (or re-start
1143 : // secondary) on the tenant.
1144 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
1145 :
1146 : let new_slot = match &new_location_config.mode {
1147 : LocationMode::Secondary(secondary_config) => {
1148 : let shard_identity = new_location_config.shard;
1149 : TenantSlot::Secondary(SecondaryTenant::new(
1150 : tenant_shard_id,
1151 : shard_identity,
1152 : new_location_config.tenant_conf,
1153 : secondary_config,
1154 : ))
1155 : }
1156 : LocationMode::Attached(_attach_config) => {
1157 : let shard_identity = new_location_config.shard;
1158 :
1159 : // Testing hack: if we are configured with no control plane, then drop the generation
1160 : // from upserts. This enables creating generation-less tenants even though neon_local
1161 : // always uses generations when calling the location conf API.
1162 : let attached_conf = if cfg!(feature = "testing") {
1163 : let mut conf = AttachedTenantConf::try_from(new_location_config)?;
1164 : if self.conf.control_plane_api.is_none() {
1165 : conf.location.generation = Generation::none();
1166 : }
1167 : conf
1168 : } else {
1169 : AttachedTenantConf::try_from(new_location_config)?
1170 : };
1171 :
1172 : let tenant = tenant_spawn(
1173 : self.conf,
1174 : tenant_shard_id,
1175 : &tenant_path,
1176 : self.resources.clone(),
1177 : attached_conf,
1178 : shard_identity,
1179 : None,
1180 : self.tenants,
1181 : spawn_mode,
1182 : ctx,
1183 : )?;
1184 :
1185 : TenantSlot::Attached(tenant)
1186 : }
1187 : };
1188 :
1189 : let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
1190 : Some(tenant.clone())
1191 : } else {
1192 : None
1193 : };
1194 :
1195 : match slot_guard.upsert(new_slot) {
1196 : Err(TenantSlotUpsertError::InternalError(e)) => {
1197 : Err(UpsertLocationError::Other(anyhow::anyhow!(e)))
1198 : }
1199 : Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
1200 : Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
1201 : // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
1202 : // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
1203 : // are shut down.
1204 : //
1205 : // We must shut it down inline here.
1206 : match new_slot {
1207 : TenantSlot::InProgress(_) => {
1208 : // Unreachable because we never insert an InProgress
1209 : unreachable!()
1210 : }
1211 : TenantSlot::Attached(tenant) => {
1212 : let (_guard, progress) = utils::completion::channel();
1213 0 : info!("Shutting down just-spawned tenant, because tenant manager is shut down");
1214 : match tenant.shutdown(progress, false).await {
1215 : Ok(()) => {
1216 0 : info!("Finished shutting down just-spawned tenant");
1217 : }
1218 : Err(barrier) => {
1219 0 : info!("Shutdown already in progress, waiting for it to complete");
1220 : barrier.wait().await;
1221 : }
1222 : }
1223 : }
1224 : TenantSlot::Secondary(secondary_tenant) => {
1225 : secondary_tenant.shutdown().await;
1226 : }
1227 : }
1228 :
1229 : Err(UpsertLocationError::Unavailable(
1230 : TenantMapError::ShuttingDown,
1231 : ))
1232 : }
1233 : Ok(()) => Ok(attached_tenant),
1234 : }
1235 : }
1236 :
1237 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1238 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1239 : /// dropped before re-attaching.
1240 : ///
1241 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1242 : /// where an issue is identified that would go away with a restart of the tenant.
1243 : ///
1244 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1245 : /// to respect the cancellation tokens used in normal shutdown().
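///
/// A hedged usage sketch (`tenant_manager`, `tenant_shard_id` and `ctx` are assumed to be in
/// scope; passing `drop_cache = true` also wipes the locally cached timeline data before the
/// re-attach):
///
/// ```ignore
/// tenant_manager
///     .reset_tenant(tenant_shard_id, /* drop_cache */ false, &ctx)
///     .await?;
/// ```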
1246 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1247 : pub(crate) async fn reset_tenant(
1248 : &self,
1249 : tenant_shard_id: TenantShardId,
1250 : drop_cache: bool,
1251 : ctx: &RequestContext,
1252 : ) -> anyhow::Result<()> {
1253 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1254 : let Some(old_slot) = slot_guard.get_old_value() else {
1255 : anyhow::bail!("Tenant not found when trying to reset");
1256 : };
1257 :
1258 : let Some(tenant) = old_slot.get_attached() else {
1259 : slot_guard.revert();
1260 : anyhow::bail!("Tenant is not in attached state");
1261 : };
1262 :
1263 : let (_guard, progress) = utils::completion::channel();
1264 : match tenant.shutdown(progress, false).await {
1265 : Ok(()) => {
1266 : slot_guard.drop_old_value()?;
1267 : }
1268 : Err(_barrier) => {
1269 : slot_guard.revert();
1270 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1271 : }
1272 : }
1273 :
1274 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1275 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1276 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1277 :
1278 : if drop_cache {
1279 0 : tracing::info!("Dropping local file cache");
1280 :
1281 : match tokio::fs::read_dir(&timelines_path).await {
1282 : Err(e) => {
1283 0 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1284 : }
1285 : Ok(mut entries) => {
1286 : while let Some(entry) = entries.next_entry().await? {
1287 : tokio::fs::remove_dir_all(entry.path()).await?;
1288 : }
1289 : }
1290 : }
1291 : }
1292 :
1293 : let shard_identity = config.shard;
1294 : let tenant = tenant_spawn(
1295 : self.conf,
1296 : tenant_shard_id,
1297 : &tenant_path,
1298 : self.resources.clone(),
1299 : AttachedTenantConf::try_from(config)?,
1300 : shard_identity,
1301 : None,
1302 : self.tenants,
1303 : SpawnMode::Normal,
1304 : ctx,
1305 : )?;
1306 :
1307 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1308 :
1309 : Ok(())
1310 : }
1311 :
1312 0 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
1313 0 : let locked = self.tenants.read().unwrap();
1314 0 : match &*locked {
1315 0 : TenantsMap::Initializing => Vec::new(),
1316 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1317 0 : .values()
1318 0 : .filter_map(|slot| {
1319 0 : slot.get_attached()
1320 0 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1321 0 : })
1322 0 : .collect(),
1323 : }
1324 0 : }
1325 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1326 : // callback should be small and fast, as it will be called inside the global
1327 : // TenantsMap lock.
1328 0 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1329 0 : where
1330 0 : // TODO: let the callback return a hint to drop out of the loop early
1331 0 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1332 0 : {
1333 0 : let locked = self.tenants.read().unwrap();
1334 :
1335 0 : let map = match &*locked {
1336 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1337 0 : TenantsMap::Open(m) => m,
1338 : };
1339 :
1340 0 : for (tenant_id, slot) in map {
1341 0 : if let TenantSlot::Secondary(state) = slot {
1342 : // Only expose secondary tenants that are not currently shutting down
1343 0 : if !state.cancel.is_cancelled() {
1344 0 : func(tenant_id, state)
1345 0 : }
1346 0 : }
1347 : }
1348 0 : }
1349 :
1350 : /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
1351 0 : pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
1352 0 : let locked = self.tenants.read().unwrap();
1353 0 : match &*locked {
1354 0 : TenantsMap::Initializing => Vec::new(),
1355 0 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1356 0 : map.iter().map(|(k, v)| (*k, v.clone())).collect()
1357 : }
1358 : }
1359 0 : }
1360 :
1361 0 : pub(crate) async fn delete_tenant(
1362 0 : &self,
1363 0 : tenant_shard_id: TenantShardId,
1364 0 : activation_timeout: Duration,
1365 0 : ) -> Result<(), DeleteTenantError> {
1366 0 : super::span::debug_assert_current_span_has_tenant_id();
1367 : // We acquire a SlotGuard during this function to protect against concurrent
1368 : // changes while the ::prepare phase of DeleteTenantFlow executes, but then
1369 : // have to return the Tenant to the map while the background deletion runs.
1370 : //
1371 : // TODO: refactor deletion to happen outside the lifetime of a Tenant.
1372 : // Currently, deletion requires a reference to the tenants map in order to
1373 : // keep the Tenant in the map until deletion is complete, and then remove
1374 : // it at the end.
1375 : //
1376 : // See https://github.com/neondatabase/neon/issues/5080
1377 :
1378 0 : let slot_guard =
1379 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
1380 :
1381 : // unwrap is safe because we used MustExist mode when acquiring
1382 0 : let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
1383 0 : TenantSlot::Attached(tenant) => tenant.clone(),
1384 : _ => {
1385 : // Express "not attached" as equivalent to "not found"
1386 0 : return Err(DeleteTenantError::NotAttached);
1387 : }
1388 : };
1389 :
1390 0 : match tenant.current_state() {
1391 0 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1392 0 : // If a tenant is broken or stopping, DeleteTenantFlow can
1393 0 : // handle it: broken tenants proceed to delete, stopping tenants
1394 0 : // are checked for deletion already in progress.
1395 0 : }
1396 : _ => {
1397 0 : tenant
1398 0 : .wait_to_become_active(activation_timeout)
1399 0 : .await
1400 0 : .map_err(|e| match e {
1401 : GetActiveTenantError::WillNotBecomeActive(_) => {
1402 0 : DeleteTenantError::InvalidState(tenant.current_state())
1403 : }
1404 0 : GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
1405 0 : GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
1406 : GetActiveTenantError::WaitForActiveTimeout {
1407 0 : latest_state: _latest_state,
1408 0 : wait_time: _wait_time,
1409 0 : } => DeleteTenantError::InvalidState(tenant.current_state()),
1410 0 : })?;
1411 : }
1412 : }
1413 :
1414 0 : let result = DeleteTenantFlow::run(
1415 0 : self.conf,
1416 0 : self.resources.remote_storage.clone(),
1417 0 : &TENANTS,
1418 0 : tenant,
1419 0 : )
1420 0 : .await;
1421 :
1422 : // The Tenant goes back into the map in Stopping state; it will eventually be removed by DeleteTenantFlow.
1423 0 : slot_guard.revert();
1424 0 : result
1425 0 : }
1426 :
1427 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.literal()))]
1428 : pub(crate) async fn shard_split(
1429 : &self,
1430 : tenant_shard_id: TenantShardId,
1431 : new_shard_count: ShardCount,
1432 : ctx: &RequestContext,
1433 : ) -> anyhow::Result<Vec<TenantShardId>> {
1434 : let tenant = get_tenant(tenant_shard_id, true)?;
1435 :
1436 : // Plan: identify what the new child shards will be
1437 : if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
1438 : anyhow::bail!("Requested shard count is not an increase");
1439 : }
1440 : let expansion_factor = new_shard_count.count() / tenant_shard_id.shard_count.count();
1441 : if !expansion_factor.is_power_of_two() {
1442 : anyhow::bail!("Requested split is not a power of two");
1443 : }
1444 :
1445 : let parent_shard_identity = tenant.shard_identity;
1446 : let parent_tenant_conf = tenant.get_tenant_conf();
1447 : let parent_generation = tenant.generation;
1448 :
1449 : let child_shards = tenant_shard_id.split(new_shard_count);
1450 0 : tracing::info!(
1451 0 : "Shard {} splits into: {}",
1452 0 : tenant_shard_id.to_index(),
1453 0 : child_shards
1454 0 : .iter()
1455 0 : .map(|id| format!("{}", id.to_index()))
1456 0 : .join(",")
1457 0 : );
1458 :
1459 : // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
1460 : if let Err(e) = tenant.split_prepare(&child_shards).await {
1461 : // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
1462 : // have been left in a partially-shut-down state.
1463 0 : tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
1464 : self.reset_tenant(tenant_shard_id, false, ctx).await?;
1465 : return Err(e);
1466 : }
1467 :
1468 : self.resources.deletion_queue_client.flush_advisory();
1469 :
1470 : // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
1471 : drop(tenant);
1472 : let mut parent_slot_guard =
1473 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1474 : let parent = match parent_slot_guard.get_old_value() {
1475 : Some(TenantSlot::Attached(t)) => t,
1476 : Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
1477 : Some(TenantSlot::InProgress(_)) => {
1478 : // tenant_map_acquire_slot never returns InProgress; if a slot was InProgress,
1479 : // it would return an error.
1480 : unreachable!()
1481 : }
1482 : None => {
1483 : // We don't actually need the parent shard to still be attached to do our work, but it's
1484 : // a weird enough situation that the caller probably didn't want us to continue working
1485 : // if they had detached the tenant they requested the split on.
1486 : anyhow::bail!("Detached parent shard in the middle of split!")
1487 : }
1488 : };
1489 :
1490 : // Optimization: hardlink layers from the parent into the children, so that they don't have to
1491 : // re-download & duplicate the data referenced in their initial IndexPart
1492 : self.shard_split_hardlink(parent, child_shards.clone())
1493 : .await?;
1494 :
1495 : // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
1496 : // child shards to reach this point.
1497 : let mut target_lsns = HashMap::new();
1498 : for timeline in parent.timelines.lock().unwrap().clone().values() {
1499 : target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
1500 : }
1501 :
1502 : // TODO: we should have the parent shard stop its WAL ingest here; it's a waste of resources
1503 : // and could slow down the children trying to catch up.
1504 :
1505 : // Phase 3: Spawn the child shards
1506 : for child_shard in &child_shards {
1507 : let mut child_shard_identity = parent_shard_identity;
1508 : child_shard_identity.count = child_shard.shard_count;
1509 : child_shard_identity.number = child_shard.shard_number;
1510 :
1511 : let child_location_conf = LocationConf {
1512 : mode: LocationMode::Attached(AttachedLocationConfig {
1513 : generation: parent_generation,
1514 : attach_mode: AttachmentMode::Single,
1515 : }),
1516 : shard: child_shard_identity,
1517 : tenant_conf: parent_tenant_conf.clone(),
1518 : };
1519 :
1520 : self.upsert_location(
1521 : *child_shard,
1522 : child_location_conf,
1523 : None,
1524 : SpawnMode::Normal,
1525 : ctx,
1526 : )
1527 : .await?;
1528 : }
1529 :
1530 : // Phase 4: wait for child shards' WAL ingest to catch up to the target LSNs
1531 : for child_shard_id in &child_shards {
1532 : let child_shard_id = *child_shard_id;
1533 : let child_shard = {
1534 : let locked = TENANTS.read().unwrap();
1535 : let peek_slot =
1536 : tenant_map_peek_slot(&locked, &child_shard_id, TenantSlotPeekMode::Read)?;
1537 0 : peek_slot.and_then(|s| s.get_attached()).cloned()
1538 : };
1539 : if let Some(t) = child_shard {
1540 : // Wait for the child shard to become active: this should be very quick because it only
1541 : // has to download the index_part that we just uploaded when creating it.
1542 : if let Err(e) = t.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await {
1543 : // This is not fatal: we have durably created the child shard. It just makes the
1544 : // split operation less seamless for clients, as we may detach the parent
1545 : // shard before the child shards are fully ready to serve requests.
1546 0 : tracing::warn!("Failed to wait for shard {child_shard_id} to activate: {e}");
1547 : continue;
1548 : }
1549 :
1550 : let timelines = t.timelines.lock().unwrap().clone();
1551 : for timeline in timelines.values() {
1552 : let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
1553 : continue;
1554 : };
1555 :
1556 0 : tracing::info!(
1557 0 : "Waiting for child shard {}/{} to reach target lsn {}...",
1558 0 : child_shard_id,
1559 0 : timeline.timeline_id,
1560 0 : target_lsn
1561 0 : );
1562 : if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
1563 : // Failure here might mean shutdown, in any case this part is an optimization
1564 : // and we shouldn't hold up the split operation.
1565 0 : tracing::warn!(
1566 0 : "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
1567 0 : timeline.timeline_id
1568 0 : );
1569 : } else {
1570 0 : tracing::info!(
1571 0 : "Child shard {}/{} reached target lsn {}",
1572 0 : child_shard_id,
1573 0 : timeline.timeline_id,
1574 0 : target_lsn
1575 0 : );
1576 : }
1577 : }
1578 : }
1579 : }
1580 :
1581 : // Phase 5: Shut down the parent shard, and erase it from disk
1582 : let (_guard, progress) = completion::channel();
1583 : match parent.shutdown(progress, false).await {
1584 : Ok(()) => {}
1585 : Err(other) => {
1586 : other.wait().await;
1587 : }
1588 : }
1589 : let local_tenant_directory = self.conf.tenant_path(&tenant_shard_id);
1590 : let tmp_path = safe_rename_tenant_dir(&local_tenant_directory)
1591 : .await
1592 0 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))?;
1593 : task_mgr::spawn(
1594 : task_mgr::BACKGROUND_RUNTIME.handle(),
1595 : TaskKind::MgmtRequest,
1596 : None,
1597 : None,
1598 : "tenant_files_delete",
1599 : false,
1600 0 : async move {
1601 0 : fs::remove_dir_all(tmp_path.as_path())
1602 0 : .await
1603 0 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
1604 0 : },
1605 : );
1606 :
1607 : parent_slot_guard.drop_old_value()?;
1608 :
1609 : // Phase 6: Release the InProgress on the parent shard
1610 : drop(parent_slot_guard);
1611 :
1612 : Ok(child_shards)
1613 : }
1614 :
1615 : /// Part of [`Self::shard_split`]: hard link parent shard layers into child shards, as an optimization
1616 : /// to avoid the children downloading them again.
1617 : ///
1618 : /// For each resident layer in the parent shard, we will hard link it into all of the child shards.
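 : ///
 : /// Illustratively (assuming the `tenants/<tenant_shard_id>/timelines/<timeline_id>/...`
 : /// layout implied by `conf.tenant_path` and `conf.timeline_path`), each resident layer file
 : /// under the parent shard's directory is hard linked at the same relative path under every
 : /// child shard's directory, so no layer bytes are copied.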
1619 0 : async fn shard_split_hardlink(
1620 0 : &self,
1621 0 : parent_shard: &Tenant,
1622 0 : child_shards: Vec<TenantShardId>,
1623 0 : ) -> anyhow::Result<()> {
1624 0 : debug_assert_current_span_has_tenant_id();
1625 0 :
1626 0 : let parent_path = self.conf.tenant_path(parent_shard.get_tenant_shard_id());
1627 0 : let (parent_timelines, parent_layers) = {
1628 0 : let mut parent_layers = Vec::new();
1629 0 : let timelines = parent_shard.timelines.lock().unwrap().clone();
1630 0 : let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
1631 0 : for timeline in timelines.values() {
1632 0 : let timeline_layers = timeline
1633 0 : .layers
1634 0 : .read()
1635 0 : .await
1636 0 : .resident_layers()
1637 0 : .collect::<Vec<_>>()
1638 0 : .await;
1639 0 : for layer in timeline_layers {
1640 0 : let relative_path = layer
1641 0 : .local_path()
1642 0 : .strip_prefix(&parent_path)
1643 0 : .context("Removing prefix from parent layer path")?;
1644 0 : parent_layers.push(relative_path.to_owned());
1645 : }
1646 : }
1647 : debug_assert!(
1648 0 : !parent_layers.is_empty(),
1649 0 : "shutdown cannot empty the layermap"
1650 : );
1651 0 : (parent_timelines, parent_layers)
1652 0 : };
1653 0 :
1654 0 : let mut child_prefixes = Vec::new();
1655 0 : let mut create_dirs = Vec::new();
1656 :
1657 0 : for child in child_shards {
1658 0 : let child_prefix = self.conf.tenant_path(&child);
1659 0 : create_dirs.push(child_prefix.clone());
1660 0 : create_dirs.extend(
1661 0 : parent_timelines
1662 0 : .iter()
1663 0 : .map(|t| self.conf.timeline_path(&child, t)),
1664 0 : );
1665 0 :
1666 0 : child_prefixes.push(child_prefix);
1667 0 : }
1668 :
1669 : // Since we will do a large number of small filesystem metadata operations, batch them into
1670 : // spawn_blocking calls rather than doing each one as a tokio::fs round-trip.
1671 0 : let jh = tokio::task::spawn_blocking(move || -> anyhow::Result<usize> {
1672 0 : for dir in &create_dirs {
1673 0 : if let Err(e) = std::fs::create_dir_all(dir) {
1674 : // Ignore AlreadyExists errors, drop out on all other errors
1675 0 : match e.kind() {
1676 0 : std::io::ErrorKind::AlreadyExists => {}
1677 : _ => {
1678 0 : return Err(anyhow::anyhow!(e).context(format!("Creating {dir}")));
1679 : }
1680 : }
1681 0 : }
1682 : }
1683 :
1684 0 : for child_prefix in child_prefixes {
1685 0 : for relative_layer in &parent_layers {
1686 0 : let parent_path = parent_path.join(relative_layer);
1687 0 : let child_path = child_prefix.join(relative_layer);
1688 0 : if let Err(e) = std::fs::hard_link(&parent_path, &child_path) {
1689 0 : match e.kind() {
1690 0 : std::io::ErrorKind::AlreadyExists => {}
1691 : std::io::ErrorKind::NotFound => {
1692 0 : tracing::info!(
1693 0 : "Layer {} not found during hard-linking, evicted during split?",
1694 0 : relative_layer
1695 0 : );
1696 : }
1697 : _ => {
1698 0 : return Err(anyhow::anyhow!(e).context(format!(
1699 0 : "Hard linking {relative_layer} into {child_prefix}"
1700 0 : )))
1701 : }
1702 : }
1703 0 : }
1704 : }
1705 : }
1706 :
1707 : // Durability is not required for correctness, but if we crashed during split and
1708 : // then restarted with empty timeline dirs, it would be very inefficient to
1709 : // re-populate from remote storage.
1710 0 : for dir in create_dirs {
1711 0 : if let Err(e) = crashsafe::fsync(&dir) {
1712 : // Something removed a newly created timeline dir out from underneath us? Extremely
1713 : // unexpected, but not worth panic'ing over as this whole function is just an
1714 : // optimization.
1715 0 : tracing::warn!("Failed to fsync directory {dir}: {e}")
1716 0 : }
1717 : }
1718 :
1719 0 : Ok(parent_layers.len())
1720 0 : });
1721 0 :
1722 0 : match jh.await {
1723 0 : Ok(Ok(layer_count)) => {
1724 0 : tracing::info!(count = layer_count, "Hard linked layers into child shards");
1725 : }
1726 0 : Ok(Err(e)) => {
1727 : // This is an optimization, so we tolerate failure.
1728 0 : tracing::warn!("Error hard-linking layers, proceeding anyway: {e}")
1729 : }
1730 0 : Err(e) => {
1731 0 : // This is something totally unexpected like a panic, so bail out.
1732 0 : anyhow::bail!("Error joining hard linking task: {e}");
1733 : }
1734 : }
1735 :
1736 0 : Ok(())
1737 0 : }
1738 : }
1739 :
1740 0 : #[derive(Debug, thiserror::Error)]
1741 : pub(crate) enum GetTenantError {
1742 : /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
1743 : /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
1744 : #[error("Tenant {0} not found")]
1745 : NotFound(TenantId),
1746 :
1747 : #[error("Tenant {0} is not active")]
1748 : NotActive(TenantShardId),
1749 : /// Broken is logically a subset of NotActive, but a distinct error is useful as
1750 : /// NotActive is usually a retryable state for API purposes, whereas Broken
1751 : /// is a stuck error state
1752 : #[error("Tenant is broken: {0}")]
1753 : Broken(String),
1754 :
1755 : // Initializing or shutting down: cannot authoritatively say whether we have this tenant
1756 : #[error("Tenant map is not available: {0}")]
1757 : MapState(#[from] TenantMapError),
1758 : }
1759 :
1760 : /// Gets the tenant from the in-memory data, erroring if it's absent or does not match the query.
1761 : /// `active_only = true` allows querying only tenants that are ready for operations, erroring on other kinds of tenants.
1762 : ///
1763 : /// This method is cancel-safe.
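 : ///
 : /// A minimal call sketch (the shard id is assumed to refer to an attached shard):
 : /// ```ignore
 : /// let tenant = get_tenant(tenant_shard_id, /* active_only */ true)?;
 : /// ```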
1764 0 : pub(crate) fn get_tenant(
1765 0 : tenant_shard_id: TenantShardId,
1766 0 : active_only: bool,
1767 0 : ) -> Result<Arc<Tenant>, GetTenantError> {
1768 0 : let locked = TENANTS.read().unwrap();
1769 :
1770 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
1771 :
1772 0 : match peek_slot {
1773 0 : Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
1774 : TenantState::Broken {
1775 0 : reason,
1776 0 : backtrace: _,
1777 0 : } if active_only => Err(GetTenantError::Broken(reason)),
1778 0 : TenantState::Active => Ok(Arc::clone(tenant)),
1779 : _ => {
1780 0 : if active_only {
1781 0 : Err(GetTenantError::NotActive(tenant_shard_id))
1782 : } else {
1783 0 : Ok(Arc::clone(tenant))
1784 : }
1785 : }
1786 : },
1787 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
1788 : None | Some(TenantSlot::Secondary(_)) => {
1789 0 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
1790 : }
1791 : }
1792 0 : }
1793 :
1794 0 : #[derive(thiserror::Error, Debug)]
1795 : pub(crate) enum GetActiveTenantError {
1796 : /// We may time out either while TenantSlot is InProgress, or while the Tenant
1797 : /// is in a non-Active state
1798 : #[error(
1799 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
1800 : )]
1801 : WaitForActiveTimeout {
1802 : latest_state: Option<TenantState>,
1803 : wait_time: Duration,
1804 : },
1805 :
1806 : /// The TenantSlot is absent, or in secondary mode
1807 : #[error(transparent)]
1808 : NotFound(#[from] GetTenantError),
1809 :
1810 : /// Cancellation token fired while we were waiting
1811 : #[error("cancelled")]
1812 : Cancelled,
1813 :
1814 : /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
1815 : #[error("will not become active. Current state: {0}")]
1816 : WillNotBecomeActive(TenantState),
1817 : }
1818 :
1819 : /// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
1820 : /// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
1821 : /// then wait for up to `timeout` (minus however long we waited for the slot).
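 : ///
 : /// A minimal call sketch (the selector, timeout and cancellation token are caller-supplied;
 : /// the variable names here are illustrative):
 : /// ```ignore
 : /// let tenant =
 : ///     get_active_tenant_with_timeout(tenant_id, shard_selector, timeout, &cancel).await?;
 : /// ```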
1822 0 : pub(crate) async fn get_active_tenant_with_timeout(
1823 0 : tenant_id: TenantId,
1824 0 : shard_selector: ShardSelector,
1825 0 : timeout: Duration,
1826 0 : cancel: &CancellationToken,
1827 0 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1828 0 : enum WaitFor {
1829 0 : Barrier(utils::completion::Barrier),
1830 0 : Tenant(Arc<Tenant>),
1831 0 : }
1832 0 :
1833 0 : let wait_start = Instant::now();
1834 0 : let deadline = wait_start + timeout;
1835 :
1836 0 : let (wait_for, tenant_shard_id) = {
1837 0 : let locked = TENANTS.read().unwrap();
1838 :
1839 : // Resolve TenantId to TenantShardId
1840 0 : let tenant_shard_id = locked
1841 0 : .resolve_attached_shard(&tenant_id, shard_selector)
1842 0 : .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1843 0 : tenant_id,
1844 0 : )))?;
1845 :
1846 0 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
1847 0 : .map_err(GetTenantError::MapState)?;
1848 0 : match peek_slot {
1849 0 : Some(TenantSlot::Attached(tenant)) => {
1850 0 : match tenant.current_state() {
1851 : TenantState::Active => {
1852 : // Fast path: we don't need to do any async waiting.
1853 0 : return Ok(tenant.clone());
1854 : }
1855 : _ => {
1856 0 : tenant.activate_now();
1857 0 : (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
1858 : }
1859 : }
1860 : }
1861 : Some(TenantSlot::Secondary(_)) => {
1862 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
1863 0 : tenant_shard_id,
1864 0 : )))
1865 : }
1866 0 : Some(TenantSlot::InProgress(barrier)) => {
1867 0 : (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
1868 : }
1869 : None => {
1870 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1871 0 : tenant_id,
1872 0 : )))
1873 : }
1874 : }
1875 : };
1876 :
1877 0 : let tenant = match wait_for {
1878 0 : WaitFor::Barrier(barrier) => {
1879 0 : tracing::debug!("Waiting for tenant InProgress state to pass...");
1880 0 : timeout_cancellable(
1881 0 : deadline.duration_since(Instant::now()),
1882 0 : cancel,
1883 0 : barrier.wait(),
1884 0 : )
1885 0 : .await
1886 0 : .map_err(|e| match e {
1887 0 : TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
1888 0 : latest_state: None,
1889 0 : wait_time: wait_start.elapsed(),
1890 0 : },
1891 0 : TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
1892 0 : })?;
1893 : {
1894 0 : let locked = TENANTS.read().unwrap();
1895 0 : let peek_slot =
1896 0 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
1897 0 : .map_err(GetTenantError::MapState)?;
1898 0 : match peek_slot {
1899 0 : Some(TenantSlot::Attached(tenant)) => tenant.clone(),
1900 : _ => {
1901 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
1902 0 : tenant_shard_id,
1903 0 : )))
1904 : }
1905 : }
1906 : }
1907 : }
1908 0 : WaitFor::Tenant(tenant) => tenant,
1909 : };
1910 :
1911 0 : tracing::debug!("Waiting for tenant to enter active state...");
1912 0 : tenant
1913 0 : .wait_to_become_active(deadline.duration_since(Instant::now()))
1914 0 : .await?;
1915 0 : Ok(tenant)
1916 0 : }
1917 :
1918 0 : #[derive(Debug, thiserror::Error)]
1919 : pub(crate) enum DeleteTimelineError {
1920 : #[error("Tenant {0}")]
1921 : Tenant(#[from] GetTenantError),
1922 :
1923 : #[error("Timeline {0}")]
1924 : Timeline(#[from] crate::tenant::DeleteTimelineError),
1925 : }
1926 :
1927 0 : #[derive(Debug, thiserror::Error)]
1928 : pub(crate) enum TenantStateError {
1929 : #[error("Tenant {0} is stopping")]
1930 : IsStopping(TenantShardId),
1931 : #[error(transparent)]
1932 : SlotError(#[from] TenantSlotError),
1933 : #[error(transparent)]
1934 : SlotUpsertError(#[from] TenantSlotUpsertError),
1935 : #[error(transparent)]
1936 : Other(#[from] anyhow::Error),
1937 : }
1938 :
1939 0 : pub(crate) async fn detach_tenant(
1940 0 : conf: &'static PageServerConf,
1941 0 : tenant_shard_id: TenantShardId,
1942 0 : detach_ignored: bool,
1943 0 : deletion_queue_client: &DeletionQueueClient,
1944 0 : ) -> Result<(), TenantStateError> {
1945 0 : let tmp_path = detach_tenant0(
1946 0 : conf,
1947 0 : &TENANTS,
1948 0 : tenant_shard_id,
1949 0 : detach_ignored,
1950 0 : deletion_queue_client,
1951 0 : )
1952 0 : .await?;
1953 : // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
1954 : // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
1955 0 : let task_tenant_id = None;
1956 0 : task_mgr::spawn(
1957 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
1958 0 : TaskKind::MgmtRequest,
1959 0 : task_tenant_id,
1960 0 : None,
1961 0 : "tenant_files_delete",
1962 0 : false,
1963 0 : async move {
1964 0 : fs::remove_dir_all(tmp_path.as_path())
1965 0 : .await
1966 0 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
1967 0 : },
1968 0 : );
1969 0 : Ok(())
1970 0 : }
1971 :
1972 0 : async fn detach_tenant0(
1973 0 : conf: &'static PageServerConf,
1974 0 : tenants: &std::sync::RwLock<TenantsMap>,
1975 0 : tenant_shard_id: TenantShardId,
1976 0 : detach_ignored: bool,
1977 0 : deletion_queue_client: &DeletionQueueClient,
1978 0 : ) -> Result<Utf8PathBuf, TenantStateError> {
1979 0 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
1980 0 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
1981 0 : safe_rename_tenant_dir(&local_tenant_directory)
1982 0 : .await
1983 0 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
1984 0 : };
1985 :
1986 0 : let removal_result = remove_tenant_from_memory(
1987 0 : tenants,
1988 0 : tenant_shard_id,
1989 0 : tenant_dir_rename_operation(tenant_shard_id),
1990 0 : )
1991 0 : .await;
1992 :
1993 : // Flush pending deletions, so that they have a good chance of passing validation
1994 : // before this tenant is potentially re-attached elsewhere.
1995 0 : deletion_queue_client.flush_advisory();
1996 0 :
1997 0 : // Ignored tenants are not present in memory and will bail the removal from memory operation.
1998 0 : // Before returning the error, check for ignored tenant removal case — we only need to clean its local files then.
1999 0 : if detach_ignored
2000 : && matches!(
2001 0 : removal_result,
2002 : Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
2003 : )
2004 : {
2005 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
2006 0 : if tenant_ignore_mark.exists() {
2007 0 : info!("Detaching an ignored tenant");
2008 0 : let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
2009 0 : .await
2010 0 : .with_context(|| {
2011 0 : format!("Ignored tenant {tenant_shard_id} local directory rename")
2012 0 : })?;
2013 0 : return Ok(tmp_path);
2014 0 : }
2015 0 : }
2016 :
2017 0 : removal_result
2018 0 : }
2019 :
2020 0 : pub(crate) async fn load_tenant(
2021 0 : conf: &'static PageServerConf,
2022 0 : tenant_id: TenantId,
2023 0 : generation: Generation,
2024 0 : broker_client: storage_broker::BrokerClientChannel,
2025 0 : remote_storage: Option<GenericRemoteStorage>,
2026 0 : deletion_queue_client: DeletionQueueClient,
2027 0 : ctx: &RequestContext,
2028 0 : ) -> Result<(), TenantMapInsertError> {
2029 0 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
2030 0 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2031 :
2032 0 : let slot_guard =
2033 0 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
2034 0 : let tenant_path = conf.tenant_path(&tenant_shard_id);
2035 0 :
2036 0 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
2037 0 : if tenant_ignore_mark.exists() {
2038 0 : std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
2039 0 : format!(
2040 0 : "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
2041 0 : )
2042 0 : })?;
2043 0 : }
2044 :
2045 0 : let resources = TenantSharedResources {
2046 0 : broker_client,
2047 0 : remote_storage,
2048 0 : deletion_queue_client,
2049 0 : };
2050 :
2051 0 : let mut location_conf =
2052 0 : Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
2053 0 : location_conf.attach_in_generation(generation);
2054 0 :
2055 0 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
2056 :
2057 0 : let shard_identity = location_conf.shard;
2058 0 : let new_tenant = tenant_spawn(
2059 0 : conf,
2060 0 : tenant_shard_id,
2061 0 : &tenant_path,
2062 0 : resources,
2063 0 : AttachedTenantConf::try_from(location_conf)?,
2064 0 : shard_identity,
2065 0 : None,
2066 0 : &TENANTS,
2067 0 : SpawnMode::Normal,
2068 0 : ctx,
2069 0 : )
2070 0 : .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
2071 :
2072 0 : slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
2073 0 : Ok(())
2074 0 : }
2075 :
2076 0 : pub(crate) async fn ignore_tenant(
2077 0 : conf: &'static PageServerConf,
2078 0 : tenant_id: TenantId,
2079 0 : ) -> Result<(), TenantStateError> {
2080 0 : ignore_tenant0(conf, &TENANTS, tenant_id).await
2081 0 : }
2082 :
2083 0 : #[instrument(skip_all, fields(shard_id))]
2084 : async fn ignore_tenant0(
2085 : conf: &'static PageServerConf,
2086 : tenants: &std::sync::RwLock<TenantsMap>,
2087 : tenant_id: TenantId,
2088 : ) -> Result<(), TenantStateError> {
2089 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
2090 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
2091 : tracing::Span::current().record(
2092 : "shard_id",
2093 : tracing::field::display(tenant_shard_id.shard_slug()),
2094 : );
2095 :
2096 0 : remove_tenant_from_memory(tenants, tenant_shard_id, async {
2097 0 : let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
2098 0 : fs::File::create(&ignore_mark_file)
2099 0 : .await
2100 0 : .context("Failed to create ignore mark file")
2101 0 : .and_then(|_| {
2102 0 : crashsafe::fsync_file_and_parent(&ignore_mark_file)
2103 0 : .context("Failed to fsync ignore mark file")
2104 0 : })
2105 0 : .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?;
2106 0 : Ok(())
2107 0 : })
2108 : .await
2109 : }
2110 :
2111 0 : #[derive(Debug, thiserror::Error)]
2112 : pub(crate) enum TenantMapListError {
2113 : #[error("tenant map is still initiailizing")]
2114 : Initializing,
2115 : }
2116 :
2117 : ///
2118 : /// Get list of tenants, for the mgmt API
2119 : ///
2120 0 : pub(crate) async fn list_tenants(
2121 0 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
2122 0 : let tenants = TENANTS.read().unwrap();
2123 0 : let m = match &*tenants {
2124 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
2125 0 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
2126 0 : };
2127 0 : Ok(m.iter()
2128 0 : .filter_map(|(id, tenant)| match tenant {
2129 0 : TenantSlot::Attached(tenant) => {
2130 0 : Some((*id, tenant.current_state(), tenant.generation()))
2131 : }
2132 0 : TenantSlot::Secondary(_) => None,
2133 0 : TenantSlot::InProgress(_) => None,
2134 0 : })
2135 0 : .collect())
2136 0 : }
2137 :
2138 0 : #[derive(Debug, thiserror::Error)]
2139 : pub(crate) enum TenantMapInsertError {
2140 : #[error(transparent)]
2141 : SlotError(#[from] TenantSlotError),
2142 : #[error(transparent)]
2143 : SlotUpsertError(#[from] TenantSlotUpsertError),
2144 : #[error(transparent)]
2145 : Other(#[from] anyhow::Error),
2146 : }
2147 :
2148 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
2149 : /// for a particular tenant ID.
2150 0 : #[derive(Debug, thiserror::Error)]
2151 : pub(crate) enum TenantSlotError {
2152 : /// When acquiring a slot with the expectation that the tenant already exists.
2153 : #[error("Tenant {0} not found")]
2154 : NotFound(TenantShardId),
2155 :
2156 : /// When acquiring a slot with the expectation that the tenant does not already exist.
2157 : #[error("tenant {0} already exists, state: {1:?}")]
2158 : AlreadyExists(TenantShardId, TenantState),
2159 :
2160 : // Tried to read a slot that is currently being mutated by another administrative
2161 : // operation.
2162 : #[error("tenant has a state change in progress, try again later")]
2163 : InProgress,
2164 :
2165 : #[error(transparent)]
2166 : MapState(#[from] TenantMapError),
2167 : }
2168 :
2169 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
2170 : /// to insert a new value.
2171 0 : #[derive(thiserror::Error)]
2172 : pub(crate) enum TenantSlotUpsertError {
2173 : /// An error where the slot is in an unexpected state, indicating a code bug
2174 : #[error("Internal error updating Tenant")]
2175 : InternalError(Cow<'static, str>),
2176 :
2177 : #[error(transparent)]
2178 : MapState(TenantMapError),
2179 :
2180 : // If we encounter TenantManager shutdown during upsert, we must carry the Completion
2181 : // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
2182 : // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
2183 : // was protected by the SlotGuard.
2184 : #[error("Shutting down")]
2185 : ShuttingDown((TenantSlot, utils::completion::Completion)),
2186 : }
2187 :
2188 : impl std::fmt::Debug for TenantSlotUpsertError {
2189 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
2190 0 : match self {
2191 0 : Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
2192 0 : Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
2193 0 : Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
2194 : }
2195 0 : }
2196 : }
2197 :
2198 0 : #[derive(Debug, thiserror::Error)]
2199 : enum TenantSlotDropError {
2200 : /// It is only legal to drop a TenantSlot if its contents are fully shut down
2201 : #[error("Tenant was not shut down")]
2202 : NotShutdown,
2203 : }
2204 :
2205 : /// Errors that can happen any time we are walking the tenant map to try and acquire
2206 : /// the TenantSlot for a particular tenant.
2207 0 : #[derive(Debug, thiserror::Error)]
2208 : pub enum TenantMapError {
2209 : // Tried to read while initializing
2210 : #[error("tenant map is still initializing")]
2211 : StillInitializing,
2212 :
2213 : // Tried to read while shutting down
2214 : #[error("tenant map is shutting down")]
2215 : ShuttingDown,
2216 : }
2217 :
2218 : /// Guards a particular tenant_id's content in the TenantsMap. While this
2219 : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
2220 : /// for this tenant, which acts as a marker for any operations targeting
2221 : /// this tenant to retry later, or wait for the InProgress state to end.
2222 : ///
2223 : /// This structure enforces the important invariant that we do not have overlapping
2224 : /// tasks that will try to use local storage for the same tenant ID: we enforce that
2225 : /// the previous contents of a slot have been shut down before the slot can be
2226 : /// left empty or used for something else.
2227 : ///
2228 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
2229 : /// to provide a new value, or `revert` to put the slot back into its initial
2230 : /// state. If the SlotGuard is dropped without calling either of these, then
2231 : /// we will leave the slot empty if our `old_value` is already shut down, else
2232 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
2233 : ///
2234 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
2235 : /// `drop_old_value`. It is an error to call this without shutting down
2236 : /// the contents of `old_value`.
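 : ///
 : /// A minimal lifecycle sketch (error handling elided; the guard is normally obtained via
 : /// `tenant_map_acquire_slot`):
 : /// ```ignore
 : /// let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
 : /// // ... shut down whatever `get_old_value()` returned, if anything ...
 : /// slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
 : /// ```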
2237 : pub struct SlotGuard {
2238 : tenant_shard_id: TenantShardId,
2239 : old_value: Option<TenantSlot>,
2240 : upserted: bool,
2241 :
2242 : /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
2243 : /// release any waiters as soon as this SlotGuard is dropped.
2244 : completion: utils::completion::Completion,
2245 : }
2246 :
2247 : impl SlotGuard {
2248 2 : fn new(
2249 2 : tenant_shard_id: TenantShardId,
2250 2 : old_value: Option<TenantSlot>,
2251 2 : completion: utils::completion::Completion,
2252 2 : ) -> Self {
2253 2 : Self {
2254 2 : tenant_shard_id,
2255 2 : old_value,
2256 2 : upserted: false,
2257 2 : completion,
2258 2 : }
2259 2 : }
2260 :
2261 : /// Get any value that was present in the slot before we acquired ownership
2262 : /// of it: in state transitions, this will be the old state.
2263 2 : fn get_old_value(&self) -> &Option<TenantSlot> {
2264 2 : &self.old_value
2265 2 : }
2266 :
2267 : /// Emplace a new value in the slot. This consumes the guard, and after
2268 : /// returning, the slot is no longer protected from concurrent changes.
2269 0 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
2270 0 : if !self.old_value_is_shutdown() {
2271 : // This is a bug: callers should never try to drop an old value without
2272 : // shutting it down
2273 0 : return Err(TenantSlotUpsertError::InternalError(
2274 0 : "Old TenantSlot value not shut down".into(),
2275 0 : ));
2276 0 : }
2277 :
2278 0 : let replaced = {
2279 0 : let mut locked = TENANTS.write().unwrap();
2280 0 :
2281 0 : if let TenantSlot::InProgress(_) = new_value {
2282 : // It is never expected to try and upsert InProgress via this path: it should
2283 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
2284 0 : return Err(TenantSlotUpsertError::InternalError(
2285 0 : "Attempt to upsert an InProgress state".into(),
2286 0 : ));
2287 0 : }
2288 :
2289 0 : let m = match &mut *locked {
2290 : TenantsMap::Initializing => {
2291 0 : return Err(TenantSlotUpsertError::MapState(
2292 0 : TenantMapError::StillInitializing,
2293 0 : ))
2294 : }
2295 : TenantsMap::ShuttingDown(_) => {
2296 0 : return Err(TenantSlotUpsertError::ShuttingDown((
2297 0 : new_value,
2298 0 : self.completion.clone(),
2299 0 : )));
2300 : }
2301 0 : TenantsMap::Open(m) => m,
2302 0 : };
2303 0 :
2304 0 : let replaced = m.insert(self.tenant_shard_id, new_value);
2305 0 : self.upserted = true;
2306 0 :
2307 0 : METRICS.tenant_slots.set(m.len() as u64);
2308 0 :
2309 0 : replaced
2310 : };
2311 :
2312 : // Sanity check: on an upsert we should always be replacing an InProgress marker
2313 0 : match replaced {
2314 : Some(TenantSlot::InProgress(_)) => {
2315 : // Expected case: we find our InProgress in the map: nothing should have
2316 : // replaced it because the code that acquires slots will not grant another
2317 : // one for the same TenantId.
2318 0 : Ok(())
2319 : }
2320 : None => {
2321 0 : METRICS.unexpected_errors.inc();
2322 0 : error!(
2323 0 : tenant_shard_id = %self.tenant_shard_id,
2324 0 : "Missing InProgress marker during tenant upsert, this is a bug."
2325 0 : );
2326 0 : Err(TenantSlotUpsertError::InternalError(
2327 0 : "Missing InProgress marker during tenant upsert".into(),
2328 0 : ))
2329 : }
2330 0 : Some(slot) => {
2331 0 : METRICS.unexpected_errors.inc();
2332 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
2333 0 : Err(TenantSlotUpsertError::InternalError(
2334 0 : "Unexpected contents of TenantSlot".into(),
2335 0 : ))
2336 : }
2337 : }
2338 0 : }
2339 :
2340 : /// Replace the InProgress slot with whatever was in the guard when we started
2341 0 : fn revert(mut self) {
2342 0 : if let Some(value) = self.old_value.take() {
2343 0 : match self.upsert(value) {
2344 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
2345 0 : // We already logged the error, nothing else we can do.
2346 0 : }
2347 : Err(
2348 : TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
2349 0 : ) => {
2350 0 : // If the map is shutting down, we need not replace anything
2351 0 : }
2352 0 : Ok(()) => {}
2353 : }
2354 0 : }
2355 0 : }
2356 :
2357 : /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
2358 : /// rogue background tasks that would write to the local tenant directory that this guard
2359 : /// is responsible for protecting
2360 2 : fn old_value_is_shutdown(&self) -> bool {
2361 2 : match self.old_value.as_ref() {
2362 2 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
2363 0 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
2364 : Some(TenantSlot::InProgress(_)) => {
2365 : // A SlotGuard cannot be constructed for a slot that was already InProgress
2366 0 : unreachable!()
2367 : }
2368 0 : None => true,
2369 : }
2370 2 : }
2371 :
2372 : /// The guard holder is done with the old value of the slot: they are obliged to already
2373 : /// shut it down before we reach this point.
2374 2 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
2375 2 : if !self.old_value_is_shutdown() {
2376 0 : Err(TenantSlotDropError::NotShutdown)
2377 : } else {
2378 2 : self.old_value.take();
2379 2 : Ok(())
2380 : }
2381 2 : }
2382 : }
2383 :
2384 : impl Drop for SlotGuard {
2385 2 : fn drop(&mut self) {
2386 2 : if self.upserted {
2387 0 : return;
2388 2 : }
2389 2 : // Our old value is already shut down, or it never existed: it is safe
2390 2 : // for us to fully release the TenantSlot back into an empty state
2391 2 :
2392 2 : let mut locked = TENANTS.write().unwrap();
2393 :
2394 2 : let m = match &mut *locked {
2395 : TenantsMap::Initializing => {
2396 : // There is no map, this should never happen.
2397 2 : return;
2398 : }
2399 : TenantsMap::ShuttingDown(_) => {
2400 : // When we transition to shutdown, InProgress elements are removed
2401 : // from the map, so we do not need to clean up our InProgress marker.
2402 : // See [`shutdown_all_tenants0`]
2403 0 : return;
2404 : }
2405 0 : TenantsMap::Open(m) => m,
2406 0 : };
2407 0 :
2408 0 : use std::collections::btree_map::Entry;
2409 0 : match m.entry(self.tenant_shard_id) {
2410 0 : Entry::Occupied(mut entry) => {
2411 0 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
2412 0 : METRICS.unexpected_errors.inc();
2413 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
2414 0 : }
2415 :
2416 0 : if self.old_value_is_shutdown() {
2417 0 : entry.remove();
2418 0 : } else {
2419 0 : entry.insert(self.old_value.take().unwrap());
2420 0 : }
2421 : }
2422 : Entry::Vacant(_) => {
2423 0 : METRICS.unexpected_errors.inc();
2424 0 : error!(
2425 0 : tenant_shard_id = %self.tenant_shard_id,
2426 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
2427 0 : );
2428 : }
2429 : }
2430 :
2431 0 : METRICS.tenant_slots.set(m.len() as u64);
2432 2 : }
2433 : }
2434 :
2435 : enum TenantSlotPeekMode {
2436 : /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
2437 : Read,
2438 : /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
2439 : Write,
2440 : }
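 : // For example, read paths such as `get_tenant` and `get_active_tenant_with_timeout` peek in
 : // Read mode, so lookups still get an answer (or a ShuttingDown error) while the pageserver
 : // is shutting down.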
2441 :
2442 0 : fn tenant_map_peek_slot<'a>(
2443 0 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
2444 0 : tenant_shard_id: &TenantShardId,
2445 0 : mode: TenantSlotPeekMode,
2446 0 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
2447 0 : match tenants.deref() {
2448 0 : TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
2449 0 : TenantsMap::ShuttingDown(m) => match mode {
2450 : TenantSlotPeekMode::Read => Ok(Some(
2451 : // When reading in ShuttingDown state, we must translate None results
2452 : // into a ShuttingDown error, because absence of a tenant shard ID in the map
2453 : // isn't a reliable indicator of the tenant being gone: it might have been
2454 : // InProgress when shutdown started, and cleaned up from that state such
2455 : // that it's now no longer in the map. Callers will have to wait until
2456 : // we next start up to get a proper answer. This avoids incorrect 404 API responses.
2457 0 : m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
2458 : )),
2459 0 : TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
2460 : },
2461 0 : TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
2462 : }
2463 0 : }
2464 :
2465 : enum TenantSlotAcquireMode {
2466 : /// Acquire the slot irrespective of current state, or whether it already exists
2467 : Any,
2468 : /// Return an error if trying to acquire a slot and it doesn't already exist
2469 : MustExist,
2470 : /// Return an error if trying to acquire a slot and it already exists
2471 : MustNotExist,
2472 : }
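 : // For example, tenant deletion acquires its slot with `MustExist`, `load_tenant` uses
 : // `MustNotExist` to create a fresh slot, and `shard_split` re-acquires the parent shard's
 : // slot with `Any`.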
2473 :
2474 0 : fn tenant_map_acquire_slot(
2475 0 : tenant_shard_id: &TenantShardId,
2476 0 : mode: TenantSlotAcquireMode,
2477 0 : ) -> Result<SlotGuard, TenantSlotError> {
2478 0 : tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
2479 0 : }
2480 :
2481 2 : fn tenant_map_acquire_slot_impl(
2482 2 : tenant_shard_id: &TenantShardId,
2483 2 : tenants: &std::sync::RwLock<TenantsMap>,
2484 2 : mode: TenantSlotAcquireMode,
2485 2 : ) -> Result<SlotGuard, TenantSlotError> {
2486 2 : use TenantSlotAcquireMode::*;
2487 2 : METRICS.tenant_slot_writes.inc();
2488 2 :
2489 2 : let mut locked = tenants.write().unwrap();
2490 2 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
2491 2 : let _guard = span.enter();
2492 :
2493 2 : let m = match &mut *locked {
2494 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
2495 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
2496 2 : TenantsMap::Open(m) => m,
2497 2 : };
2498 2 :
2499 2 : use std::collections::btree_map::Entry;
2500 2 :
2501 2 : let entry = m.entry(*tenant_shard_id);
2502 2 :
2503 2 : match entry {
2504 0 : Entry::Vacant(v) => match mode {
2505 : MustExist => {
2506 0 : tracing::debug!("Vacant && MustExist: return NotFound");
2507 0 : Err(TenantSlotError::NotFound(*tenant_shard_id))
2508 : }
2509 : _ => {
2510 0 : let (completion, barrier) = utils::completion::channel();
2511 0 : v.insert(TenantSlot::InProgress(barrier));
2512 0 : tracing::debug!("Vacant, inserted InProgress");
2513 0 : Ok(SlotGuard::new(*tenant_shard_id, None, completion))
2514 : }
2515 : },
2516 2 : Entry::Occupied(mut o) => {
2517 2 : // Apply mode-driven checks
2518 2 : match (o.get(), mode) {
2519 : (TenantSlot::InProgress(_), _) => {
2520 0 : tracing::debug!("Occupied, failing for InProgress");
2521 0 : Err(TenantSlotError::InProgress)
2522 : }
2523 0 : (slot, MustNotExist) => match slot {
2524 0 : TenantSlot::Attached(tenant) => {
2525 0 : tracing::debug!("Attached && MustNotExist, return AlreadyExists");
2526 0 : Err(TenantSlotError::AlreadyExists(
2527 0 : *tenant_shard_id,
2528 0 : tenant.current_state(),
2529 0 : ))
2530 : }
2531 : _ => {
2532 : // FIXME: the AlreadyExists error assumes that we have a Tenant
2533 : // to get the state from
2534 0 : tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
2535 0 : Err(TenantSlotError::AlreadyExists(
2536 0 : *tenant_shard_id,
2537 0 : TenantState::Broken {
2538 0 : reason: "Present but not attached".to_string(),
2539 0 : backtrace: "".to_string(),
2540 0 : },
2541 0 : ))
2542 : }
2543 : },
2544 : _ => {
2545 : // Happy case: the slot was not in any state that violated our mode
2546 2 : let (completion, barrier) = utils::completion::channel();
2547 2 : let old_value = o.insert(TenantSlot::InProgress(barrier));
2548 2 : tracing::debug!("Occupied, replaced with InProgress");
2549 2 : Ok(SlotGuard::new(
2550 2 : *tenant_shard_id,
2551 2 : Some(old_value),
2552 2 : completion,
2553 2 : ))
2554 : }
2555 : }
2556 : }
2557 : }
2558 2 : }
2559 :
2560 : /// Stops and removes the tenant from memory if it is not already [`TenantState::Stopping`]; bails otherwise.
2561 : /// Allows removing other tenant resources manually, via `tenant_cleanup`.
2562 : /// If the cleanup fails, the tenant stays in memory in [`TenantState::Broken`] state, and another removal
2563 : /// operation would be needed to remove it.
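 : ///
 : /// A minimal usage sketch (the cleanup future is caller-supplied; here it renames the tenant
 : /// directory, roughly as `detach_tenant0` does):
 : /// ```ignore
 : /// let tmp_path = remove_tenant_from_memory(&TENANTS, tenant_shard_id, async {
 : ///     safe_rename_tenant_dir(&conf.tenant_path(&tenant_shard_id)).await
 : /// })
 : /// .await?;
 : /// ```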
2564 2 : async fn remove_tenant_from_memory<V, F>(
2565 2 : tenants: &std::sync::RwLock<TenantsMap>,
2566 2 : tenant_shard_id: TenantShardId,
2567 2 : tenant_cleanup: F,
2568 2 : ) -> Result<V, TenantStateError>
2569 2 : where
2570 2 : F: std::future::Future<Output = anyhow::Result<V>>,
2571 2 : {
2572 2 : let mut slot_guard =
2573 2 : tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
2574 :
2575 : // allow pageserver shutdown to await for our completion
2576 2 : let (_guard, progress) = completion::channel();
2577 :
2578 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2579 : // concurrent API request doing something else for the same tenant ID.
2580 2 : let attached_tenant = match slot_guard.get_old_value() {
2581 2 : Some(TenantSlot::Attached(tenant)) => {
2582 2 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2583 2 : let freeze_and_flush = false;
2584 2 :
2585 2 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
2586 2 : // that we can continue safely to cleanup.
2587 2 : match tenant.shutdown(progress, freeze_and_flush).await {
2588 2 : Ok(()) => {}
2589 0 : Err(_other) => {
2590 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2591 0 : // wait for it but return an error right away because these are distinct requests.
2592 0 : slot_guard.revert();
2593 0 : return Err(TenantStateError::IsStopping(tenant_shard_id));
2594 : }
2595 : }
2596 2 : Some(tenant)
2597 : }
2598 0 : Some(TenantSlot::Secondary(secondary_state)) => {
2599 0 : tracing::info!("Shutting down in secondary mode");
2600 0 : secondary_state.shutdown().await;
2601 0 : None
2602 : }
2603 : Some(TenantSlot::InProgress(_)) => {
2604 : // Acquiring a slot guarantees its old value was not InProgress
2605 0 : unreachable!();
2606 : }
2607 0 : None => None,
2608 : };
2609 :
2610 2 : match tenant_cleanup
2611 2 : .await
2612 2 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2613 : {
2614 2 : Ok(hook_value) => {
2615 2 : // Success: drop the old TenantSlot::Attached.
2616 2 : slot_guard
2617 2 : .drop_old_value()
2618 2 : .expect("We just called shutdown");
2619 2 :
2620 2 : Ok(hook_value)
2621 : }
2622 0 : Err(e) => {
2623 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2624 0 : if let Some(attached_tenant) = attached_tenant {
2625 0 : attached_tenant.set_broken(e.to_string()).await;
2626 0 : }
2627 : // Leave the broken tenant in the map
2628 0 : slot_guard.revert();
2629 0 :
2630 0 : Err(TenantStateError::Other(e))
2631 : }
2632 : }
2633 2 : }
2634 :
2635 : use {
2636 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
2637 : utils::http::error::ApiError,
2638 : };
2639 :
2640 0 : pub(crate) async fn immediate_gc(
2641 0 : tenant_shard_id: TenantShardId,
2642 0 : timeline_id: TimelineId,
2643 0 : gc_req: TimelineGcRequest,
2644 0 : cancel: CancellationToken,
2645 0 : ctx: &RequestContext,
2646 0 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
2647 0 : let guard = TENANTS.read().unwrap();
2648 :
2649 0 : let tenant = guard
2650 0 : .get(&tenant_shard_id)
2651 0 : .map(Arc::clone)
2652 0 : .with_context(|| format!("tenant {tenant_shard_id}"))
2653 0 : .map_err(|e| ApiError::NotFound(e.into()))?;
2654 :
2655 0 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
2656 0 : // Use tenant's pitr setting
2657 0 : let pitr = tenant.get_pitr_interval();
2658 0 :
2659 0 : // Run in task_mgr to avoid race with tenant_detach operation
2660 0 : let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
2661 0 : let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
2662 0 : // TODO: spawning is redundant now, need to hold the gate
2663 0 : task_mgr::spawn(
2664 0 : &tokio::runtime::Handle::current(),
2665 0 : TaskKind::GarbageCollector,
2666 0 : Some(tenant_shard_id),
2667 0 : Some(timeline_id),
2668 0 : &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
2669 0 : false,
2670 0 : async move {
2671 0 : fail::fail_point!("immediate_gc_task_pre");
2672 :
2673 : #[allow(unused_mut)]
2674 0 : let mut result = tenant
2675 0 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
2676 0 : .instrument(info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
2677 0 : .await;
2678 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
2679 : // better once the types support it.
2680 :
2681 : #[cfg(feature = "testing")]
2682 : {
2683 0 : if let Ok(result) = result.as_mut() {
2684 : // why not futures unordered? it seems it needs very much the same task structure
2685 : // but would only run on single task.
2686 0 : let mut js = tokio::task::JoinSet::new();
2687 0 : for layer in std::mem::take(&mut result.doomed_layers) {
2688 0 : js.spawn(layer.wait_drop());
2689 0 : }
2690 0 : tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
2691 0 : while let Some(res) = js.join_next().await {
2692 0 : res.expect("wait_drop should not panic");
2693 0 : }
2694 0 : }
2695 :
2696 0 : let timeline = tenant.get_timeline(timeline_id, false).ok();
2697 0 : let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
2698 :
2699 0 : if let Some(rtc) = rtc {
2700 : // layer drops schedule actions on remote timeline client to actually do the
2701 : // deletions; don't care just exit fast about the shutdown error
2702 0 : drop(rtc.wait_completion().await);
2703 0 : }
2704 : }
2705 :
2706 0 : match task_done.send(result) {
2707 0 : Ok(_) => (),
2708 0 : Err(result) => error!("failed to send gc result: {result:?}"),
2709 : }
2710 0 : Ok(())
2711 0 : }
2712 0 : );
2713 0 :
2714 0 : // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
2715 0 : drop(guard);
2716 0 :
2717 0 : Ok(wait_task_done)
2718 0 : }
2719 :
2720 : #[cfg(test)]
2721 : mod tests {
2722 : use std::collections::BTreeMap;
2723 : use std::sync::Arc;
2724 : use tracing::Instrument;
2725 :
2726 : use crate::tenant::mgr::TenantSlot;
2727 :
2728 : use super::{super::harness::TenantHarness, TenantsMap};
2729 :
2730 2 : #[tokio::test(start_paused = true)]
2731 2 : async fn shutdown_awaits_in_progress_tenant() {
2732 2 : // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
2733 2 : // wait for it to complete before proceeding.
2734 2 :
2735 2 : let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
2736 2 : let (t, _ctx) = h.load().await;
2737 2 :
2738 2 : // harness loads it to active, which is forced and nothing is running on the tenant
2739 2 :
2740 2 : let id = t.tenant_shard_id();
2741 2 :
2742 2 : // tenant harness configures the logging and we cannot escape it
2743 2 : let span = h.span();
2744 2 : let _e = span.enter();
2745 2 :
2746 2 : let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
2747 2 : let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
2748 2 :
2749 2 : // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
2750 2 : // permit it to proceed: that will stick the tenant in InProgress
2751 2 :
2752 2 : let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
2753 2 : let (until_cleanup_started, cleanup_started) = utils::completion::channel();
2754 2 : let mut remove_tenant_from_memory_task = {
2755 2 : let jh = tokio::spawn({
2756 2 : let tenants = tenants.clone();
2757 2 : async move {
2758 2 : let cleanup = async move {
2759 2 : drop(until_cleanup_started);
2760 2 : can_complete_cleanup.wait().await;
2761 2 : anyhow::Ok(())
2762 2 : };
2763 2 : super::remove_tenant_from_memory(&tenants, id, cleanup).await
2764 2 : }
2765 2 : .instrument(h.span())
2766 2 : });
2767 2 :
2768 2 : // now the long cleanup should be in place, with the stopping state
2769 2 : cleanup_started.wait().await;
2770 2 : jh
2771 2 : };
2772 2 :
2773 2 : let mut shutdown_task = {
2774 2 : let (until_shutdown_started, shutdown_started) = utils::completion::channel();
2775 2 :
2776 2 : let shutdown_task = tokio::spawn(async move {
2777 2 : drop(until_shutdown_started);
2778 4 : super::shutdown_all_tenants0(&tenants).await;
2779 2 : });
2780 2 :
2781 2 : shutdown_started.wait().await;
2782 2 : shutdown_task
2783 2 : };
2784 2 :
2785 2 : let long_time = std::time::Duration::from_secs(15);
2786 4 : tokio::select! {
2787 4 : _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
2788 4 : _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
2789 4 : _ = tokio::time::sleep(long_time) => {},
2790 4 : }
2791 2 :
2792 2 : drop(until_cleanup_completed);
2793 2 :
2794 2 : // Now that we allow it to proceed, shutdown should complete immediately
2795 2 : remove_tenant_from_memory_task.await.unwrap().unwrap();
2796 2 : shutdown_task.await.unwrap();
2797 2 : }
2798 : }
|