Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
5 : use pageserver_api::key::Key;
6 : use pageserver_api::models::ShardParameters;
7 : use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
8 : use rand::{distributions::Alphanumeric, Rng};
9 : use std::borrow::Cow;
10 : use std::cmp::Ordering;
11 : use std::collections::{BTreeMap, HashMap};
12 : use std::ops::Deref;
13 : use std::sync::Arc;
14 : use std::time::{Duration, Instant};
15 : use tokio::fs;
16 : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
17 :
18 : use anyhow::Context;
19 : use once_cell::sync::Lazy;
20 : use tokio::task::JoinSet;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::*;
23 :
24 : use remote_storage::GenericRemoteStorage;
25 : use utils::crashsafe;
26 :
27 : use crate::config::PageServerConf;
28 : use crate::context::{DownloadBehavior, RequestContext};
29 : use crate::control_plane_client::{
30 : ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
31 : };
32 : use crate::deletion_queue::DeletionQueueClient;
33 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
34 : use crate::task_mgr::{self, TaskKind};
35 : use crate::tenant::config::{
36 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
37 : TenantConfOpt,
38 : };
39 : use crate::tenant::delete::DeleteTenantFlow;
40 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
41 : use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
42 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
43 :
44 : use utils::crashsafe::path_with_suffix_extension;
45 : use utils::fs_ext::PathExt;
46 : use utils::generation::Generation;
47 : use utils::id::{TenantId, TimelineId};
48 :
49 : use super::delete::DeleteTenantError;
50 : use super::secondary::SecondaryTenant;
51 : use super::TenantSharedResources;
52 :
53 : /// A tenant that appears in TenantsMap may be either:
54 : /// - `Attached`: has a full Tenant object, is eligible to service
55 : /// reads and ingest WAL.
56 : /// - `Secondary`: is only keeping a local cache warm.
57 : ///
58 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
59 : /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
60 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
61 : /// having a properly acquired generation (Secondary doesn't need a generation)
62 4 : #[derive(Clone)]
63 : pub(crate) enum TenantSlot {
64 : Attached(Arc<Tenant>),
65 : Secondary(Arc<SecondaryTenant>),
66 : /// In this state, other administrative operations acting on the TenantId should
67 : /// block, or return a retry indicator equivalent to HTTP 503.
68 : InProgress(utils::completion::Barrier),
69 : }
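// Illustrative sketch (not part of this module): a caller that only needs a quick,
// non-blocking answer might translate the three slot states like this. `SlotStatus`
// and `quick_status` are hypothetical names used for illustration only.
//
//     enum SlotStatus {
//         Ready,      // Attached or Secondary: safe to act on right away
//         RetryLater, // InProgress: block, or surface an HTTP-503-style retry
//     }
//
//     fn quick_status(slot: &TenantSlot) -> SlotStatus {
//         match slot {
//             TenantSlot::Attached(_) | TenantSlot::Secondary(_) => SlotStatus::Ready,
//             TenantSlot::InProgress(_) => SlotStatus::RetryLater,
//         }
//     }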
70 :
71 : impl std::fmt::Debug for TenantSlot {
72 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 0 : match self {
74 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
75 0 : Self::Secondary(_) => write!(f, "Secondary"),
76 0 : Self::InProgress(_) => write!(f, "InProgress"),
77 : }
78 0 : }
79 : }
80 :
81 : impl TenantSlot {
82 : /// Return the `Tenant` in this slot if attached, else None
83 1236 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
84 1236 : match self {
85 1212 : Self::Attached(t) => Some(t),
86 11 : Self::Secondary(_) => None,
87 13 : Self::InProgress(_) => None,
88 : }
89 1236 : }
90 : }
91 :
92 : /// The tenants known to the pageserver.
93 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
94 : pub(crate) enum TenantsMap {
95 : /// [`init_tenant_mgr`] is not done yet.
96 : Initializing,
97 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
98 : /// New tenants can be added using [`tenant_map_acquire_slot`].
99 : Open(BTreeMap<TenantShardId, TenantSlot>),
100 : /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
101 : /// Existing tenants are still accessible, but no new tenants can be created.
102 : ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
103 : }
104 :
105 : pub(crate) enum TenantsMapRemoveResult {
106 : Occupied(TenantSlot),
107 : Vacant,
108 : InProgress(utils::completion::Barrier),
109 : }
110 :
111 : /// When resolving a TenantId to a shard, we may be looking for the 0th
112 : /// shard, or we might be looking for whichever shard holds a particular page.
113 : pub(crate) enum ShardSelector {
114 : /// Only return the 0th shard, if it is present. If a non-0th shard is present,
115 : /// ignore it.
116 : Zero,
117 : /// Pick the first shard we find for the TenantId
118 : First,
119 : /// Pick the shard that holds this key
120 : Page(Key),
121 : }
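// Illustrative sketch of how a caller might choose a selector: administrative
// operations that address the tenant as a whole typically use `Zero`, while the
// page service resolves the shard owning a particular key with `Page(key)`.
// `serving_page_request` and `page_key` are hypothetical placeholders.
//
//     let selector = if serving_page_request {
//         ShardSelector::Page(page_key)
//     } else {
//         ShardSelector::Zero
//     };
//     let shard_id = tenants_map.resolve_attached_shard(&tenant_id, selector);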
122 :
123 : impl TenantsMap {
124 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
125 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
126 : /// None is returned.
127 251 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
128 251 : match self {
129 0 : TenantsMap::Initializing => None,
130 251 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
131 251 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
132 : }
133 : }
134 251 : }
135 :
136 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
137 : /// resolve this to a fully qualified TenantShardId.
138 20696 : fn resolve_attached_shard(
139 20696 : &self,
140 20696 : tenant_id: &TenantId,
141 20696 : selector: ShardSelector,
142 20696 : ) -> Option<TenantShardId> {
143 20696 : let mut want_shard = None;
144 20696 : match self {
145 0 : TenantsMap::Initializing => None,
146 20696 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
147 20696 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
148 : // Ignore all slots that don't contain an attached tenant
149 20695 : let tenant = match &slot.1 {
150 20684 : TenantSlot::Attached(t) => t,
151 11 : _ => continue,
152 : };
153 :
154 2834 : match selector {
155 10025 : ShardSelector::First => return Some(*slot.0),
156 2834 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
157 2834 : return Some(*slot.0)
158 : }
159 7825 : ShardSelector::Page(key) => {
160 7825 : // First slot we see for this tenant, calculate the expected shard number
161 7825 : // for the key: we will use this for checking if this and subsequent
162 7825 : // slots contain the key, rather than recalculating the hash each time.
163 7825 : if want_shard.is_none() {
164 7825 : want_shard = Some(tenant.shard_identity.get_shard_number(&key));
165 7825 : }
166 :
167 7825 : if Some(tenant.shard_identity.number) == want_shard {
168 7825 : return Some(*slot.0);
169 0 : }
170 : }
171 0 : _ => continue,
172 : }
173 : }
174 :
175 : // Fall through: we didn't find an acceptable shard
176 12 : None
177 : }
178 : }
179 20696 : }
180 :
181 : /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
182 : ///
183 : /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
184 : /// slot if the enclosed tenant is shut down.
185 71 : pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
186 71 : use std::collections::btree_map::Entry;
187 71 : match self {
188 0 : TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
189 71 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
190 71 : Entry::Occupied(entry) => match entry.get() {
191 1 : TenantSlot::InProgress(barrier) => {
192 1 : TenantsMapRemoveResult::InProgress(barrier.clone())
193 : }
194 70 : _ => TenantsMapRemoveResult::Occupied(entry.remove()),
195 : },
196 0 : Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
197 : },
198 : }
199 71 : }
200 :
201 71 : pub(crate) fn len(&self) -> usize {
202 71 : match self {
203 0 : TenantsMap::Initializing => 0,
204 71 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
205 : }
206 71 : }
207 : }
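// Illustrative sketch of handling the three outcomes of `TenantsMap::remove`. The
// shape of the `InProgress` arm (wait on the barrier, then re-check) is an assumption
// about how a caller such as DeleteTenantFlow might react, not a quote of its code.
//
//     match tenants_map.remove(tenant_shard_id) {
//         TenantsMapRemoveResult::Occupied(slot) => {
//             // We now own the removed slot and are responsible for shutting it down.
//             drop(slot);
//         }
//         TenantsMapRemoveResult::Vacant => { /* nothing to remove */ }
//         TenantsMapRemoveResult::InProgress(barrier) => {
//             // Another operation holds the slot: wait for it, then re-check the map.
//             barrier.wait().await;
//         }
//     }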
208 :
209 : /// This is "safe" in that that it won't leave behind a partially deleted directory
210 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting
211 : /// the contents.
212 : ///
213 : /// This is pageserver-specific, as it relies on future processes after a crash to check
214 : /// for TEMP_FILE_SUFFIX when loading tenant directories.
215 12 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
216 35 : let tmp_path = safe_rename_tenant_dir(path).await?;
217 12 : fs::remove_dir_all(tmp_path).await
218 12 : }
219 :
220 90 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
221 90 : let parent = path
222 90 : .as_ref()
223 90 : .parent()
224 90 : // It is invalid to call this function with a relative path. Tenant directories
225 90 : // should always have a parent.
226 90 : .ok_or(std::io::Error::new(
227 90 : std::io::ErrorKind::InvalidInput,
228 90 : "Path must be absolute",
229 90 : ))?;
230 90 : let rand_suffix = rand::thread_rng()
231 90 : .sample_iter(&Alphanumeric)
232 90 : .take(8)
233 90 : .map(char::from)
234 90 : .collect::<String>()
235 90 : + TEMP_FILE_SUFFIX;
236 90 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
237 90 : fs::rename(path.as_ref(), &tmp_path).await?;
238 90 : fs::File::open(parent).await?.sync_all().await?;
239 90 : Ok(tmp_path)
240 90 : }
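// Illustrative sketch of the crash-safety contract described above: because the
// directory is renamed to a name carrying TEMP_FILE_SUFFIX before any contents are
// deleted, a post-crash startup scan only needs to look for that suffix to finish
// the cleanup (see the temporary-directory handling in `load_tenant_config` below).
//
//     if crate::is_temporary(&tenant_dir_path) {
//         // Leftover from an interrupted safe_remove_tenant_dir_all: already renamed,
//         // so it is safe to delete outright without another rename.
//         std::fs::remove_dir_all(&tenant_dir_path)?;
//     }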
241 :
242 : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
243 606 : Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
244 :
245 : /// The TenantManager is responsible for storing and mutating the collection of all tenants
246 : /// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
247 : /// lives inside the TenantManager.
248 : ///
249 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
250 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
251 : /// and attached modes concurrently.
252 : pub struct TenantManager {
253 : conf: &'static PageServerConf,
254 : // TODO: currently this is a &'static pointing to TENANTS. When we finish refactoring
255 : // out of that static variable, the TenantManager can own this.
256 : // See https://github.com/neondatabase/neon/issues/5796
257 : tenants: &'static std::sync::RwLock<TenantsMap>,
258 : resources: TenantSharedResources,
259 : }
260 :
261 1 : fn emergency_generations(
262 1 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
263 1 : ) -> HashMap<TenantShardId, Generation> {
264 1 : tenant_confs
265 1 : .iter()
266 1 : .filter_map(|(tid, lc)| {
267 1 : let lc = match lc {
268 1 : Ok(lc) => lc,
269 0 : Err(_) => return None,
270 : };
271 1 : let gen = match &lc.mode {
272 1 : LocationMode::Attached(alc) => Some(alc.generation),
273 0 : LocationMode::Secondary(_) => None,
274 : };
275 :
276 1 : gen.map(|g| (*tid, g))
277 1 : })
278 1 : .collect()
279 1 : }
280 :
281 604 : async fn init_load_generations(
282 604 : conf: &'static PageServerConf,
283 604 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
284 604 : resources: &TenantSharedResources,
285 604 : cancel: &CancellationToken,
286 604 : ) -> anyhow::Result<Option<HashMap<TenantShardId, Generation>>> {
287 604 : let generations = if conf.control_plane_emergency_mode {
288 1 : error!(
289 1 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
290 1 : );
291 1 : emergency_generations(tenant_confs)
292 603 : } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
293 602 : info!("Calling control plane API to re-attach tenants");
294 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
295 1826 : match client.re_attach().await {
296 602 : Ok(tenants) => tenants,
297 : Err(RetryForeverError::ShuttingDown) => {
298 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
299 : }
300 : }
301 : } else {
302 1 : info!("Control plane API not configured, tenant generations are disabled");
303 1 : return Ok(None);
304 : };
305 :
306 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
307 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
308 : // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
309 : // are processed, even though we don't block on recovery completing here.
310 : //
311 : // Must only do this if remote storage is enabled, otherwise the deletion queue
312 : // is not running and the channel push will fail.
313 603 : if resources.remote_storage.is_some() {
314 603 : resources
315 603 : .deletion_queue_client
316 603 : .recover(generations.clone())?;
317 0 : }
318 :
319 603 : Ok(Some(generations))
320 604 : }
321 :
322 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
323 : /// to load a tenant config from it.
324 : ///
325 : /// If file is missing, return Ok(None)
326 229 : fn load_tenant_config(
327 229 : conf: &'static PageServerConf,
328 229 : dentry: Utf8DirEntry,
329 229 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
330 229 : let tenant_dir_path = dentry.path().to_path_buf();
331 229 : if crate::is_temporary(&tenant_dir_path) {
332 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
333 : // No need to use safe_remove_tenant_dir_all because this is already
334 : // a temporary path
335 0 : if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
336 0 : error!(
337 0 : "Failed to remove temporary directory '{}': {:?}",
338 0 : tenant_dir_path, e
339 0 : );
340 0 : }
341 0 : return Ok(None);
342 229 : }
343 :
344 : // This case happens if we crash during attachment before writing a config into the dir
345 229 : let is_empty = tenant_dir_path
346 229 : .is_empty_dir()
347 229 : .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
348 229 : if is_empty {
349 2 : info!("removing empty tenant directory {tenant_dir_path:?}");
350 2 : if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
351 0 : error!(
352 0 : "Failed to remove empty tenant directory '{}': {e:#}",
353 0 : tenant_dir_path
354 0 : )
355 2 : }
356 2 : return Ok(None);
357 227 : }
358 227 :
359 227 : let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
360 227 : if tenant_ignore_mark_file.exists() {
361 1 : info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
362 1 : return Ok(None);
363 226 : }
364 :
365 226 : let tenant_shard_id = match tenant_dir_path
366 226 : .file_name()
367 226 : .unwrap_or_default()
368 226 : .parse::<TenantShardId>()
369 : {
370 226 : Ok(id) => id,
371 : Err(_) => {
372 0 : warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
373 0 : return Ok(None);
374 : }
375 : };
376 :
377 226 : Ok(Some((
378 226 : tenant_shard_id,
379 226 : Tenant::load_tenant_config(conf, &tenant_shard_id),
380 226 : )))
381 229 : }
382 :
383 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
384 : /// and load configurations for the tenants we found.
385 : ///
386 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
387 : /// seconds even on reasonably fast drives.
388 604 : async fn init_load_tenant_configs(
389 604 : conf: &'static PageServerConf,
390 604 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
391 604 : let tenants_dir = conf.tenants_path();
392 :
393 604 : let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
394 604 : let dir_entries = tenants_dir
395 604 : .read_dir_utf8()
396 604 : .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
397 :
398 604 : Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
399 604 : })
400 602 : .await??;
401 :
402 604 : let mut configs = HashMap::new();
403 604 :
404 604 : let mut join_set = JoinSet::new();
405 833 : for dentry in dentries {
406 229 : join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
407 229 : }
408 :
409 833 : while let Some(r) = join_set.join_next().await {
410 229 : if let Some((tenant_id, tenant_config)) = r?? {
411 226 : configs.insert(tenant_id, tenant_config);
412 226 : }
413 : }
414 :
415 604 : Ok(configs)
416 604 : }
417 :
418 : /// Initialize repositories with locally available timelines.
419 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
420 : /// are scheduled for download and added to the tenant once download is completed.
421 604 : #[instrument(skip_all)]
422 : pub async fn init_tenant_mgr(
423 : conf: &'static PageServerConf,
424 : resources: TenantSharedResources,
425 : init_order: InitializationOrder,
426 : cancel: CancellationToken,
427 : ) -> anyhow::Result<TenantManager> {
428 : let mut tenants = BTreeMap::new();
429 :
430 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
431 :
432 : // Scan local filesystem for attached tenants
433 : let tenant_configs = init_load_tenant_configs(conf).await?;
434 :
435 : // Determine which tenants are to be attached
436 : let tenant_generations =
437 : init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
438 :
439 604 : tracing::info!(
440 604 : "Attaching {} tenants at startup, warming up {} at a time",
441 604 : tenant_configs.len(),
442 604 : conf.concurrent_tenant_warmup.initial_permits()
443 604 : );
444 : TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
445 :
446 : // Construct `Tenant` objects and start them running
447 : for (tenant_shard_id, location_conf) in tenant_configs {
448 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
449 :
450 : let mut location_conf = match location_conf {
451 : Ok(l) => l,
452 : Err(e) => {
453 0 : warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
454 :
455 : tenants.insert(
456 : tenant_shard_id,
457 : TenantSlot::Attached(Tenant::create_broken_tenant(
458 : conf,
459 : tenant_shard_id,
460 : format!("{}", e),
461 : )),
462 : );
463 : continue;
464 : }
465 : };
466 :
467 : let generation = if let Some(generations) = &tenant_generations {
468 : // We have a generation map: treat it as the authority for whether
469 : // this tenant is really attached.
470 : if let Some(gen) = generations.get(&tenant_shard_id) {
471 : if let LocationMode::Attached(attached) = &location_conf.mode {
472 : if attached.generation > *gen {
473 0 : tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
474 0 : "Control plane gave decreasing generation ({gen:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
475 0 : attached.generation
476 0 : );
477 :
478 : // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
479 : // local disk content: demote to secondary rather than detaching.
480 : tenants.insert(
481 : tenant_shard_id,
482 : TenantSlot::Secondary(SecondaryTenant::new(
483 : tenant_shard_id,
484 : location_conf.shard,
485 : location_conf.tenant_conf,
486 : &SecondaryLocationConfig { warm: false },
487 : )),
488 : );
489 : }
490 : }
491 : *gen
492 : } else {
493 : match &location_conf.mode {
494 : LocationMode::Secondary(secondary_config) => {
495 : // We do not require the control plane's permission for secondary mode
496 : // tenants, because they do no remote writes and hence require no
497 : // generation number
498 7 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
499 : tenants.insert(
500 : tenant_shard_id,
501 : TenantSlot::Secondary(SecondaryTenant::new(
502 : tenant_shard_id,
503 : location_conf.shard,
504 : location_conf.tenant_conf,
505 : secondary_config,
506 : )),
507 : );
508 : }
509 : LocationMode::Attached(_) => {
510 : // TODO: augment re-attach API to enable the control plane to
511 : // instruct us about secondary attachments. That way, instead of throwing
512 : // away local state, we can gracefully fall back to secondary here, if the control
513 : // plane tells us so.
514 : // (https://github.com/neondatabase/neon/issues/5377)
515 12 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
516 : if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
517 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
518 0 : "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
519 0 : );
520 : }
521 : }
522 : };
523 :
524 : continue;
525 : }
526 : } else {
527 : // Legacy mode: no generation information, any tenant present
528 : // on local disk may activate
529 0 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
530 : Generation::none()
531 : };
532 :
533 : // Presence of a generation number implies attachment: attach the tenant
534 : // if it wasn't already, and apply the generation number.
535 : location_conf.attach_in_generation(generation);
536 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
537 :
538 : let shard_identity = location_conf.shard;
539 : match tenant_spawn(
540 : conf,
541 : tenant_shard_id,
542 : &tenant_dir_path,
543 : resources.clone(),
544 : AttachedTenantConf::try_from(location_conf)?,
545 : shard_identity,
546 : Some(init_order.clone()),
547 : &TENANTS,
548 : SpawnMode::Normal,
549 : &ctx,
550 : ) {
551 : Ok(tenant) => {
552 : tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
553 : }
554 : Err(e) => {
555 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
556 : }
557 : }
558 : }
559 :
560 604 : info!("Processed {} local tenants at startup", tenants.len());
561 :
562 : let mut tenants_map = TENANTS.write().unwrap();
563 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
564 : METRICS.tenant_slots.set(tenants.len() as u64);
565 : *tenants_map = TenantsMap::Open(tenants);
566 :
567 : Ok(TenantManager {
568 : conf,
569 : tenants: &TENANTS,
570 : resources,
571 : })
572 : }
573 :
574 : /// Wrapper for Tenant::spawn that checks invariants before running, and falls back to
575 : /// returning a broken tenant placeholder if Tenant::spawn fails.
576 : #[allow(clippy::too_many_arguments)]
577 858 : pub(crate) fn tenant_spawn(
578 858 : conf: &'static PageServerConf,
579 858 : tenant_shard_id: TenantShardId,
580 858 : tenant_path: &Utf8Path,
581 858 : resources: TenantSharedResources,
582 858 : location_conf: AttachedTenantConf,
583 858 : shard_identity: ShardIdentity,
584 858 : init_order: Option<InitializationOrder>,
585 858 : tenants: &'static std::sync::RwLock<TenantsMap>,
586 858 : mode: SpawnMode,
587 858 : ctx: &RequestContext,
588 858 : ) -> anyhow::Result<Arc<Tenant>> {
589 858 : anyhow::ensure!(
590 858 : tenant_path.is_dir(),
591 0 : "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
592 : );
593 : anyhow::ensure!(
594 858 : !crate::is_temporary(tenant_path),
595 0 : "Cannot load tenant from temporary path {tenant_path:?}"
596 : );
597 : anyhow::ensure!(
598 858 : !tenant_path.is_empty_dir().with_context(|| {
599 0 : format!("Failed to check whether {tenant_path:?} is an empty dir")
600 858 : })?,
601 0 : "Cannot load tenant from empty directory {tenant_path:?}"
602 : );
603 :
604 858 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
605 858 : anyhow::ensure!(
606 858 : !conf.tenant_ignore_mark_file_path(&tenant_shard_id).exists(),
607 0 : "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
608 : );
609 :
610 858 : let tenant = match Tenant::spawn(
611 858 : conf,
612 858 : tenant_shard_id,
613 858 : resources,
614 858 : location_conf,
615 858 : shard_identity,
616 858 : init_order,
617 858 : tenants,
618 858 : mode,
619 858 : ctx,
620 858 : ) {
621 858 : Ok(tenant) => tenant,
622 0 : Err(e) => {
623 0 : error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
624 0 : Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
625 : }
626 : };
627 :
628 858 : Ok(tenant)
629 858 : }
630 :
631 : ///
632 : /// Shut down all tenants. This runs as part of pageserver shutdown.
633 : ///
634 : /// NB: We leave the tenants in the map, so that they remain accessible through
635 : /// the management API until we shut it down. If we removed the shut-down tenants
636 : /// from the tenants map, the management API would return 404 for these tenants,
637 : /// because TenantsMap::get() would then return `None` for them.
638 : /// That could easily be misinterpreted by the control plane, the consumer of the
639 : /// management API. For example, it could attach the tenant on a different pageserver.
640 : /// We would then be in split-brain once this pageserver restarts.
641 172 : #[instrument(skip_all)]
642 : pub(crate) async fn shutdown_all_tenants() {
643 : shutdown_all_tenants0(&TENANTS).await
644 : }
645 :
646 174 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
647 174 : use utils::completion;
648 174 :
649 174 : let mut join_set = JoinSet::new();
650 :
651 : // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
652 174 : let (total_in_progress, total_attached) = {
653 174 : let mut m = tenants.write().unwrap();
654 174 : match &mut *m {
655 : TenantsMap::Initializing => {
656 0 : *m = TenantsMap::ShuttingDown(BTreeMap::default());
657 0 : info!("tenants map is empty");
658 0 : return;
659 : }
660 174 : TenantsMap::Open(tenants) => {
661 174 : let mut shutdown_state = BTreeMap::new();
662 174 : let mut total_in_progress = 0;
663 174 : let mut total_attached = 0;
664 :
665 199 : for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
666 199 : match v {
667 183 : TenantSlot::Attached(t) => {
668 183 : shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
669 183 : join_set.spawn(
670 183 : async move {
671 183 : let freeze_and_flush = true;
672 :
673 183 : let res = {
674 183 : let (_guard, shutdown_progress) = completion::channel();
675 405 : t.shutdown(shutdown_progress, freeze_and_flush).await
676 : };
677 :
678 183 : if let Err(other_progress) = res {
679 : // join the another shutdown in progress
680 0 : other_progress.wait().await;
681 183 : }
682 :
683 : // we cannot afford per tenant logging here, because if s3 is degraded, we are
684 : // going to log too many lines
685 183 : debug!("tenant successfully stopped");
686 183 : }
687 183 : .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
688 : );
689 :
690 183 : total_attached += 1;
691 : }
692 7 : TenantSlot::Secondary(state) => {
693 7 : // We don't need to wait for this individually per-tenant: the
694 7 : // downloader task will be waited on eventually; this cancel
695 7 : // is just to encourage it to drop out if it is doing work
696 7 : // for this tenant right now.
697 7 : state.cancel.cancel();
698 7 :
699 7 : shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
700 7 : }
701 9 : TenantSlot::InProgress(notify) => {
702 9 : // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
703 9 : // wait for their notifications to fire in this function.
704 9 : join_set.spawn(async move {
705 9 : notify.wait().await;
706 9 : });
707 9 :
708 9 : total_in_progress += 1;
709 9 : }
710 : }
711 : }
712 174 : *m = TenantsMap::ShuttingDown(shutdown_state);
713 174 : (total_in_progress, total_attached)
714 : }
715 : TenantsMap::ShuttingDown(_) => {
716 0 : error!("already shutting down, this function isn't supposed to be called more than once");
717 0 : return;
718 : }
719 : }
720 : };
721 :
722 174 : let started_at = std::time::Instant::now();
723 :
724 174 : info!(
725 174 : "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
726 174 : total_in_progress, total_attached
727 174 : );
728 :
729 174 : let total = join_set.len();
730 174 : let mut panicked = 0;
731 174 : let mut buffering = true;
732 174 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
733 174 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
734 :
735 373 : while !join_set.is_empty() {
736 199 : tokio::select! {
737 192 : Some(joined) = join_set.join_next() => {
738 : match joined {
739 : Ok(()) => {},
740 : Err(join_error) if join_error.is_cancelled() => {
741 : unreachable!("we are not cancelling any of the tasks");
742 : }
743 : Err(join_error) if join_error.is_panic() => {
744 : // cannot really do anything, as this panic is likely a bug
745 : panicked += 1;
746 : }
747 : Err(join_error) => {
748 0 : warn!("unknown kind of JoinError: {join_error}");
749 : }
750 : }
751 : if !buffering {
752 : // buffer so that every 500ms since the first update (or starting) we'll log
753 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
754 : // are not able to log *then*.
755 : buffering = true;
756 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
757 : }
758 : },
759 : _ = &mut buffered, if buffering => {
760 : buffering = false;
761 7 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
762 : }
763 : }
764 : }
765 :
766 174 : if panicked > 0 {
767 0 : warn!(
768 0 : panicked,
769 0 : total, "observed panicks while shutting down tenants"
770 0 : );
771 174 : }
772 :
773 : // caller will log how long we took
774 174 : }
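// Distilled sketch of the progress-logging pattern used in the wait loop above: a
// pinned sleep acts as a 500ms "buffer" so progress is logged at most once per
// BUFFER_FOR interval while tasks drain, instead of once per completed task. Names
// here are generic and not tied to this module.
//
//     let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
//     let mut buffering = true;
//     while !join_set.is_empty() {
//         tokio::select! {
//             Some(_res) = join_set.join_next() => {
//                 if !buffering {
//                     // Re-arm the buffer timer after reporting, so the next log
//                     // line comes no sooner than BUFFER_FOR from now.
//                     buffering = true;
//                     buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
//                 }
//             }
//             _ = &mut buffered, if buffering => {
//                 buffering = false;
//                 info!(remaining = join_set.len(), "still waiting for tasks to finish");
//             }
//         }
//     }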
775 :
776 0 : #[derive(Debug, thiserror::Error)]
777 : pub(crate) enum SetNewTenantConfigError {
778 : #[error(transparent)]
779 : GetTenant(#[from] GetTenantError),
780 : #[error(transparent)]
781 : Persist(anyhow::Error),
782 : #[error(transparent)]
783 : Other(anyhow::Error),
784 : }
785 :
786 30 : pub(crate) async fn set_new_tenant_config(
787 30 : conf: &'static PageServerConf,
788 30 : new_tenant_conf: TenantConfOpt,
789 30 : tenant_id: TenantId,
790 30 : ) -> Result<(), SetNewTenantConfigError> {
791 30 : // Legacy API: does not support sharding
792 30 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
793 :
794 30 : info!("configuring tenant {tenant_id}");
795 30 : let tenant = get_tenant(tenant_shard_id, true)?;
796 :
797 30 : if tenant.tenant_shard_id().shard_count > ShardCount(0) {
798 : // Note that we use ShardParameters::default below.
799 0 : return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
800 0 : "This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
801 0 : )));
802 30 : }
803 30 :
804 30 : // This is a legacy API that only operates on attached tenants: the preferred
805 30 : // API to use is the location_config/ endpoint, which lets the caller provide
806 30 : // the full LocationConf.
807 30 : let location_conf = LocationConf::attached_single(
808 30 : new_tenant_conf,
809 30 : tenant.generation,
810 30 : &ShardParameters::default(),
811 30 : );
812 30 :
813 30 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
814 60 : .await
815 30 : .map_err(SetNewTenantConfigError::Persist)?;
816 30 : tenant.set_new_tenant_config(new_tenant_conf);
817 30 : Ok(())
818 30 : }
819 :
820 18 : #[derive(thiserror::Error, Debug)]
821 : pub(crate) enum UpsertLocationError {
822 : #[error("Bad config request: {0}")]
823 : BadRequest(anyhow::Error),
824 :
825 : #[error("Cannot change config in this state: {0}")]
826 : Unavailable(#[from] TenantMapError),
827 :
828 : #[error("Tenant is already being modified")]
829 : InProgress,
830 :
831 : #[error("Failed to flush: {0}")]
832 : Flush(anyhow::Error),
833 :
834 : #[error("Internal error: {0}")]
835 : Other(#[from] anyhow::Error),
836 : }
837 :
838 : impl TenantManager {
839 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
840 : /// having to pass it around everywhere as a separate object.
841 1237 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
842 1237 : self.conf
843 1237 : }
844 :
845 : /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or does not match the query.
846 : /// `active_only = true` restricts the query to tenants that are ready for operations, erroring on all other tenant states.
847 969 : pub(crate) fn get_attached_tenant_shard(
848 969 : &self,
849 969 : tenant_shard_id: TenantShardId,
850 969 : active_only: bool,
851 969 : ) -> Result<Arc<Tenant>, GetTenantError> {
852 969 : let locked = self.tenants.read().unwrap();
853 :
854 969 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
855 :
856 967 : match peek_slot {
857 965 : Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
858 : TenantState::Broken {
859 0 : reason,
860 0 : backtrace: _,
861 0 : } if active_only => Err(GetTenantError::Broken(reason)),
862 865 : TenantState::Active => Ok(Arc::clone(tenant)),
863 : _ => {
864 100 : if active_only {
865 0 : Err(GetTenantError::NotActive(tenant_shard_id))
866 : } else {
867 100 : Ok(Arc::clone(tenant))
868 : }
869 : }
870 : },
871 2 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
872 : None | Some(TenantSlot::Secondary(_)) => {
873 2 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
874 : }
875 : }
876 969 : }
877 :
878 6 : pub(crate) fn get_secondary_tenant_shard(
879 6 : &self,
880 6 : tenant_shard_id: TenantShardId,
881 6 : ) -> Option<Arc<SecondaryTenant>> {
882 6 : let locked = self.tenants.read().unwrap();
883 6 :
884 6 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
885 6 : .ok()
886 6 : .flatten();
887 :
888 6 : match peek_slot {
889 6 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
890 0 : _ => None,
891 : }
892 6 : }
893 :
894 : /// Whether the `TenantManager` is responsible for the tenant shard
895 1 : pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
896 1 : let locked = self.tenants.read().unwrap();
897 1 :
898 1 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
899 1 : .ok()
900 1 : .flatten();
901 1 :
902 1 : peek_slot.is_some()
903 1 : }
904 :
905 774 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
906 : pub(crate) async fn upsert_location(
907 : &self,
908 : tenant_shard_id: TenantShardId,
909 : new_location_config: LocationConf,
910 : flush: Option<Duration>,
911 : mut spawn_mode: SpawnMode,
912 : ctx: &RequestContext,
913 : ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
914 : debug_assert_current_span_has_tenant_id();
915 774 : info!("configuring tenant location to state {new_location_config:?}");
916 :
917 : enum FastPathModified {
918 : Attached(Arc<Tenant>),
919 : Secondary(Arc<SecondaryTenant>),
920 : }
921 :
922 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
923 : // then we do not need to set the slot to InProgress, we can just call into the
924 : // existing tenant.
925 : let fast_path_taken = {
926 : let locked = self.tenants.read().unwrap();
927 : let peek_slot =
928 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
929 : match (&new_location_config.mode, peek_slot) {
930 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
931 : match attach_conf.generation.cmp(&tenant.generation) {
932 : Ordering::Equal => {
933 : // A transition from Attached to Attached in the same generation, we may
934 : // take our fast path and just provide the updated configuration
935 : // to the tenant.
936 : tenant.set_new_location_config(
937 : AttachedTenantConf::try_from(new_location_config.clone())
938 : .map_err(UpsertLocationError::BadRequest)?,
939 : );
940 :
941 : Some(FastPathModified::Attached(tenant.clone()))
942 : }
943 : Ordering::Less => {
944 : return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
945 : "Generation {:?} is less than existing {:?}",
946 : attach_conf.generation,
947 : tenant.generation
948 : )));
949 : }
950 : Ordering::Greater => {
951 : // Generation advanced, fall through to general case of replacing `Tenant` object
952 : None
953 : }
954 : }
955 : }
956 : (
957 : LocationMode::Secondary(secondary_conf),
958 : Some(TenantSlot::Secondary(secondary_tenant)),
959 : ) => {
960 : secondary_tenant.set_config(secondary_conf);
961 : secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
962 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
963 : }
964 : _ => {
965 : // Not an Attached->Attached transition, fall through to general case
966 : None
967 : }
968 : }
969 : };
970 :
971 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
972 : // phase of writing config and/or waiting for flush, before returning.
973 : match fast_path_taken {
974 : Some(FastPathModified::Attached(tenant)) => {
975 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
976 : .await?;
977 :
978 : // Transition to AttachedStale means we may well hold a valid generation
979 : // still, and have been requested to go stale as part of a migration. If
980 : // the caller set `flush`, then flush to remote storage.
981 : if let LocationMode::Attached(AttachedLocationConfig {
982 : generation: _,
983 : attach_mode: AttachmentMode::Stale,
984 : }) = &new_location_config.mode
985 : {
986 : if let Some(flush_timeout) = flush {
987 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
988 : Ok(Err(e)) => {
989 : return Err(UpsertLocationError::Flush(e));
990 : }
991 : Ok(Ok(_)) => return Ok(Some(tenant)),
992 : Err(_) => {
993 0 : tracing::warn!(
994 0 : timeout_ms = flush_timeout.as_millis(),
995 0 : "Timed out waiting for flush to remote storage, proceeding anyway."
996 0 : )
997 : }
998 : }
999 : }
1000 : }
1001 :
1002 : return Ok(Some(tenant));
1003 : }
1004 : Some(FastPathModified::Secondary(_secondary_tenant)) => {
1005 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1006 : .await?;
1007 :
1008 : return Ok(None);
1009 : }
1010 : None => {
1011 : // Proceed with the general case procedure, where we will shutdown & remove any existing
1012 : // slot contents and replace with a fresh one
1013 : }
1014 : };
1015 :
1016 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
1017 : // InProgress value to the slot while we make whatever changes are required. The state for
1018 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
1019 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
1020 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
1021 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
1022 18 : .map_err(|e| match e {
1023 : TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => {
1024 0 : unreachable!("Called with mode Any")
1025 : }
1026 18 : TenantSlotError::InProgress => UpsertLocationError::InProgress,
1027 0 : TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
1028 18 : })?;
1029 :
1030 : match slot_guard.get_old_value() {
1031 : Some(TenantSlot::Attached(tenant)) => {
1032 : // The case where we keep a Tenant alive was covered above in the special case
1033 : // for Attached->Attached transitions in the same generation. By this point,
1034 : // if we see an attached tenant we know it will be discarded and should be
1035 : // shut down.
1036 : let (_guard, progress) = utils::completion::channel();
1037 :
1038 : match tenant.get_attach_mode() {
1039 : AttachmentMode::Single | AttachmentMode::Multi => {
1040 : // Before we leave our state as the presumed holder of the latest generation,
1041 : // flush any outstanding deletions to reduce the risk of leaking objects.
1042 : self.resources.deletion_queue_client.flush_advisory()
1043 : }
1044 : AttachmentMode::Stale => {
1045 : // If we're stale there's no point in trying to flush deletions
1046 : }
1047 : };
1048 :
1049 74 : info!("Shutting down attached tenant");
1050 : match tenant.shutdown(progress, false).await {
1051 : Ok(()) => {}
1052 : Err(barrier) => {
1053 0 : info!("Shutdown already in progress, waiting for it to complete");
1054 : barrier.wait().await;
1055 : }
1056 : }
1057 : slot_guard.drop_old_value().expect("We just shut it down");
1058 :
1059 : // Edge case: if we were called with SpawnMode::Create, but a Tenant already existed, then
1060 : // the caller thinks it is creating a new tenant even though one already exists. We must switch to
1061 : // Normal mode so that when starting this Tenant we properly probe remote storage for timelines,
1062 : // rather than assuming it to be empty.
1063 : spawn_mode = SpawnMode::Normal;
1064 : }
1065 : Some(TenantSlot::Secondary(state)) => {
1066 17 : info!("Shutting down secondary tenant");
1067 : state.shutdown().await;
1068 : }
1069 : Some(TenantSlot::InProgress(_)) => {
1070 : // This should never happen: acquire_slot should error out
1071 : // if the contents of a slot were InProgress.
1072 : return Err(UpsertLocationError::Other(anyhow::anyhow!(
1073 : "Acquired an InProgress slot, this is a bug."
1074 : )));
1075 : }
1076 : None => {
1077 : // Slot was vacant, nothing needs shutting down.
1078 : }
1079 : }
1080 :
1081 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1082 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1083 :
1084 : // Directory structure is the same for attached and secondary modes:
1085 : // create it if it doesn't exist. Timeline load/creation expects the
1086 : // timelines/ subdir to already exist.
1087 : //
1088 : // Does not need to be fsync'd because local storage is just a cache.
1089 : tokio::fs::create_dir_all(&timelines_path)
1090 : .await
1091 0 : .with_context(|| format!("Creating {timelines_path}"))?;
1092 :
1093 : // Before activating either secondary or attached mode, persist the
1094 : // configuration, so that on restart we will re-attach (or re-start
1095 : // secondary) on the tenant.
1096 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
1097 :
1098 : let new_slot = match &new_location_config.mode {
1099 : LocationMode::Secondary(secondary_config) => {
1100 : let shard_identity = new_location_config.shard;
1101 : TenantSlot::Secondary(SecondaryTenant::new(
1102 : tenant_shard_id,
1103 : shard_identity,
1104 : new_location_config.tenant_conf,
1105 : secondary_config,
1106 : ))
1107 : }
1108 : LocationMode::Attached(_attach_config) => {
1109 : let shard_identity = new_location_config.shard;
1110 :
1111 : // Testing hack: if we are configured with no control plane, then drop the generation
1112 : // from upserts. This enables creating generation-less tenants even though neon_local
1113 : // always uses generations when calling the location conf API.
1114 : let attached_conf = if cfg!(feature = "testing") {
1115 : let mut conf = AttachedTenantConf::try_from(new_location_config)?;
1116 : if self.conf.control_plane_api.is_none() {
1117 : conf.location.generation = Generation::none();
1118 : }
1119 : conf
1120 : } else {
1121 : AttachedTenantConf::try_from(new_location_config)?
1122 : };
1123 :
1124 : let tenant = tenant_spawn(
1125 : self.conf,
1126 : tenant_shard_id,
1127 : &tenant_path,
1128 : self.resources.clone(),
1129 : attached_conf,
1130 : shard_identity,
1131 : None,
1132 : self.tenants,
1133 : spawn_mode,
1134 : ctx,
1135 : )?;
1136 :
1137 : TenantSlot::Attached(tenant)
1138 : }
1139 : };
1140 :
1141 : let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
1142 : Some(tenant.clone())
1143 : } else {
1144 : None
1145 : };
1146 :
1147 : match slot_guard.upsert(new_slot) {
1148 : Err(TenantSlotUpsertError::InternalError(e)) => {
1149 : Err(UpsertLocationError::Other(anyhow::anyhow!(e)))
1150 : }
1151 : Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
1152 : Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
1153 : // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
1154 : // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
1155 : // are shutdown.
1156 : //
1157 : // We must shut it down inline here.
1158 : match new_slot {
1159 : TenantSlot::InProgress(_) => {
1160 : // Unreachable because we never insert an InProgress
1161 : unreachable!()
1162 : }
1163 : TenantSlot::Attached(tenant) => {
1164 : let (_guard, progress) = utils::completion::channel();
1165 7 : info!("Shutting down just-spawned tenant, because tenant manager is shut down");
1166 : match tenant.shutdown(progress, false).await {
1167 : Ok(()) => {
1168 7 : info!("Finished shutting down just-spawned tenant");
1169 : }
1170 : Err(barrier) => {
1171 0 : info!("Shutdown already in progress, waiting for it to complete");
1172 : barrier.wait().await;
1173 : }
1174 : }
1175 : }
1176 : TenantSlot::Secondary(secondary_tenant) => {
1177 : secondary_tenant.shutdown().await;
1178 : }
1179 : }
1180 :
1181 : Err(UpsertLocationError::Unavailable(
1182 : TenantMapError::ShuttingDown,
1183 : ))
1184 : }
1185 : Ok(()) => Ok(attached_tenant),
1186 : }
1187 : }
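// Illustrative sketch of a caller driving upsert_location, for example to put an
// attached tenant into Stale mode ahead of a migration while allowing up to 10
// seconds for the remote flush. Building the full `stale_conf` LocationConf is
// elided here; this is a hedged usage sketch, not the real request handler.
//
//     let maybe_tenant = tenant_manager
//         .upsert_location(
//             tenant_shard_id,
//             stale_conf,                    // LocationConf with AttachmentMode::Stale
//             Some(Duration::from_secs(10)), // flush timeout before going stale
//             SpawnMode::Normal,
//             &ctx,
//         )
//         .await?;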
1188 :
1189 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1190 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1191 : /// dropped before re-attaching.
1192 : ///
1193 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1194 : /// where an issue is identified that would go away with a restart of the tenant.
1195 : ///
1196 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1197 : /// to respect the cancellation tokens used in normal shutdown().
1198 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1199 : pub(crate) async fn reset_tenant(
1200 : &self,
1201 : tenant_shard_id: TenantShardId,
1202 : drop_cache: bool,
1203 : ctx: RequestContext,
1204 : ) -> anyhow::Result<()> {
1205 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1206 : let Some(old_slot) = slot_guard.get_old_value() else {
1207 : anyhow::bail!("Tenant not found when trying to reset");
1208 : };
1209 :
1210 : let Some(tenant) = old_slot.get_attached() else {
1211 : slot_guard.revert();
1212 : anyhow::bail!("Tenant is not in attached state");
1213 : };
1214 :
1215 : let (_guard, progress) = utils::completion::channel();
1216 : match tenant.shutdown(progress, false).await {
1217 : Ok(()) => {
1218 : slot_guard.drop_old_value()?;
1219 : }
1220 : Err(_barrier) => {
1221 : slot_guard.revert();
1222 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1223 : }
1224 : }
1225 :
1226 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1227 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1228 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1229 :
1230 : if drop_cache {
1231 1 : tracing::info!("Dropping local file cache");
1232 :
1233 : match tokio::fs::read_dir(&timelines_path).await {
1234 : Err(e) => {
1235 0 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1236 : }
1237 : Ok(mut entries) => {
1238 : while let Some(entry) = entries.next_entry().await? {
1239 : tokio::fs::remove_dir_all(entry.path()).await?;
1240 : }
1241 : }
1242 : }
1243 : }
1244 :
1245 : let shard_identity = config.shard;
1246 : let tenant = tenant_spawn(
1247 : self.conf,
1248 : tenant_shard_id,
1249 : &tenant_path,
1250 : self.resources.clone(),
1251 : AttachedTenantConf::try_from(config)?,
1252 : shard_identity,
1253 : None,
1254 : self.tenants,
1255 : SpawnMode::Normal,
1256 : &ctx,
1257 : )?;
1258 :
1259 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1260 :
1261 : Ok(())
1262 : }
1263 :
1264 1180 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
1265 1180 : let locked = self.tenants.read().unwrap();
1266 1180 : match &*locked {
1267 0 : TenantsMap::Initializing => Vec::new(),
1268 1180 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1269 1180 : .values()
1270 1180 : .filter_map(|slot| {
1271 984 : slot.get_attached()
1272 984 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1273 1180 : })
1274 1180 : .collect(),
1275 : }
1276 1180 : }
1277 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1278 : // callback should be small and fast, as it will be called inside the global
1279 : // TenantsMap lock.
1280 1196 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1281 1196 : where
1282 1196 : // TODO: let the callback return a hint to drop out of the loop early
1283 1196 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1284 1196 : {
1285 1196 : let locked = self.tenants.read().unwrap();
1286 :
1287 1196 : let map = match &*locked {
1288 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1289 1196 : TenantsMap::Open(m) => m,
1290 : };
1291 :
1292 2218 : for (tenant_id, slot) in map {
1293 1022 : if let TenantSlot::Secondary(state) = slot {
1294 : // Only expose secondary tenants that are not currently shutting down
1295 13 : if !state.cancel.is_cancelled() {
1296 13 : func(tenant_id, state)
1297 0 : }
1298 1009 : }
1299 : }
1300 1196 : }
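// Illustrative usage sketch: because the callback runs under the global TenantsMap
// lock, callers typically just collect what they need and do the heavy work outside.
//
//     let mut secondary_ids = Vec::new();
//     tenant_manager.foreach_secondary_tenants(|tenant_shard_id, _state| {
//         secondary_ids.push(*tenant_shard_id);
//     });
//     // ...process secondary_ids without holding the lock...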
1301 :
1302 : /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
1303 5 : pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
1304 5 : let locked = self.tenants.read().unwrap();
1305 5 : match &*locked {
1306 0 : TenantsMap::Initializing => Vec::new(),
1307 5 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1308 5 : map.iter().map(|(k, v)| (*k, v.clone())).collect()
1309 : }
1310 : }
1311 5 : }
1312 :
1313 120 : pub(crate) async fn delete_tenant(
1314 120 : &self,
1315 120 : tenant_shard_id: TenantShardId,
1316 120 : activation_timeout: Duration,
1317 120 : ) -> Result<(), DeleteTenantError> {
1318 120 : super::span::debug_assert_current_span_has_tenant_id();
1319 : // We acquire a SlotGuard during this function to protect against concurrent
1320 : // changes while the ::prepare phase of DeleteTenantFlow executes, but then
1321 : // have to return the Tenant to the map while the background deletion runs.
1322 : //
1323 : // TODO: refactor deletion to happen outside the lifetime of a Tenant.
1324 : // Currently, deletion requires a reference to the tenants map in order to
1325 : // keep the Tenant in the map until deletion is complete, and then remove
1326 : // it at the end.
1327 : //
1328 : // See https://github.com/neondatabase/neon/issues/5080
1329 :
1330 104 : let slot_guard =
1331 120 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
1332 :
1333 : // unwrap is safe because we used MustExist mode when acquiring
1334 104 : let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
1335 104 : TenantSlot::Attached(tenant) => tenant.clone(),
1336 : _ => {
1337 : // Express "not attached" as equivalent to "not found"
1338 0 : return Err(DeleteTenantError::NotAttached);
1339 : }
1340 : };
1341 :
1342 104 : match tenant.current_state() {
1343 26 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1344 26 : // If a tenant is broken or stopping, DeleteTenantFlow can
1345 26 : // handle it: broken tenants proceed to delete, stopping tenants
1346 26 : // are checked for deletion already in progress.
1347 26 : }
1348 : _ => {
1349 78 : tenant
1350 78 : .wait_to_become_active(activation_timeout)
1351 2 : .await
1352 78 : .map_err(|e| match e {
1353 : GetActiveTenantError::WillNotBecomeActive(_) => {
1354 0 : DeleteTenantError::InvalidState(tenant.current_state())
1355 : }
1356 0 : GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
1357 0 : GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
1358 : GetActiveTenantError::WaitForActiveTimeout {
1359 0 : latest_state: _latest_state,
1360 0 : wait_time: _wait_time,
1361 0 : } => DeleteTenantError::InvalidState(tenant.current_state()),
1362 78 : })?;
1363 : }
1364 : }
1365 :
1366 104 : let result = DeleteTenantFlow::run(
1367 104 : self.conf,
1368 104 : self.resources.remote_storage.clone(),
1369 104 : &TENANTS,
1370 104 : tenant,
1371 104 : )
1372 528 : .await;
1373 :
1374 : // The Tenant goes back into the map in Stopping state; it will eventually be removed by DeleteTenantFlow.
1375 104 : slot_guard.revert();
1376 104 : result
1377 120 : }
1378 : }
1379 :
1380 12 : #[derive(Debug, thiserror::Error)]
1381 : pub(crate) enum GetTenantError {
1382 : /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
1383 : /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
1384 : #[error("Tenant {0} not found")]
1385 : NotFound(TenantId),
1386 :
1387 : #[error("Tenant {0} is not active")]
1388 : NotActive(TenantShardId),
1389 : /// Broken is logically a subset of NotActive, but a distinct error is useful as
1390 : /// NotActive is usually a retryable state for API purposes, whereas Broken
1391 : /// is a stuck error state
1392 : #[error("Tenant is broken: {0}")]
1393 : Broken(String),
1394 :
1395 : // Initializing or shutting down: cannot authoritatively say whether we have this tenant
1396 : #[error("Tenant map is not available: {0}")]
1397 : MapState(#[from] TenantMapError),
1398 : }
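// Illustrative sketch of how an API layer might map these variants onto HTTP-style
// outcomes, following the doc comments above (NotActive/MapState are retryable,
// Broken is stuck). The status codes are an assumption for illustration, not what
// the pageserver's HTTP handlers actually return.
//
//     fn status_for(e: &GetTenantError) -> u16 {
//         match e {
//             GetTenantError::NotFound(_) => 404,
//             GetTenantError::NotActive(_) | GetTenantError::MapState(_) => 503,
//             GetTenantError::Broken(_) => 500,
//         }
//     }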
1399 :
1400 : /// Gets the tenant from the in-memory data, erroring if it's absent or does not match the query.
1401 : /// `active_only = true` restricts the query to tenants that are ready for operations, erroring on all other tenant states.
1402 : ///
1403 : /// This method is cancel-safe.
1404 6066 : pub(crate) fn get_tenant(
1405 6066 : tenant_shard_id: TenantShardId,
1406 6066 : active_only: bool,
1407 6066 : ) -> Result<Arc<Tenant>, GetTenantError> {
1408 6066 : let locked = TENANTS.read().unwrap();
1409 :
1410 6066 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
1411 :
1412 5999 : match peek_slot {
1413 5999 : Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
1414 : TenantState::Broken {
1415 1 : reason,
1416 1 : backtrace: _,
1417 1 : } if active_only => Err(GetTenantError::Broken(reason)),
1418 5637 : TenantState::Active => Ok(Arc::clone(tenant)),
1419 : _ => {
1420 361 : if active_only {
1421 5 : Err(GetTenantError::NotActive(tenant_shard_id))
1422 : } else {
1423 356 : Ok(Arc::clone(tenant))
1424 : }
1425 : }
1426 : },
1427 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
1428 : None | Some(TenantSlot::Secondary(_)) => {
1429 67 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
1430 : }
1431 : }
1432 6066 : }
1433 :
1434 13 : #[derive(thiserror::Error, Debug)]
1435 : pub(crate) enum GetActiveTenantError {
1436 : /// We may time out either while TenantSlot is InProgress, or while the Tenant
1437 : /// is in a non-Active state
1438 : #[error(
1439 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
1440 : )]
1441 : WaitForActiveTimeout {
1442 : latest_state: Option<TenantState>,
1443 : wait_time: Duration,
1444 : },
1445 :
1446 : /// The TenantSlot is absent, or in secondary mode
1447 : #[error(transparent)]
1448 : NotFound(#[from] GetTenantError),
1449 :
1450 : /// Cancellation token fired while we were waiting
1451 : #[error("cancelled")]
1452 : Cancelled,
1453 :
1454 : /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
1455 : #[error("will not become active. Current state: {0}")]
1456 : WillNotBecomeActive(TenantState),
1457 : }
1458 :
1459 : /// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
1460 : /// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
1461 : /// then wait for up to `timeout` (minus however long we waited for the slot).
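     : ///
     : /// A hedged usage sketch (illustrative only, not a doctest; `shard_selector`,
     : /// the timeout value and `cancel` are assumed to be provided by the request handler):
     : /// ```ignore
     : /// let tenant = get_active_tenant_with_timeout(
     : ///     tenant_id,
     : ///     shard_selector,
     : ///     Duration::from_secs(30),
     : ///     &cancel,
     : /// )
     : /// .await?;
     : /// ```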
1462 20696 : pub(crate) async fn get_active_tenant_with_timeout(
1463 20696 : tenant_id: TenantId,
1464 20696 : shard_selector: ShardSelector,
1465 20696 : timeout: Duration,
1466 20696 : cancel: &CancellationToken,
1467 20696 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1468 20696 : enum WaitFor {
1469 20696 : Barrier(utils::completion::Barrier),
1470 20696 : Tenant(Arc<Tenant>),
1471 20696 : }
1472 20696 :
1473 20696 : let wait_start = Instant::now();
1474 20696 : let deadline = wait_start + timeout;
1475 :
1476 133 : let (wait_for, tenant_shard_id) = {
1477 20696 : let locked = TENANTS.read().unwrap();
1478 :
1479 : // Resolve TenantId to TenantShardId
1480 20696 : let tenant_shard_id = locked
1481 20696 : .resolve_attached_shard(&tenant_id, shard_selector)
1482 20696 : .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1483 20696 : tenant_id,
1484 20696 : )))?;
1485 :
1486 20684 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
1487 20684 : .map_err(GetTenantError::MapState)?;
1488 20684 : match peek_slot {
1489 20684 : Some(TenantSlot::Attached(tenant)) => {
1490 20684 : match tenant.current_state() {
1491 : TenantState::Active => {
1492 : // Fast path: we don't need to do any async waiting.
1493 20551 : return Ok(tenant.clone());
1494 : }
1495 : _ => {
1496 133 : tenant.activate_now();
1497 133 : (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
1498 : }
1499 : }
1500 : }
1501 : Some(TenantSlot::Secondary(_)) => {
1502 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
1503 0 : tenant_shard_id,
1504 0 : )))
1505 : }
1506 0 : Some(TenantSlot::InProgress(barrier)) => {
1507 0 : (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
1508 : }
1509 : None => {
1510 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1511 0 : tenant_id,
1512 0 : )))
1513 : }
1514 : }
1515 : };
1516 :
1517 133 : let tenant = match wait_for {
1518 0 : WaitFor::Barrier(barrier) => {
1519 0 : tracing::debug!("Waiting for tenant InProgress state to pass...");
1520 0 : timeout_cancellable(
1521 0 : deadline.duration_since(Instant::now()),
1522 0 : cancel,
1523 0 : barrier.wait(),
1524 0 : )
1525 0 : .await
1526 0 : .map_err(|e| match e {
1527 0 : TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
1528 0 : latest_state: None,
1529 0 : wait_time: wait_start.elapsed(),
1530 0 : },
1531 0 : TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
1532 0 : })?;
1533 : {
1534 0 : let locked = TENANTS.read().unwrap();
1535 0 : let peek_slot =
1536 0 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
1537 0 : .map_err(GetTenantError::MapState)?;
1538 0 : match peek_slot {
1539 0 : Some(TenantSlot::Attached(tenant)) => tenant.clone(),
1540 : _ => {
1541 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
1542 0 : tenant_shard_id,
1543 0 : )))
1544 : }
1545 : }
1546 : }
1547 : }
1548 133 : WaitFor::Tenant(tenant) => tenant,
1549 : };
1550 :
1551 0 : tracing::debug!("Waiting for tenant to enter active state...");
1552 133 : tenant
1553 133 : .wait_to_become_active(deadline.duration_since(Instant::now()))
1554 398 : .await?;
1555 133 : Ok(tenant)
1556 20696 : }
1557 :
1558 0 : #[derive(Debug, thiserror::Error)]
1559 : pub(crate) enum DeleteTimelineError {
1560 : #[error("Tenant {0}")]
1561 : Tenant(#[from] GetTenantError),
1562 :
1563 : #[error("Timeline {0}")]
1564 : Timeline(#[from] crate::tenant::DeleteTimelineError),
1565 : }
1566 :
1567 0 : #[derive(Debug, thiserror::Error)]
1568 : pub(crate) enum TenantStateError {
1569 : #[error("Tenant {0} is stopping")]
1570 : IsStopping(TenantShardId),
1571 : #[error(transparent)]
1572 : SlotError(#[from] TenantSlotError),
1573 : #[error(transparent)]
1574 : SlotUpsertError(#[from] TenantSlotUpsertError),
1575 : #[error(transparent)]
1576 : Other(#[from] anyhow::Error),
1577 : }
1578 :
1579 88 : pub(crate) async fn detach_tenant(
1580 88 : conf: &'static PageServerConf,
1581 88 : tenant_shard_id: TenantShardId,
1582 88 : detach_ignored: bool,
1583 88 : deletion_queue_client: &DeletionQueueClient,
1584 88 : ) -> Result<(), TenantStateError> {
1585 88 : let tmp_path = detach_tenant0(
1586 88 : conf,
1587 88 : &TENANTS,
1588 88 : tenant_shard_id,
1589 88 : detach_ignored,
1590 88 : deletion_queue_client,
1591 88 : )
1592 393 : .await?;
1593 : // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
1594 : // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
1595 78 : let task_tenant_id = None;
1596 78 : task_mgr::spawn(
1597 78 : task_mgr::BACKGROUND_RUNTIME.handle(),
1598 78 : TaskKind::MgmtRequest,
1599 78 : task_tenant_id,
1600 78 : None,
1601 78 : "tenant_files_delete",
1602 78 : false,
1603 78 : async move {
1604 78 : fs::remove_dir_all(tmp_path.as_path())
1605 78 : .await
1606 78 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
1607 78 : },
1608 78 : );
1609 78 : Ok(())
1610 88 : }
1611 :
1612 88 : async fn detach_tenant0(
1613 88 : conf: &'static PageServerConf,
1614 88 : tenants: &std::sync::RwLock<TenantsMap>,
1615 88 : tenant_shard_id: TenantShardId,
1616 88 : detach_ignored: bool,
1617 88 : deletion_queue_client: &DeletionQueueClient,
1618 88 : ) -> Result<Utf8PathBuf, TenantStateError> {
1619 88 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
1620 78 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
1621 78 : safe_rename_tenant_dir(&local_tenant_directory)
1622 234 : .await
1623 88 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
1624 78 : };
1625 :
1626 88 : let removal_result = remove_tenant_from_memory(
1627 88 : tenants,
1628 88 : tenant_shard_id,
1629 88 : tenant_dir_rename_operation(tenant_shard_id),
1630 88 : )
1631 390 : .await;
1632 :
1633 : // Flush pending deletions, so that they have a good chance of passing validation
1634 : // before this tenant is potentially re-attached elsewhere.
1635 88 : deletion_queue_client.flush_advisory();
1636 88 :
1637 88 : // Ignored tenants are not present in memory, so the removal-from-memory operation above will bail.
1638 88 : // Before returning that error, check for the ignored-tenant case — then we only need to clean up its local files.
1639 88 : if detach_ignored
1640 : && matches!(
1641 8 : removal_result,
1642 : Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
1643 : )
1644 : {
1645 8 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1646 8 : if tenant_ignore_mark.exists() {
1647 1 : info!("Detaching an ignored tenant");
1648 1 : let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
1649 3 : .await
1650 1 : .with_context(|| {
1651 0 : format!("Ignored tenant {tenant_shard_id} local directory rename")
1652 1 : })?;
1653 1 : return Ok(tmp_path);
1654 7 : }
1655 80 : }
1656 :
1657 87 : removal_result
1658 88 : }
1659 :
1660 4 : pub(crate) async fn load_tenant(
1661 4 : conf: &'static PageServerConf,
1662 4 : tenant_id: TenantId,
1663 4 : generation: Generation,
1664 4 : broker_client: storage_broker::BrokerClientChannel,
1665 4 : remote_storage: Option<GenericRemoteStorage>,
1666 4 : deletion_queue_client: DeletionQueueClient,
1667 4 : ctx: &RequestContext,
1668 4 : ) -> Result<(), TenantMapInsertError> {
1669 4 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
1670 4 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1671 :
1672 3 : let slot_guard =
1673 4 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
1674 3 : let tenant_path = conf.tenant_path(&tenant_shard_id);
1675 3 :
1676 3 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1677 3 : if tenant_ignore_mark.exists() {
1678 3 : std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
1679 0 : format!(
1680 0 : "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
1681 0 : )
1682 3 : })?;
1683 0 : }
1684 :
1685 3 : let resources = TenantSharedResources {
1686 3 : broker_client,
1687 3 : remote_storage,
1688 3 : deletion_queue_client,
1689 3 : };
1690 :
1691 3 : let mut location_conf =
1692 3 : Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
1693 3 : location_conf.attach_in_generation(generation);
1694 3 :
1695 6 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
1696 :
1697 3 : let shard_identity = location_conf.shard;
1698 3 : let new_tenant = tenant_spawn(
1699 3 : conf,
1700 3 : tenant_shard_id,
1701 3 : &tenant_path,
1702 3 : resources,
1703 3 : AttachedTenantConf::try_from(location_conf)?,
1704 3 : shard_identity,
1705 3 : None,
1706 3 : &TENANTS,
1707 3 : SpawnMode::Normal,
1708 3 : ctx,
1709 3 : )
1710 3 : .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
1711 :
1712 3 : slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
1713 3 : Ok(())
1714 4 : }
1715 :
1716 5 : pub(crate) async fn ignore_tenant(
1717 5 : conf: &'static PageServerConf,
1718 5 : tenant_id: TenantId,
1719 5 : ) -> Result<(), TenantStateError> {
1720 15 : ignore_tenant0(conf, &TENANTS, tenant_id).await
1721 5 : }
1722 :
1723 0 : #[instrument(skip_all, fields(shard_id))]
1724 : async fn ignore_tenant0(
1725 : conf: &'static PageServerConf,
1726 : tenants: &std::sync::RwLock<TenantsMap>,
1727 : tenant_id: TenantId,
1728 : ) -> Result<(), TenantStateError> {
1729 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
1730 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1731 : tracing::Span::current().record(
1732 : "shard_id",
1733 : tracing::field::display(tenant_shard_id.shard_slug()),
1734 : );
1735 :
1736 5 : remove_tenant_from_memory(tenants, tenant_shard_id, async {
1737 5 : let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1738 5 : fs::File::create(&ignore_mark_file)
1739 5 : .await
1740 5 : .context("Failed to create ignore mark file")
1741 5 : .and_then(|_| {
1742 5 : crashsafe::fsync_file_and_parent(&ignore_mark_file)
1743 5 : .context("Failed to fsync ignore mark file")
1744 5 : })
1745 5 : .with_context(|| format!("Failed to create ignore mark for tenant {tenant_shard_id}"))?;
1746 5 : Ok(())
1747 5 : })
1748 : .await
1749 : }
1750 :
1751 0 : #[derive(Debug, thiserror::Error)]
1752 : pub(crate) enum TenantMapListError {
1753 : #[error("tenant map is still initiailizing")]
1754 : Initializing,
1755 : }
1756 :
1757 : ///
1758 : /// Get the list of tenants, for the mgmt API.
1759 : ///
1760 232 : pub(crate) async fn list_tenants(
1761 232 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
1762 232 : let tenants = TENANTS.read().unwrap();
1763 232 : let m = match &*tenants {
1764 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
1765 232 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
1766 232 : };
1767 232 : Ok(m.iter()
1768 242 : .filter_map(|(id, tenant)| match tenant {
1769 240 : TenantSlot::Attached(tenant) => {
1770 240 : Some((*id, tenant.current_state(), tenant.generation()))
1771 : }
1772 2 : TenantSlot::Secondary(_) => None,
1773 0 : TenantSlot::InProgress(_) => None,
1774 242 : })
1775 232 : .collect())
1776 232 : }
1777 :
1778 0 : #[derive(Debug, thiserror::Error)]
1779 : pub(crate) enum TenantMapInsertError {
1780 : #[error(transparent)]
1781 : SlotError(#[from] TenantSlotError),
1782 : #[error(transparent)]
1783 : SlotUpsertError(#[from] TenantSlotUpsertError),
1784 : #[error(transparent)]
1785 : Other(#[from] anyhow::Error),
1786 : }
1787 :
1788 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
1789 : /// for a particular tenant ID.
1790 1 : #[derive(Debug, thiserror::Error)]
1791 : pub(crate) enum TenantSlotError {
1792 : /// When acquiring a slot with the expectation that the tenant already exists.
1793 : #[error("Tenant {0} not found")]
1794 : NotFound(TenantShardId),
1795 :
1796 : /// When acquiring a slot with the expectation that the tenant does not already exist.
1797 : #[error("tenant {0} already exists, state: {1:?}")]
1798 : AlreadyExists(TenantShardId, TenantState),
1799 :
1800 : // Tried to read a slot that is currently being mutated by another administrative
1801 : // operation.
1802 : #[error("tenant has a state change in progress, try again later")]
1803 : InProgress,
1804 :
1805 : #[error(transparent)]
1806 : MapState(#[from] TenantMapError),
1807 : }
1808 :
1809 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
1810 : /// to insert a new value.
1811 0 : #[derive(thiserror::Error)]
1812 : pub(crate) enum TenantSlotUpsertError {
1813 : /// An error where the slot is in an unexpected state, indicating a code bug
1814 : #[error("Internal error updating Tenant")]
1815 : InternalError(Cow<'static, str>),
1816 :
1817 : #[error(transparent)]
1818 : MapState(TenantMapError),
1819 :
1820 : // If we encounter TenantManager shutdown during upsert, we must carry the Completion
1821 : // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
1822 : // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
1823 : // was protected by the SlotGuard.
1824 : #[error("Shutting down")]
1825 : ShuttingDown((TenantSlot, utils::completion::Completion)),
1826 : }
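     :
     : // A hedged sketch of how a caller might handle the ShuttingDown case (illustrative
     : // only; `new_slot` and the surrounding cleanup step are assumptions, not code from
     : // this module):
     : //
     : //     match slot_guard.upsert(new_slot) {
     : //         Err(TenantSlotUpsertError::ShuttingDown((slot, _completion))) => {
     : //             // Hold `_completion` until the contents of `slot` are shut down, so that
     : //             // TenantManager shutdown waits for this cleanup to finish.
     : //         }
     : //         other => { /* handle Ok(()) and the remaining error variants */ }
     : //     }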
1827 :
1828 : impl std::fmt::Debug for TenantSlotUpsertError {
1829 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1830 0 : match self {
1831 0 : Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
1832 0 : Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
1833 0 : Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
1834 : }
1835 0 : }
1836 : }
1837 :
1838 0 : #[derive(Debug, thiserror::Error)]
1839 : enum TenantSlotDropError {
1840 : /// It is only legal to drop a TenantSlot if its contents are fully shut down
1841 : #[error("Tenant was not shut down")]
1842 : NotShutdown,
1843 : }
1844 :
1845 : /// Errors that can happen any time we are walking the tenant map to try and acquire
1846 : /// the TenantSlot for a particular tenant.
1847 0 : #[derive(Debug, thiserror::Error)]
1848 : pub enum TenantMapError {
1849 : // Tried to read while initializing
1850 : #[error("tenant map is still initializing")]
1851 : StillInitializing,
1852 :
1853 : // Tried to read while shutting down
1854 : #[error("tenant map is shutting down")]
1855 : ShuttingDown,
1856 : }
1857 :
1858 : /// Guards a particular tenant_id's content in the TenantsMap. While this
1859 : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
1860 : /// for this tenant, which acts as a marker for any operations targeting
1861 : /// this tenant to retry later, or wait for the InProgress state to end.
1862 : ///
1863 : /// This structure enforces the important invariant that we do not have overlapping
1864 : /// tasks that try to use local storage for the same tenant ID: we enforce that
1865 : /// the previous contents of a slot have been shut down before the slot can be
1866 : /// left empty or used for something else.
1867 : ///
1868 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
1869 : /// to provide a new value, or `revert` to put the slot back into its initial
1870 : /// state. If the SlotGuard is dropped without calling either of these, then
1871 : /// we will leave the slot empty if our `old_value` is already shut down, else
1872 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
1873 : ///
1874 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
1875 : /// `drop_old_value`. It is an error to call this without shutting down
1876 : /// the conents of `old_value`.
1877 : pub struct SlotGuard {
1878 : tenant_shard_id: TenantShardId,
1879 : old_value: Option<TenantSlot>,
1880 : upserted: bool,
1881 :
1882 : /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
1883 : /// release any waiters as soon as this SlotGuard is dropped.
1884 : completion: utils::completion::Completion,
1885 : }
1886 :
1887 : impl SlotGuard {
1888 865 : fn new(
1889 865 : tenant_shard_id: TenantShardId,
1890 865 : old_value: Option<TenantSlot>,
1891 865 : completion: utils::completion::Completion,
1892 865 : ) -> Self {
1893 865 : Self {
1894 865 : tenant_shard_id,
1895 865 : old_value,
1896 865 : upserted: false,
1897 865 : completion,
1898 865 : }
1899 865 : }
1900 :
1901 : /// Get any value that was present in the slot before we acquired ownership
1902 : /// of it: in state transitions, this will be the old state.
1903 862 : fn get_old_value(&self) -> &Option<TenantSlot> {
1904 862 : &self.old_value
1905 862 : }
1906 :
1907 : /// Emplace a new value in the slot. This consumes the guard, and after
1908 : /// returning, the slot is no longer protected from concurrent changes.
1909 780 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
1910 780 : if !self.old_value_is_shutdown() {
1911 : // This is a bug: callers should never try to drop an old value without
1912 : // shutting it down
1913 0 : return Err(TenantSlotUpsertError::InternalError(
1914 0 : "Old TenantSlot value not shut down".into(),
1915 0 : ));
1916 780 : }
1917 :
1918 773 : let replaced = {
1919 780 : let mut locked = TENANTS.write().unwrap();
1920 780 :
1921 780 : if let TenantSlot::InProgress(_) = new_value {
1922 : // It is never expected to try and upsert InProgress via this path: it should
1923 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
1924 0 : return Err(TenantSlotUpsertError::InternalError(
1925 0 : "Attempt to upsert an InProgress state".into(),
1926 0 : ));
1927 780 : }
1928 :
1929 780 : let m = match &mut *locked {
1930 : TenantsMap::Initializing => {
1931 0 : return Err(TenantSlotUpsertError::MapState(
1932 0 : TenantMapError::StillInitializing,
1933 0 : ))
1934 : }
1935 : TenantsMap::ShuttingDown(_) => {
1936 7 : return Err(TenantSlotUpsertError::ShuttingDown((
1937 7 : new_value,
1938 7 : self.completion.clone(),
1939 7 : )));
1940 : }
1941 773 : TenantsMap::Open(m) => m,
1942 773 : };
1943 773 :
1944 773 : let replaced = m.insert(self.tenant_shard_id, new_value);
1945 773 : self.upserted = true;
1946 773 :
1947 773 : METRICS.tenant_slots.set(m.len() as u64);
1948 773 :
1949 773 : replaced
1950 : };
1951 :
1952 : // Sanity check: on an upsert we should always be replacing an InProgress marker
1953 773 : match replaced {
1954 : Some(TenantSlot::InProgress(_)) => {
1955 : // Expected case: we find our InProgress in the map: nothing should have
1956 : // replaced it because the code that acquires slots will not grant another
1957 : // one for the same TenantId.
1958 773 : Ok(())
1959 : }
1960 : None => {
1961 0 : METRICS.unexpected_errors.inc();
1962 0 : error!(
1963 0 : tenant_shard_id = %self.tenant_shard_id,
1964 0 : "Missing InProgress marker during tenant upsert, this is a bug."
1965 0 : );
1966 0 : Err(TenantSlotUpsertError::InternalError(
1967 0 : "Missing InProgress marker during tenant upsert".into(),
1968 0 : ))
1969 : }
1970 0 : Some(slot) => {
1971 0 : METRICS.unexpected_errors.inc();
1972 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
1973 0 : Err(TenantSlotUpsertError::InternalError(
1974 0 : "Unexpected contents of TenantSlot".into(),
1975 0 : ))
1976 : }
1977 : }
1978 780 : }
1979 :
1980 : /// Replace the InProgress slot with whatever was in the guard when we started
1981 104 : fn revert(mut self) {
1982 104 : if let Some(value) = self.old_value.take() {
1983 104 : match self.upsert(value) {
1984 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
1985 0 : // We already logged the error, nothing else we can do.
1986 0 : }
1987 : Err(
1988 : TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
1989 0 : ) => {
1990 0 : // If the map is shutting down, we need not replace anything
1991 0 : }
1992 104 : Ok(()) => {}
1993 : }
1994 0 : }
1995 104 : }
1996 :
1997 : /// We must never drop our old value until it is cleanly shut down: otherwise we might leave
1998 : /// rogue background tasks that would write to the local tenant directory that this guard
1999 : /// is responsible for protecting.
2000 1023 : fn old_value_is_shutdown(&self) -> bool {
2001 1023 : match self.old_value.as_ref() {
2002 155 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
2003 22 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
2004 : Some(TenantSlot::InProgress(_)) => {
2005 : // A SlotGuard cannot be constructed for a slot that was already InProgress
2006 0 : unreachable!()
2007 : }
2008 846 : None => true,
2009 : }
2010 1023 : }
2011 :
2012 : /// The guard holder is done with the old value of the slot: they are obliged to already
2013 : /// shut it down before we reach this point.
2014 160 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
2015 160 : if !self.old_value_is_shutdown() {
2016 0 : Err(TenantSlotDropError::NotShutdown)
2017 : } else {
2018 160 : self.old_value.take();
2019 160 : Ok(())
2020 : }
2021 160 : }
2022 : }
2023 :
2024 : impl Drop for SlotGuard {
2025 865 : fn drop(&mut self) {
2026 865 : if self.upserted {
2027 773 : return;
2028 92 : }
2029 92 : // Our old value is already shutdown, or it never existed: it is safe
2030 92 : // for us to fully release the TenantSlot back into an empty state
2031 92 :
2032 92 : let mut locked = TENANTS.write().unwrap();
2033 :
2034 92 : let m = match &mut *locked {
2035 : TenantsMap::Initializing => {
2036 : // There is no map, this should never happen.
2037 2 : return;
2038 : }
2039 : TenantsMap::ShuttingDown(_) => {
2040 : // When we transition to shutdown, InProgress elements are removed
2041 : // from the map, so we do not need to clean up our InProgress marker.
2042 : // See [`shutdown_all_tenants0`]
2043 7 : return;
2044 : }
2045 83 : TenantsMap::Open(m) => m,
2046 83 : };
2047 83 :
2048 83 : use std::collections::btree_map::Entry;
2049 83 : match m.entry(self.tenant_shard_id) {
2050 83 : Entry::Occupied(mut entry) => {
2051 83 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
2052 0 : METRICS.unexpected_errors.inc();
2053 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
2054 83 : }
2055 :
2056 83 : if self.old_value_is_shutdown() {
2057 83 : entry.remove();
2058 83 : } else {
2059 0 : entry.insert(self.old_value.take().unwrap());
2060 0 : }
2061 : }
2062 : Entry::Vacant(_) => {
2063 0 : METRICS.unexpected_errors.inc();
2064 0 : error!(
2065 0 : tenant_shard_id = %self.tenant_shard_id,
2066 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
2067 0 : );
2068 : }
2069 : }
2070 :
2071 83 : METRICS.tenant_slots.set(m.len() as u64);
2072 865 : }
2073 : }
2074 :
2075 : enum TenantSlotPeekMode {
2076 : /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
2077 : Read,
2078 : /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
2079 : Write,
2080 : }
2081 :
2082 28500 : fn tenant_map_peek_slot<'a>(
2083 28500 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
2084 28500 : tenant_shard_id: &TenantShardId,
2085 28500 : mode: TenantSlotPeekMode,
2086 28500 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
2087 28500 : match tenants.deref() {
2088 0 : TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
2089 18 : TenantsMap::ShuttingDown(m) => match mode {
2090 : TenantSlotPeekMode::Read => Ok(Some(
2091 : // When reading in ShuttingDown state, we must translate None results
2092 : // into a ShuttingDown error, because absence of a tenant shard ID in the map
2093 : // isn't a reliable indicator of the tenant being gone: it might have been
2094 : // InProgress when shutdown started, and cleaned up from that state such
2095 : // that it's now no longer in the map. Callers will have to wait until
2096 : // we next start up to get a proper answer. This avoids incorrect 404 API responses.
2097 0 : m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
2098 : )),
2099 18 : TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
2100 : },
2101 28482 : TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
2102 : }
2103 28500 : }
2104 :
2105 : enum TenantSlotAcquireMode {
2106 : /// Acquire the slot irrespective of current state, or whether it already exists
2107 : Any,
2108 : /// Return an error if trying to acquire a slot and it doesn't already exist
2109 : MustExist,
2110 : /// Return an error if trying to acquire a slot and it already exists
2111 : MustNotExist,
2112 : }
2113 :
2114 816 : fn tenant_map_acquire_slot(
2115 816 : tenant_shard_id: &TenantShardId,
2116 816 : mode: TenantSlotAcquireMode,
2117 816 : ) -> Result<SlotGuard, TenantSlotError> {
2118 816 : tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
2119 816 : }
2120 :
2121 911 : fn tenant_map_acquire_slot_impl(
2122 911 : tenant_shard_id: &TenantShardId,
2123 911 : tenants: &std::sync::RwLock<TenantsMap>,
2124 911 : mode: TenantSlotAcquireMode,
2125 911 : ) -> Result<SlotGuard, TenantSlotError> {
2126 911 : use TenantSlotAcquireMode::*;
2127 911 : METRICS.tenant_slot_writes.inc();
2128 911 :
2129 911 : let mut locked = tenants.write().unwrap();
2130 911 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
2131 911 : let _guard = span.enter();
2132 :
2133 911 : let m = match &mut *locked {
2134 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
2135 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
2136 911 : TenantsMap::Open(m) => m,
2137 911 : };
2138 911 :
2139 911 : use std::collections::btree_map::Entry;
2140 911 :
2141 911 : let entry = m.entry(*tenant_shard_id);
2142 911 :
2143 911 : match entry {
2144 609 : Entry::Vacant(v) => match mode {
2145 : MustExist => {
2146 25 : tracing::debug!("Vacant && MustExist: return NotFound");
2147 25 : Err(TenantSlotError::NotFound(*tenant_shard_id))
2148 : }
2149 : _ => {
2150 584 : let (completion, barrier) = utils::completion::channel();
2151 584 : v.insert(TenantSlot::InProgress(barrier));
2152 584 : tracing::debug!("Vacant, inserted InProgress");
2153 584 : Ok(SlotGuard::new(*tenant_shard_id, None, completion))
2154 : }
2155 : },
2156 302 : Entry::Occupied(mut o) => {
2157 302 : // Apply mode-driven checks
2158 302 : match (o.get(), mode) {
2159 : (TenantSlot::InProgress(_), _) => {
2160 20 : tracing::debug!("Occupied, failing for InProgress");
2161 20 : Err(TenantSlotError::InProgress)
2162 : }
2163 1 : (slot, MustNotExist) => match slot {
2164 1 : TenantSlot::Attached(tenant) => {
2165 1 : tracing::debug!("Attached && MustNotExist, return AlreadyExists");
2166 1 : Err(TenantSlotError::AlreadyExists(
2167 1 : *tenant_shard_id,
2168 1 : tenant.current_state(),
2169 1 : ))
2170 : }
2171 : _ => {
2172 : // FIXME: the AlreadyExists error assumes that we have a Tenant
2173 : // to get the state from
2174 0 : tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
2175 0 : Err(TenantSlotError::AlreadyExists(
2176 0 : *tenant_shard_id,
2177 0 : TenantState::Broken {
2178 0 : reason: "Present but not attached".to_string(),
2179 0 : backtrace: "".to_string(),
2180 0 : },
2181 0 : ))
2182 : }
2183 : },
2184 : _ => {
2185 : // Happy case: the slot was not in any state that violated our mode
2186 281 : let (completion, barrier) = utils::completion::channel();
2187 281 : let old_value = o.insert(TenantSlot::InProgress(barrier));
2188 281 : tracing::debug!("Occupied, replaced with InProgress");
2189 281 : Ok(SlotGuard::new(
2190 281 : *tenant_shard_id,
2191 281 : Some(old_value),
2192 281 : completion,
2193 281 : ))
2194 : }
2195 : }
2196 : }
2197 : }
2198 911 : }
2199 :
2200 : /// Stops and removes the tenant from memory, if it is not already [`TenantState::Stopping`]; bails otherwise.
2201 : /// Allows other tenant resources to be removed manually, via `tenant_cleanup`.
2202 : /// If the cleanup fails, the tenant will stay in memory in [`TenantState::Broken`] state, and another removal
2203 : /// operation will be needed to remove it.
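     : ///
     : /// A hedged usage sketch (illustrative only, not a doctest), mirroring how
     : /// `detach_tenant0` uses this function:
     : /// ```ignore
     : /// let tmp_path = remove_tenant_from_memory(&TENANTS, tenant_shard_id, async {
     : ///     // Rename the local tenant directory out of the way and return the new path.
     : ///     safe_rename_tenant_dir(&conf.tenant_path(&tenant_shard_id)).await
     : /// })
     : /// .await?;
     : /// ```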
2204 95 : async fn remove_tenant_from_memory<V, F>(
2205 95 : tenants: &std::sync::RwLock<TenantsMap>,
2206 95 : tenant_shard_id: TenantShardId,
2207 95 : tenant_cleanup: F,
2208 95 : ) -> Result<V, TenantStateError>
2209 95 : where
2210 95 : F: std::future::Future<Output = anyhow::Result<V>>,
2211 95 : {
2212 : use utils::completion;
2213 :
2214 84 : let mut slot_guard =
2215 95 : tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
2216 :
2217 : // allow pageserver shutdown to await for our completion
2218 84 : let (_guard, progress) = completion::channel();
2219 :
2220 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2221 : // concurrent API request doing something else for the same tenant ID.
2222 84 : let attached_tenant = match slot_guard.get_old_value() {
2223 79 : Some(TenantSlot::Attached(tenant)) => {
2224 79 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2225 79 : let freeze_and_flush = false;
2226 79 :
2227 79 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
2228 79 : // that we can continue safely to cleanup.
2229 169 : match tenant.shutdown(progress, freeze_and_flush).await {
2230 79 : Ok(()) => {}
2231 0 : Err(_other) => {
2232 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2233 0 : // wait for it but return an error right away because these are distinct requests.
2234 0 : slot_guard.revert();
2235 0 : return Err(TenantStateError::IsStopping(tenant_shard_id));
2236 : }
2237 : }
2238 79 : Some(tenant)
2239 : }
2240 5 : Some(TenantSlot::Secondary(secondary_state)) => {
2241 5 : tracing::info!("Shutting down in secondary mode");
2242 5 : secondary_state.shutdown().await;
2243 5 : None
2244 : }
2245 : Some(TenantSlot::InProgress(_)) => {
2246 : // Acquiring a slot guarantees its old value was not InProgress
2247 0 : unreachable!();
2248 : }
2249 0 : None => None,
2250 : };
2251 :
2252 84 : match tenant_cleanup
2253 238 : .await
2254 84 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2255 : {
2256 84 : Ok(hook_value) => {
2257 84 : // Success: drop the old TenantSlot::Attached.
2258 84 : slot_guard
2259 84 : .drop_old_value()
2260 84 : .expect("We just called shutdown");
2261 84 :
2262 84 : Ok(hook_value)
2263 : }
2264 0 : Err(e) => {
2265 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2266 0 : if let Some(attached_tenant) = attached_tenant {
2267 0 : attached_tenant.set_broken(e.to_string()).await;
2268 0 : }
2269 : // Leave the broken tenant in the map
2270 0 : slot_guard.revert();
2271 0 :
2272 0 : Err(TenantStateError::Other(e))
2273 : }
2274 : }
2275 95 : }
2276 :
2277 : use {
2278 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
2279 : utils::http::error::ApiError,
2280 : };
2281 :
2282 251 : pub(crate) async fn immediate_gc(
2283 251 : tenant_shard_id: TenantShardId,
2284 251 : timeline_id: TimelineId,
2285 251 : gc_req: TimelineGcRequest,
2286 251 : cancel: CancellationToken,
2287 251 : ctx: &RequestContext,
2288 251 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
2289 251 : let guard = TENANTS.read().unwrap();
2290 :
2291 251 : let tenant = guard
2292 251 : .get(&tenant_shard_id)
2293 251 : .map(Arc::clone)
2294 251 : .with_context(|| format!("tenant {tenant_shard_id}"))
2295 251 : .map_err(|e| ApiError::NotFound(e.into()))?;
2296 :
2297 250 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
2298 250 : // Use tenant's pitr setting
2299 250 : let pitr = tenant.get_pitr_interval();
2300 250 :
2301 250 : // Run in task_mgr to avoid race with tenant_detach operation
2302 250 : let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
2303 250 : let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
2304 250 : // TODO: spawning is redundant now, need to hold the gate
2305 250 : task_mgr::spawn(
2306 250 : &tokio::runtime::Handle::current(),
2307 250 : TaskKind::GarbageCollector,
2308 250 : Some(tenant_shard_id),
2309 250 : Some(timeline_id),
2310 250 : &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
2311 250 : false,
2312 250 : async move {
2313 0 : fail::fail_point!("immediate_gc_task_pre");
2314 :
2315 : #[allow(unused_mut)]
2316 250 : let mut result = tenant
2317 250 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
2318 250 : .instrument(info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
2319 200 : .await;
2320 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
2321 : // better once the types support it.
2322 :
2323 : #[cfg(feature = "testing")]
2324 : {
2325 250 : if let Ok(result) = result.as_mut() {
2326 : // Why not FuturesUnordered? It would need much the same task structure,
2327 : // but would run everything on a single task.
2328 249 : let mut js = tokio::task::JoinSet::new();
2329 1031 : for layer in std::mem::take(&mut result.doomed_layers) {
2330 1031 : js.spawn(layer.wait_drop());
2331 1031 : }
2332 249 : tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
2333 1280 : while let Some(res) = js.join_next().await {
2334 1031 : res.expect("wait_drop should not panic");
2335 1031 : }
2336 1 : }
2337 :
2338 250 : let timeline = tenant.get_timeline(timeline_id, false).ok();
2339 250 : let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
2340 :
2341 250 : if let Some(rtc) = rtc {
2342 : // layer drops schedule actions on the remote timeline client to actually do the
2343 : // deletions; we don't care about a shutdown error here, just exit fast
2344 249 : drop(rtc.wait_completion().await);
2345 1 : }
2346 : }
2347 :
2348 250 : match task_done.send(result) {
2349 250 : Ok(_) => (),
2350 0 : Err(result) => error!("failed to send gc result: {result:?}"),
2351 : }
2352 250 : Ok(())
2353 250 : }
2354 250 : );
2355 250 :
2356 250 : // drop the guard only after we've spawned the task, so that timeline shutdown will wait for the task
2357 250 : drop(guard);
2358 250 :
2359 250 : Ok(wait_task_done)
2360 251 : }
2361 :
2362 : #[cfg(test)]
2363 : mod tests {
2364 : use std::collections::BTreeMap;
2365 : use std::sync::Arc;
2366 : use tracing::Instrument;
2367 :
2368 : use crate::tenant::mgr::TenantSlot;
2369 :
2370 : use super::{super::harness::TenantHarness, TenantsMap};
2371 :
2372 2 : #[tokio::test(start_paused = true)]
2373 2 : async fn shutdown_awaits_in_progress_tenant() {
2374 2 : // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
2375 2 : // wait for it to complete before proceeding.
2376 2 :
2377 2 : let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
2378 2 : let (t, _ctx) = h.load().await;
2379 :
2380 : // harness loads it to active, which is forced and nothing is running on the tenant
2381 :
2382 2 : let id = t.tenant_shard_id();
2383 2 :
2384 2 : // tenant harness configures the logging and we cannot escape it
2385 2 : let span = h.span();
2386 2 : let _e = span.enter();
2387 2 :
2388 2 : let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
2389 2 : let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
2390 2 :
2391 2 : // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
2392 2 : // permit it to proceed: that will stick the tenant in InProgress
2393 2 :
2394 2 : let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
2395 2 : let (until_cleanup_started, cleanup_started) = utils::completion::channel();
2396 2 : let mut remove_tenant_from_memory_task = {
2397 2 : let jh = tokio::spawn({
2398 2 : let tenants = tenants.clone();
2399 2 : async move {
2400 2 : let cleanup = async move {
2401 2 : drop(until_cleanup_started);
2402 2 : can_complete_cleanup.wait().await;
2403 2 : anyhow::Ok(())
2404 2 : };
2405 2 : super::remove_tenant_from_memory(&tenants, id, cleanup).await
2406 2 : }
2407 2 : .instrument(h.span())
2408 2 : });
2409 2 :
2410 2 : // now the long cleanup should be in place, with the stopping state
2411 2 : cleanup_started.wait().await;
2412 2 : jh
2413 : };
2414 :
2415 2 : let mut shutdown_task = {
2416 2 : let (until_shutdown_started, shutdown_started) = utils::completion::channel();
2417 2 :
2418 2 : let shutdown_task = tokio::spawn(async move {
2419 2 : drop(until_shutdown_started);
2420 4 : super::shutdown_all_tenants0(&tenants).await;
2421 2 : });
2422 2 :
2423 2 : shutdown_started.wait().await;
2424 2 : shutdown_task
2425 2 : };
2426 2 :
2427 2 : let long_time = std::time::Duration::from_secs(15);
2428 4 : tokio::select! {
2429 4 : _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
2430 4 : _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
2431 4 : _ = tokio::time::sleep(long_time) => {},
2432 4 : }
2433 :
2434 2 : drop(until_cleanup_completed);
2435 2 :
2436 2 : // Now that we allow it to proceed, shutdown should complete immediately
2437 2 : remove_tenant_from_memory_task.await.unwrap().unwrap();
2438 2 : shutdown_task.await.unwrap();
2439 : }
2440 : }
|