Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
5 : use itertools::Itertools;
6 : use pageserver_api::key::Key;
7 : use pageserver_api::models::ShardParameters;
8 : use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, TenantShardId};
9 : use rand::{distributions::Alphanumeric, Rng};
10 : use std::borrow::Cow;
11 : use std::cmp::Ordering;
12 : use std::collections::{BTreeMap, HashMap};
13 : use std::ops::Deref;
14 : use std::sync::Arc;
15 : use std::time::{Duration, Instant};
16 : use tokio::fs;
17 : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
18 :
19 : use anyhow::Context;
20 : use once_cell::sync::Lazy;
21 : use tokio::task::JoinSet;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::*;
24 :
25 : use remote_storage::GenericRemoteStorage;
26 : use utils::{completion, crashsafe};
27 :
28 : use crate::config::PageServerConf;
29 : use crate::context::{DownloadBehavior, RequestContext};
30 : use crate::control_plane_client::{
31 : ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
32 : };
33 : use crate::deletion_queue::DeletionQueueClient;
34 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
35 : use crate::task_mgr::{self, TaskKind};
36 : use crate::tenant::config::{
37 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
38 : TenantConfOpt,
39 : };
40 : use crate::tenant::delete::DeleteTenantFlow;
41 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
42 : use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
43 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
44 :
45 : use utils::crashsafe::path_with_suffix_extension;
46 : use utils::fs_ext::PathExt;
47 : use utils::generation::Generation;
48 : use utils::id::{TenantId, TimelineId};
49 :
50 : use super::delete::DeleteTenantError;
51 : use super::secondary::SecondaryTenant;
52 : use super::TenantSharedResources;
53 :
54 : /// A tenant that appears in TenantsMap may be either:
55 : /// - `Attached`: has a full Tenant object, and is eligible to service
56 : /// reads and ingest WAL.
57 : /// - `Secondary`: is only keeping a local cache warm.
58 : ///
59 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
60 : /// that way we avoid having to carefully switch a tenant's ingestion etc on and off during
61 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
62 : /// having a properly acquired generation (Secondary doesn't need a generation).
63 5 : #[derive(Clone)]
64 : pub(crate) enum TenantSlot {
65 : Attached(Arc<Tenant>),
66 : Secondary(Arc<SecondaryTenant>),
67 : /// In this state, other administrative operations acting on the TenantId should
68 : /// block, or return a retry indicator equivalent to HTTP 503.
69 : InProgress(utils::completion::Barrier),
70 : }
71 :
72 : impl std::fmt::Debug for TenantSlot {
73 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 0 : match self {
75 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
76 0 : Self::Secondary(_) => write!(f, "Secondary"),
77 0 : Self::InProgress(_) => write!(f, "InProgress"),
78 : }
79 0 : }
80 : }
81 :
82 : impl TenantSlot {
83 : /// Return the `Tenant` in this slot if attached, else None
84 1370 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
85 1370 : match self {
86 1351 : Self::Attached(t) => Some(t),
87 10 : Self::Secondary(_) => None,
88 9 : Self::InProgress(_) => None,
89 : }
90 1370 : }
91 : }
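
// An illustrative, self-contained sketch (not from this module) of how callers are expected to
// treat the three slot states described above: serve from an attached slot, report "not
// attached" for a secondary slot, and ask the client to retry (HTTP 503) while a slot is
// InProgress. The enum and handler below are hypothetical simplifications.
enum SlotSketch {
    Attached(String), // stands in for Arc<Tenant>
    Secondary,        // stands in for Arc<SecondaryTenant>
    InProgress,       // stands in for the completion barrier
}

fn handle_request_sketch(slot: Option<&SlotSketch>) -> Result<String, u16> {
    match slot {
        Some(SlotSketch::Attached(tenant)) => Ok(format!("served by {tenant}")),
        Some(SlotSketch::Secondary) | None => Err(404), // not attached on this pageserver
        Some(SlotSketch::InProgress) => Err(503),       // transient: the caller should retry
    }
}
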
92 :
93 : /// The tenants known to the pageserver.
94 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
95 : pub(crate) enum TenantsMap {
96 : /// [`init_tenant_mgr`] is not done yet.
97 : Initializing,
98 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
99 : /// New tenants can be added using [`tenant_map_acquire_slot`].
100 : Open(BTreeMap<TenantShardId, TenantSlot>),
101 : /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
102 : /// Existing tenants are still accessible, but no new tenants can be created.
103 : ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
104 : }
105 :
106 : pub(crate) enum TenantsMapRemoveResult {
107 : Occupied(TenantSlot),
108 : Vacant,
109 : InProgress(utils::completion::Barrier),
110 : }
111 :
112 : /// When resolving a TenantId to a shard, we may be looking for the 0th
113 : /// shard, or we might be looking for whichever shard holds a particular page.
114 : pub(crate) enum ShardSelector {
115 : /// Only return the 0th shard, if it is present. If a non-0th shard is present,
116 : /// ignore it.
117 : Zero,
118 : /// Pick the first shard we find for the TenantId
119 : First,
120 : /// Pick the shard that holds this key
121 : Page(Key),
122 : }
123 :
124 : impl TenantsMap {
125 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
126 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
127 : /// None is returned.
128 324 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
129 324 : match self {
130 0 : TenantsMap::Initializing => None,
131 324 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
132 324 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
133 : }
134 : }
135 324 : }
136 :
137 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
138 : /// resolve this to a fully qualified TenantShardId.
139 20472 : fn resolve_attached_shard(
140 20472 : &self,
141 20472 : tenant_id: &TenantId,
142 20472 : selector: ShardSelector,
143 20472 : ) -> Option<TenantShardId> {
144 20472 : let mut want_shard = None;
145 20472 : match self {
146 0 : TenantsMap::Initializing => None,
147 20472 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
148 20517 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
149 : // Ignore all slots that don't contain an attached tenant
150 20517 : let tenant = match &slot.1 {
151 20501 : TenantSlot::Attached(t) => t,
152 16 : _ => continue,
153 : };
154 :
155 2819 : match selector {
156 9910 : ShardSelector::First => return Some(*slot.0),
157 2819 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
158 2819 : return Some(*slot.0)
159 : }
160 7772 : ShardSelector::Page(key) => {
161 7772 : // First slot we see for this tenant, calculate the expected shard number
162 7772 : // for the key: we will use this for checking if this and subsequent
163 7772 : // slots contain the key, rather than recalculating the hash each time.
164 7772 : if want_shard.is_none() {
165 7722 : want_shard = Some(tenant.shard_identity.get_shard_number(&key));
166 7722 : }
167 :
168 7772 : if Some(tenant.shard_identity.number) == want_shard {
169 7722 : return Some(*slot.0);
170 50 : }
171 : }
172 0 : _ => continue,
173 : }
174 : }
175 :
176 : // Fall through: we didn't find an acceptable shard
177 21 : None
178 : }
179 : }
180 20472 : }
181 :
182 : /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
183 : ///
184 : /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
185 : /// slot if the enclosed tenant is shut down.
186 72 : pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
187 72 : use std::collections::btree_map::Entry;
188 72 : match self {
189 0 : TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
190 72 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
191 72 : Entry::Occupied(entry) => match entry.get() {
192 1 : TenantSlot::InProgress(barrier) => {
193 1 : TenantsMapRemoveResult::InProgress(barrier.clone())
194 : }
195 71 : _ => TenantsMapRemoveResult::Occupied(entry.remove()),
196 : },
197 0 : Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
198 : },
199 : }
200 72 : }
201 :
202 72 : pub(crate) fn len(&self) -> usize {
203 72 : match self {
204 0 : TenantsMap::Initializing => 0,
205 72 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
206 : }
207 72 : }
208 : }
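
// An illustrative, self-contained sketch (not from this module) of the lookup pattern used by
// `resolve_attached_shard` above: because the map key sorts all shards of a tenant adjacently,
// a range scan visits exactly that tenant's shards, and a `Page(key)`-style selector picks the
// shard whose number matches the key's hash. The (u64, u8) key and the `key_hash` parameter
// are hypothetical stand-ins for TenantShardId and the real key hashing.
fn resolve_page_shard_sketch(
    shards: &BTreeMap<(u64, u8), &'static str>, // (tenant id, shard number) -> slot
    tenant: u64,
    shard_count: u8,
    key_hash: u32,
) -> Option<(u64, u8)> {
    let want = (key_hash % shard_count.max(1) as u32) as u8;
    shards
        .range((tenant, u8::MIN)..=(tenant, u8::MAX))
        .map(|(id, _slot)| *id)
        .find(|(_, shard_number)| *shard_number == want)
}
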
209 :
210 : /// This is "safe" in that it won't leave behind a partially deleted directory
211 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting to delete
212 : /// the contents.
213 : ///
214 : /// This is pageserver-specific, as it relies on future processes after a crash to check
215 : /// for TEMP_FILE_SUFFIX when loading things.
216 16 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
217 48 : let tmp_path = safe_rename_tenant_dir(path).await?;
218 16 : fs::remove_dir_all(tmp_path).await
219 16 : }
220 :
221 97 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
222 97 : let parent = path
223 97 : .as_ref()
224 97 : .parent()
225 97 : // It is invalid to call this function with a relative path. Tenant directories
226 97 : // should always have a parent.
227 97 : .ok_or(std::io::Error::new(
228 97 : std::io::ErrorKind::InvalidInput,
229 97 : "Path must be absolute",
230 97 : ))?;
231 97 : let rand_suffix = rand::thread_rng()
232 97 : .sample_iter(&Alphanumeric)
233 97 : .take(8)
234 97 : .map(char::from)
235 97 : .collect::<String>()
236 97 : + TEMP_FILE_SUFFIX;
237 97 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
238 97 : fs::rename(path.as_ref(), &tmp_path).await?;
239 97 : fs::File::open(parent).await?.sync_all().await?;
240 97 : Ok(tmp_path)
241 97 : }
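
// A minimal, self-contained sketch (not from this module) of the crash-safe removal pattern
// used above: rename the directory to a temp-suffixed name, fsync the parent so the rename is
// durable, then delete the renamed directory. The real code appends a random component plus
// TEMP_FILE_SUFFIX; the fixed "___deleting" suffix here is a hypothetical stand-in.
async fn remove_dir_crashsafe_sketch(path: &std::path::Path) -> std::io::Result<()> {
    let parent = path.parent().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::InvalidInput, "Path must be absolute")
    })?;
    let mut tmp = path.to_path_buf();
    tmp.set_extension("___deleting"); // crash before this point: the original dir is untouched
    tokio::fs::rename(path, &tmp).await?;
    tokio::fs::File::open(parent).await?.sync_all().await?; // make the rename durable
    tokio::fs::remove_dir_all(&tmp).await // crash here: only a temp-suffixed dir is left over
}
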
242 :
243 : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
244 627 : Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
245 :
246 : /// The TenantManager is responsible for storing and mutating the collection of all tenants
247 : /// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
248 : /// lives inside the TenantManager.
249 : ///
250 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
251 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
252 : /// and attached modes concurrently.
253 : pub struct TenantManager {
254 : conf: &'static PageServerConf,
255 : // TODO: currently this is a &'static pointing to TENANTS. When we finish refactoring
256 : // out of that static variable, the TenantManager can own this.
257 : // See https://github.com/neondatabase/neon/issues/5796
258 : tenants: &'static std::sync::RwLock<TenantsMap>,
259 : resources: TenantSharedResources,
260 : }
261 :
262 1 : fn emergency_generations(
263 1 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
264 1 : ) -> HashMap<TenantShardId, Generation> {
265 1 : tenant_confs
266 1 : .iter()
267 1 : .filter_map(|(tid, lc)| {
268 1 : let lc = match lc {
269 1 : Ok(lc) => lc,
270 0 : Err(_) => return None,
271 : };
272 1 : let gen = match &lc.mode {
273 1 : LocationMode::Attached(alc) => Some(alc.generation),
274 0 : LocationMode::Secondary(_) => None,
275 : };
276 :
277 1 : gen.map(|g| (*tid, g))
278 1 : })
279 1 : .collect()
280 1 : }
281 :
282 625 : async fn init_load_generations(
283 625 : conf: &'static PageServerConf,
284 625 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
285 625 : resources: &TenantSharedResources,
286 625 : cancel: &CancellationToken,
287 625 : ) -> anyhow::Result<Option<HashMap<TenantShardId, Generation>>> {
288 625 : let generations = if conf.control_plane_emergency_mode {
289 1 : error!(
290 1 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
291 1 : );
292 1 : emergency_generations(tenant_confs)
293 624 : } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
294 623 : info!("Calling control plane API to re-attach tenants");
295 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
296 1879 : match client.re_attach().await {
297 623 : Ok(tenants) => tenants,
298 : Err(RetryForeverError::ShuttingDown) => {
299 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
300 : }
301 : }
302 : } else {
303 1 : info!("Control plane API not configured, tenant generations are disabled");
304 1 : return Ok(None);
305 : };
306 :
307 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
308 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
309 : // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
310 : // are processed, even though we don't block on recovery completing here.
311 : //
312 : // Must only do this if remote storage is enabled, otherwise deletion queue
313 : // is not running and channel push will fail.
314 624 : if resources.remote_storage.is_some() {
315 624 : resources
316 624 : .deletion_queue_client
317 624 : .recover(generations.clone())?;
318 0 : }
319 :
320 624 : Ok(Some(generations))
321 625 : }
322 :
323 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
324 : /// to load a tenant config from it.
325 : ///
326 : /// If the config file is missing, return Ok(None).
327 242 : fn load_tenant_config(
328 242 : conf: &'static PageServerConf,
329 242 : dentry: Utf8DirEntry,
330 242 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
331 242 : let tenant_dir_path = dentry.path().to_path_buf();
332 242 : if crate::is_temporary(&tenant_dir_path) {
333 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
334 : // No need to use safe_remove_tenant_dir_all because this is already
335 : // a temporary path
336 0 : if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
337 0 : error!(
338 0 : "Failed to remove temporary directory '{}': {:?}",
339 0 : tenant_dir_path, e
340 0 : );
341 0 : }
342 0 : return Ok(None);
343 242 : }
344 :
345 : // This case happens if we crash during attachment before writing a config into the dir
346 242 : let is_empty = tenant_dir_path
347 242 : .is_empty_dir()
348 242 : .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
349 242 : if is_empty {
350 2 : info!("removing empty tenant directory {tenant_dir_path:?}");
351 2 : if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
352 0 : error!(
353 0 : "Failed to remove empty tenant directory '{}': {e:#}",
354 0 : tenant_dir_path
355 0 : )
356 2 : }
357 2 : return Ok(None);
358 240 : }
359 240 :
360 240 : let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
361 240 : if tenant_ignore_mark_file.exists() {
362 1 : info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
363 1 : return Ok(None);
364 239 : }
365 :
366 239 : let tenant_shard_id = match tenant_dir_path
367 239 : .file_name()
368 239 : .unwrap_or_default()
369 239 : .parse::<TenantShardId>()
370 : {
371 239 : Ok(id) => id,
372 : Err(_) => {
373 0 : warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
374 0 : return Ok(None);
375 : }
376 : };
377 :
378 239 : Ok(Some((
379 239 : tenant_shard_id,
380 239 : Tenant::load_tenant_config(conf, &tenant_shard_id),
381 239 : )))
382 242 : }
383 :
384 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
385 : /// and load configurations for the tenants we found.
386 : ///
387 : /// Do this in parallel: we expect 10k+ tenants, so serial execution can take
388 : /// seconds even on reasonably fast drives.
389 625 : async fn init_load_tenant_configs(
390 625 : conf: &'static PageServerConf,
391 625 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
392 625 : let tenants_dir = conf.tenants_path();
393 :
394 625 : let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
395 625 : let dir_entries = tenants_dir
396 625 : .read_dir_utf8()
397 625 : .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
398 :
399 625 : Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
400 625 : })
401 625 : .await??;
402 :
403 625 : let mut configs = HashMap::new();
404 625 :
405 625 : let mut join_set = JoinSet::new();
406 867 : for dentry in dentries {
407 242 : join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
408 242 : }
409 :
410 867 : while let Some(r) = join_set.join_next().await {
411 242 : if let Some((tenant_id, tenant_config)) = r?? {
412 239 : configs.insert(tenant_id, tenant_config);
413 239 : }
414 : }
415 :
416 625 : Ok(configs)
417 625 : }
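
// An illustrative, self-contained sketch (not from this module) of the fan-out pattern used by
// `init_load_tenant_configs` above: blocking per-directory work runs on the blocking thread
// pool via `JoinSet::spawn_blocking`, and results are gathered with `join_next`. The "config"
// file name and the returned `String` payload are hypothetical stand-ins for the real loader.
async fn load_all_configs_sketch(
    dirs: Vec<std::path::PathBuf>,
) -> anyhow::Result<Vec<(std::path::PathBuf, String)>> {
    let mut join_set = JoinSet::new();
    for dir in dirs {
        join_set.spawn_blocking(move || -> anyhow::Result<(std::path::PathBuf, String)> {
            let raw = std::fs::read_to_string(dir.join("config"))?; // blocking filesystem I/O
            Ok((dir, raw))
        });
    }

    let mut configs = Vec::new();
    while let Some(res) = join_set.join_next().await {
        // The outer `?` surfaces panics/cancellation (JoinError), the inner one the load error.
        configs.push(res??);
    }
    Ok(configs)
}
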
418 :
419 : /// Initialize repositories with locally available timelines.
420 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
421 : /// are scheduled for download and added to the tenant once download is completed.
422 1250 : #[instrument(skip_all)]
423 : pub async fn init_tenant_mgr(
424 : conf: &'static PageServerConf,
425 : resources: TenantSharedResources,
426 : init_order: InitializationOrder,
427 : cancel: CancellationToken,
428 : ) -> anyhow::Result<TenantManager> {
429 : let mut tenants = BTreeMap::new();
430 :
431 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
432 :
433 : // Scan local filesystem for attached tenants
434 : let tenant_configs = init_load_tenant_configs(conf).await?;
435 :
436 : // Determine which tenants are to be attached
437 : let tenant_generations =
438 : init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
439 :
440 625 : tracing::info!(
441 625 : "Attaching {} tenants at startup, warming up {} at a time",
442 625 : tenant_configs.len(),
443 625 : conf.concurrent_tenant_warmup.initial_permits()
444 625 : );
445 : TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
446 :
447 : // Construct `Tenant` objects and start them running
448 : for (tenant_shard_id, location_conf) in tenant_configs {
449 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
450 :
451 : let mut location_conf = match location_conf {
452 : Ok(l) => l,
453 : Err(e) => {
454 0 : warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
455 :
456 : tenants.insert(
457 : tenant_shard_id,
458 : TenantSlot::Attached(Tenant::create_broken_tenant(
459 : conf,
460 : tenant_shard_id,
461 : format!("{}", e),
462 : )),
463 : );
464 : continue;
465 : }
466 : };
467 :
468 : let generation = if let Some(generations) = &tenant_generations {
469 : // We have a generation map: treat it as the authority for whether
470 : // this tenant is really attached.
471 : if let Some(gen) = generations.get(&tenant_shard_id) {
472 : if let LocationMode::Attached(attached) = &location_conf.mode {
473 : if attached.generation > *gen {
474 0 : tracing::error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
475 0 : "Control plane gave decreasing generation ({gen:?}) in re-attach response for tenant that was attached in generation {:?}, demoting to secondary",
476 0 : attached.generation
477 0 : );
478 :
479 : // We cannot safely attach this tenant given a bogus generation number, but let's avoid throwing away
480 : // local disk content: demote to secondary rather than detaching.
481 : tenants.insert(
482 : tenant_shard_id,
483 : TenantSlot::Secondary(SecondaryTenant::new(
484 : tenant_shard_id,
485 : location_conf.shard,
486 : location_conf.tenant_conf,
487 : &SecondaryLocationConfig { warm: false },
488 : )),
489 : );
490 : }
491 : }
492 : *gen
493 : } else {
494 : match &location_conf.mode {
495 : LocationMode::Secondary(secondary_config) => {
496 : // We do not require the control plane's permission for secondary mode
497 : // tenants, because they do no remote writes and hence require no
498 : // generation number
499 5 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
500 : tenants.insert(
501 : tenant_shard_id,
502 : TenantSlot::Secondary(SecondaryTenant::new(
503 : tenant_shard_id,
504 : location_conf.shard,
505 : location_conf.tenant_conf,
506 : secondary_config,
507 : )),
508 : );
509 : }
510 : LocationMode::Attached(_) => {
511 : // TODO: augment re-attach API to enable the control plane to
512 : // instruct us about secondary attachments. That way, instead of throwing
513 : // away local state, we can gracefully fall back to secondary here, if the control
514 : // plane tells us so.
515 : // (https://github.com/neondatabase/neon/issues/5377)
516 16 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
517 : if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
518 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
519 0 : "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
520 0 : );
521 : }
522 : }
523 : };
524 :
525 : continue;
526 : }
527 : } else {
528 : // Legacy mode: no generation information, any tenant present
529 : // on local disk may activate
530 0 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
531 : Generation::none()
532 : };
533 :
534 : // Presence of a generation number implies attachment: attach the tenant
535 : // if it wasn't already, and apply the generation number.
536 : location_conf.attach_in_generation(generation);
537 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
538 :
539 : let shard_identity = location_conf.shard;
540 : match tenant_spawn(
541 : conf,
542 : tenant_shard_id,
543 : &tenant_dir_path,
544 : resources.clone(),
545 : AttachedTenantConf::try_from(location_conf)?,
546 : shard_identity,
547 : Some(init_order.clone()),
548 : &TENANTS,
549 : SpawnMode::Normal,
550 : &ctx,
551 : ) {
552 : Ok(tenant) => {
553 : tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
554 : }
555 : Err(e) => {
556 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
557 : }
558 : }
559 : }
560 :
561 625 : info!("Processed {} local tenants at startup", tenants.len());
562 :
563 : let mut tenants_map = TENANTS.write().unwrap();
564 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
565 : METRICS.tenant_slots.set(tenants.len() as u64);
566 : *tenants_map = TenantsMap::Open(tenants);
567 :
568 : Ok(TenantManager {
569 : conf,
570 : tenants: &TENANTS,
571 : resources,
572 : })
573 : }
574 :
575 : /// Wrapper for Tenant::spawn that checks invariants before running, and inserts
576 : /// a broken tenant in the map if Tenant::spawn fails.
577 : #[allow(clippy::too_many_arguments)]
578 884 : pub(crate) fn tenant_spawn(
579 884 : conf: &'static PageServerConf,
580 884 : tenant_shard_id: TenantShardId,
581 884 : tenant_path: &Utf8Path,
582 884 : resources: TenantSharedResources,
583 884 : location_conf: AttachedTenantConf,
584 884 : shard_identity: ShardIdentity,
585 884 : init_order: Option<InitializationOrder>,
586 884 : tenants: &'static std::sync::RwLock<TenantsMap>,
587 884 : mode: SpawnMode,
588 884 : ctx: &RequestContext,
589 884 : ) -> anyhow::Result<Arc<Tenant>> {
590 884 : anyhow::ensure!(
591 884 : tenant_path.is_dir(),
592 0 : "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
593 : );
594 884 : anyhow::ensure!(
595 884 : !crate::is_temporary(tenant_path),
596 0 : "Cannot load tenant from temporary path {tenant_path:?}"
597 : );
598 : anyhow::ensure!(
599 884 : !tenant_path.is_empty_dir().with_context(|| {
600 0 : format!("Failed to check whether {tenant_path:?} is an empty dir")
601 884 : })?,
602 0 : "Cannot load tenant from empty directory {tenant_path:?}"
603 : );
604 :
605 884 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
606 884 : anyhow::ensure!(
607 884 : !conf.tenant_ignore_mark_file_path(&tenant_shard_id).exists(),
608 0 : "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
609 : );
610 :
611 884 : let tenant = match Tenant::spawn(
612 884 : conf,
613 884 : tenant_shard_id,
614 884 : resources,
615 884 : location_conf,
616 884 : shard_identity,
617 884 : init_order,
618 884 : tenants,
619 884 : mode,
620 884 : ctx,
621 884 : ) {
622 884 : Ok(tenant) => tenant,
623 0 : Err(e) => {
624 0 : error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
625 0 : Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
626 : }
627 : };
628 :
629 884 : Ok(tenant)
630 884 : }
631 :
632 : ///
633 : /// Shut down all tenants. This runs as part of pageserver shutdown.
634 : ///
635 : /// NB: We leave the tenants in the map, so that they remain accessible through
636 : /// the management API until we shut it down. If we removed the shut-down tenants
637 : /// from the tenants map, the management API would return 404 for these tenants,
638 : /// because TenantsMap::get() now returns `None`.
639 : /// That could easily be misinterpreted by the control plane, the consumer of the
640 : /// management API. For example, it could attach the tenant on a different pageserver.
641 : /// We would then be in split-brain once this pageserver restarts.
642 366 : #[instrument(skip_all)]
643 : pub(crate) async fn shutdown_all_tenants() {
644 : shutdown_all_tenants0(&TENANTS).await
645 : }
646 :
647 185 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
648 185 : let mut join_set = JoinSet::new();
649 :
650 : // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
651 185 : let (total_in_progress, total_attached) = {
652 185 : let mut m = tenants.write().unwrap();
653 185 : match &mut *m {
654 : TenantsMap::Initializing => {
655 0 : *m = TenantsMap::ShuttingDown(BTreeMap::default());
656 0 : info!("tenants map is empty");
657 0 : return;
658 : }
659 185 : TenantsMap::Open(tenants) => {
660 185 : let mut shutdown_state = BTreeMap::new();
661 185 : let mut total_in_progress = 0;
662 185 : let mut total_attached = 0;
663 :
664 209 : for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
665 209 : match v {
666 196 : TenantSlot::Attached(t) => {
667 196 : shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
668 196 : join_set.spawn(
669 196 : async move {
670 196 : let freeze_and_flush = true;
671 :
672 196 : let res = {
673 196 : let (_guard, shutdown_progress) = completion::channel();
674 440 : t.shutdown(shutdown_progress, freeze_and_flush).await
675 : };
676 :
677 196 : if let Err(other_progress) = res {
678 : // join the other shutdown in progress
679 0 : other_progress.wait().await;
680 196 : }
681 :
682 : // we cannot afford per tenant logging here, because if s3 is degraded, we are
683 : // going to log too many lines
684 196 : debug!("tenant successfully stopped");
685 196 : }
686 196 : .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())),
687 : );
688 :
689 196 : total_attached += 1;
690 : }
691 5 : TenantSlot::Secondary(state) => {
692 5 : // We don't need to wait for this individually per-tenant: the
693 5 : // downloader task will be waited on eventually; this cancel
694 5 : // is just to encourage it to drop out if it is doing work
695 5 : // for this tenant right now.
696 5 : state.cancel.cancel();
697 5 :
698 5 : shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
699 5 : }
700 8 : TenantSlot::InProgress(notify) => {
701 8 : // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
702 8 : // wait for their notifications to fire in this function.
703 8 : join_set.spawn(async move {
704 8 : notify.wait().await;
705 8 : });
706 8 :
707 8 : total_in_progress += 1;
708 8 : }
709 : }
710 : }
711 185 : *m = TenantsMap::ShuttingDown(shutdown_state);
712 185 : (total_in_progress, total_attached)
713 : }
714 : TenantsMap::ShuttingDown(_) => {
715 0 : error!("already shutting down, this function isn't supposed to be called more than once");
716 0 : return;
717 : }
718 : }
719 : };
720 :
721 185 : let started_at = std::time::Instant::now();
722 :
723 185 : info!(
724 185 : "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
725 185 : total_in_progress, total_attached
726 185 : );
727 :
728 185 : let total = join_set.len();
729 185 : let mut panicked = 0;
730 185 : let mut buffering = true;
731 185 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
732 185 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
733 :
734 397 : while !join_set.is_empty() {
735 212 : tokio::select! {
736 204 : Some(joined) = join_set.join_next() => {
737 : match joined {
738 : Ok(()) => {},
739 : Err(join_error) if join_error.is_cancelled() => {
740 : unreachable!("we are not cancelling any of the tasks");
741 : }
742 : Err(join_error) if join_error.is_panic() => {
743 : // cannot really do anything, as this panic is likely a bug
744 : panicked += 1;
745 : }
746 : Err(join_error) => {
747 0 : warn!("unknown kind of JoinError: {join_error}");
748 : }
749 : }
750 : if !buffering {
751 : // buffer so that every 500ms since the first update (or starting) we'll log
752 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
753 : // are not able to log *then*.
754 : buffering = true;
755 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
756 : }
757 : },
758 : _ = &mut buffered, if buffering => {
759 : buffering = false;
760 8 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
761 : }
762 : }
763 : }
764 :
765 185 : if panicked > 0 {
766 0 : warn!(
767 0 : panicked,
768 0 : total, "observed panicks while shutting down tenants"
769 0 : );
770 185 : }
771 :
772 : // caller will log how long we took
773 185 : }
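
// A self-contained sketch (not from this module) of the progress-logging pattern in
// `shutdown_all_tenants0` above: drain a JoinSet while a pinned, resettable sleep ensures a
// "still waiting" line is emitted at most once per interval, so a slow shutdown produces a
// bounded amount of logging before the process is killed externally.
async fn drain_with_progress_sketch(mut join_set: JoinSet<()>) {
    const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
    let mut buffering = true;
    let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));

    while !join_set.is_empty() {
        tokio::select! {
            Some(joined) = join_set.join_next() => {
                if let Err(e) = joined {
                    eprintln!("task failed: {e}");
                }
                if !buffering {
                    // Re-arm the timer so the next progress line appears ~500ms from now.
                    buffering = true;
                    buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
                }
            },
            _ = &mut buffered, if buffering => {
                buffering = false;
                eprintln!("still waiting for {} tasks to finish", join_set.len());
            }
        }
    }
}
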
774 :
775 0 : #[derive(Debug, thiserror::Error)]
776 : pub(crate) enum SetNewTenantConfigError {
777 : #[error(transparent)]
778 : GetTenant(#[from] GetTenantError),
779 : #[error(transparent)]
780 : Persist(anyhow::Error),
781 : #[error(transparent)]
782 : Other(anyhow::Error),
783 : }
784 :
785 30 : pub(crate) async fn set_new_tenant_config(
786 30 : conf: &'static PageServerConf,
787 30 : new_tenant_conf: TenantConfOpt,
788 30 : tenant_id: TenantId,
789 30 : ) -> Result<(), SetNewTenantConfigError> {
790 30 : // Legacy API: does not support sharding
791 30 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
792 :
793 30 : info!("configuring tenant {tenant_id}");
794 30 : let tenant = get_tenant(tenant_shard_id, true)?;
795 :
796 30 : if tenant.tenant_shard_id().shard_count > ShardCount(0) {
797 : // Note that we use ShardParameters::default below.
798 0 : return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
799 0 : "This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
800 0 : )));
801 30 : }
802 30 :
803 30 : // This is a legacy API that only operates on attached tenants: the preferred
804 30 : // API to use is the location_config/ endpoint, which lets the caller provide
805 30 : // the full LocationConf.
806 30 : let location_conf = LocationConf::attached_single(
807 30 : new_tenant_conf,
808 30 : tenant.generation,
809 30 : &ShardParameters::default(),
810 30 : );
811 30 :
812 30 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
813 60 : .await
814 30 : .map_err(SetNewTenantConfigError::Persist)?;
815 30 : tenant.set_new_tenant_config(new_tenant_conf);
816 30 : Ok(())
817 30 : }
818 :
819 18 : #[derive(thiserror::Error, Debug)]
820 : pub(crate) enum UpsertLocationError {
821 : #[error("Bad config request: {0}")]
822 : BadRequest(anyhow::Error),
823 :
824 : #[error("Cannot change config in this state: {0}")]
825 : Unavailable(#[from] TenantMapError),
826 :
827 : #[error("Tenant is already being modified")]
828 : InProgress,
829 :
830 : #[error("Failed to flush: {0}")]
831 : Flush(anyhow::Error),
832 :
833 : #[error("Internal error: {0}")]
834 : Other(#[from] anyhow::Error),
835 : }
836 :
837 : impl TenantManager {
838 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
839 : /// having to pass it around everywhere as a separate object.
840 1275 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
841 1275 : self.conf
842 1275 : }
843 :
844 : /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or is not fitting to the query.
845 : /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
846 973 : pub(crate) fn get_attached_tenant_shard(
847 973 : &self,
848 973 : tenant_shard_id: TenantShardId,
849 973 : active_only: bool,
850 973 : ) -> Result<Arc<Tenant>, GetTenantError> {
851 973 : let locked = self.tenants.read().unwrap();
852 :
853 973 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
854 :
855 971 : match peek_slot {
856 968 : Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
857 : TenantState::Broken {
858 0 : reason,
859 0 : backtrace: _,
860 0 : } if active_only => Err(GetTenantError::Broken(reason)),
861 862 : TenantState::Active => Ok(Arc::clone(tenant)),
862 : _ => {
863 106 : if active_only {
864 0 : Err(GetTenantError::NotActive(tenant_shard_id))
865 : } else {
866 106 : Ok(Arc::clone(tenant))
867 : }
868 : }
869 : },
870 3 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
871 : None | Some(TenantSlot::Secondary(_)) => {
872 2 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
873 : }
874 : }
875 973 : }
876 :
877 6 : pub(crate) fn get_secondary_tenant_shard(
878 6 : &self,
879 6 : tenant_shard_id: TenantShardId,
880 6 : ) -> Option<Arc<SecondaryTenant>> {
881 6 : let locked = self.tenants.read().unwrap();
882 6 :
883 6 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
884 6 : .ok()
885 6 : .flatten();
886 :
887 6 : match peek_slot {
888 6 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
889 0 : _ => None,
890 : }
891 6 : }
892 :
893 : /// Whether the `TenantManager` is responsible for the tenant shard
894 1 : pub(crate) fn manages_tenant_shard(&self, tenant_shard_id: TenantShardId) -> bool {
895 1 : let locked = self.tenants.read().unwrap();
896 1 :
897 1 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
898 1 : .ok()
899 1 : .flatten();
900 1 :
901 1 : peek_slot.is_some()
902 1 : }
903 :
904 1600 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
905 : pub(crate) async fn upsert_location(
906 : &self,
907 : tenant_shard_id: TenantShardId,
908 : new_location_config: LocationConf,
909 : flush: Option<Duration>,
910 : mut spawn_mode: SpawnMode,
911 : ctx: &RequestContext,
912 : ) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
913 : debug_assert_current_span_has_tenant_id();
914 800 : info!("configuring tenant location to state {new_location_config:?}");
915 :
916 : enum FastPathModified {
917 : Attached(Arc<Tenant>),
918 : Secondary(Arc<SecondaryTenant>),
919 : }
920 :
921 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
922 : // then we do not need to set the slot to InProgress, we can just call into the
923 : // existing tenant.
924 : let fast_path_taken = {
925 : let locked = self.tenants.read().unwrap();
926 : let peek_slot =
927 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
928 : match (&new_location_config.mode, peek_slot) {
929 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
930 : match attach_conf.generation.cmp(&tenant.generation) {
931 : Ordering::Equal => {
932 : // A transition from Attached to Attached in the same generation, we may
933 : // take our fast path and just provide the updated configuration
934 : // to the tenant.
935 : tenant.set_new_location_config(
936 : AttachedTenantConf::try_from(new_location_config.clone())
937 : .map_err(UpsertLocationError::BadRequest)?,
938 : );
939 :
940 : Some(FastPathModified::Attached(tenant.clone()))
941 : }
942 : Ordering::Less => {
943 : return Err(UpsertLocationError::BadRequest(anyhow::anyhow!(
944 : "Generation {:?} is less than existing {:?}",
945 : attach_conf.generation,
946 : tenant.generation
947 : )));
948 : }
949 : Ordering::Greater => {
950 : // Generation advanced, fall through to general case of replacing `Tenant` object
951 : None
952 : }
953 : }
954 : }
955 : (
956 : LocationMode::Secondary(secondary_conf),
957 : Some(TenantSlot::Secondary(secondary_tenant)),
958 : ) => {
959 : secondary_tenant.set_config(secondary_conf);
960 : secondary_tenant.set_tenant_conf(&new_location_config.tenant_conf);
961 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
962 : }
963 : _ => {
964 : // Not an Attached->Attached transition, fall through to general case
965 : None
966 : }
967 : }
968 : };
969 :
970 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
971 : // phase of writing config and/or waiting for flush, before returning.
972 : match fast_path_taken {
973 : Some(FastPathModified::Attached(tenant)) => {
974 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
975 : .await?;
976 :
977 : // Transition to AttachedStale means we may well hold a valid generation
978 : // still, and have been requested to go stale as part of a migration. If
979 : // the caller set `flush`, then flush to remote storage.
980 : if let LocationMode::Attached(AttachedLocationConfig {
981 : generation: _,
982 : attach_mode: AttachmentMode::Stale,
983 : }) = &new_location_config.mode
984 : {
985 : if let Some(flush_timeout) = flush {
986 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
987 : Ok(Err(e)) => {
988 : return Err(UpsertLocationError::Flush(e));
989 : }
990 : Ok(Ok(_)) => return Ok(Some(tenant)),
991 : Err(_) => {
992 0 : tracing::warn!(
993 0 : timeout_ms = flush_timeout.as_millis(),
994 0 : "Timed out waiting for flush to remote storage, proceeding anyway."
995 0 : )
996 : }
997 : }
998 : }
999 : }
1000 :
1001 : return Ok(Some(tenant));
1002 : }
1003 : Some(FastPathModified::Secondary(_secondary_tenant)) => {
1004 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1005 : .await?;
1006 :
1007 : return Ok(None);
1008 : }
1009 : None => {
1010 : // Proceed with the general case procedure, where we will shutdown & remove any existing
1011 : // slot contents and replace with a fresh one
1012 : }
1013 : };
1014 :
1015 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
1016 : // InProgress value to the slot while we make whatever changes are required. The state for
1017 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
1018 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
1019 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
1020 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)
1021 18 : .map_err(|e| match e {
1022 : TenantSlotError::AlreadyExists(_, _) | TenantSlotError::NotFound(_) => {
1023 0 : unreachable!("Called with mode Any")
1024 : }
1025 18 : TenantSlotError::InProgress => UpsertLocationError::InProgress,
1026 0 : TenantSlotError::MapState(s) => UpsertLocationError::Unavailable(s),
1027 18 : })?;
1028 :
1029 : match slot_guard.get_old_value() {
1030 : Some(TenantSlot::Attached(tenant)) => {
1031 : // The case where we keep a Tenant alive was covered above in the special case
1032 : // for Attached->Attached transitions in the same generation. By this point,
1033 : // if we see an attached tenant we know it will be discarded and should be
1034 : // shut down.
1035 : let (_guard, progress) = utils::completion::channel();
1036 :
1037 : match tenant.get_attach_mode() {
1038 : AttachmentMode::Single | AttachmentMode::Multi => {
1039 : // Before we leave our state as the presumed holder of the latest generation,
1040 : // flush any outstanding deletions to reduce the risk of leaking objects.
1041 : self.resources.deletion_queue_client.flush_advisory()
1042 : }
1043 : AttachmentMode::Stale => {
1044 : // If we're stale there's no point trying to flush deletions
1045 : }
1046 : };
1047 :
1048 72 : info!("Shutting down attached tenant");
1049 : match tenant.shutdown(progress, false).await {
1050 : Ok(()) => {}
1051 : Err(barrier) => {
1052 0 : info!("Shutdown already in progress, waiting for it to complete");
1053 : barrier.wait().await;
1054 : }
1055 : }
1056 : slot_guard.drop_old_value().expect("We just shut it down");
1057 :
1058 : // Edge case: we were called with SpawnMode::Create, but a Tenant already existed:
1059 : // the caller thinks they're creating a new tenant, but it already exists. We must switch to
1060 : // Normal mode so that when starting this Tenant we properly probe remote storage for timelines,
1061 : // rather than assuming it to be empty.
1062 : spawn_mode = SpawnMode::Normal;
1063 : }
1064 : Some(TenantSlot::Secondary(state)) => {
1065 16 : info!("Shutting down secondary tenant");
1066 : state.shutdown().await;
1067 : }
1068 : Some(TenantSlot::InProgress(_)) => {
1069 : // This should never happen: acquire_slot should error out
1070 : // if the contents of a slot were InProgress.
1071 : return Err(UpsertLocationError::Other(anyhow::anyhow!(
1072 : "Acquired an InProgress slot, this is a bug."
1073 : )));
1074 : }
1075 : None => {
1076 : // Slot was vacant, nothing needs shutting down.
1077 : }
1078 : }
1079 :
1080 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1081 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1082 :
1083 : // Directory structure is the same for attached and secondary modes:
1084 : // create it if it doesn't exist. Timeline load/creation expects the
1085 : // timelines/ subdir to already exist.
1086 : //
1087 : // Does not need to be fsync'd because local storage is just a cache.
1088 : tokio::fs::create_dir_all(&timelines_path)
1089 : .await
1090 0 : .with_context(|| format!("Creating {timelines_path}"))?;
1091 :
1092 : // Before activating either secondary or attached mode, persist the
1093 : // configuration, so that on restart we will re-attach (or re-start
1094 : // secondary) on the tenant.
1095 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config).await?;
1096 :
1097 : let new_slot = match &new_location_config.mode {
1098 : LocationMode::Secondary(secondary_config) => {
1099 : let shard_identity = new_location_config.shard;
1100 : TenantSlot::Secondary(SecondaryTenant::new(
1101 : tenant_shard_id,
1102 : shard_identity,
1103 : new_location_config.tenant_conf,
1104 : secondary_config,
1105 : ))
1106 : }
1107 : LocationMode::Attached(_attach_config) => {
1108 : let shard_identity = new_location_config.shard;
1109 :
1110 : // Testing hack: if we are configured with no control plane, then drop the generation
1111 : // from upserts. This enables creating generation-less tenants even though neon_local
1112 : // always uses generations when calling the location conf API.
1113 : let attached_conf = if cfg!(feature = "testing") {
1114 : let mut conf = AttachedTenantConf::try_from(new_location_config)?;
1115 : if self.conf.control_plane_api.is_none() {
1116 : conf.location.generation = Generation::none();
1117 : }
1118 : conf
1119 : } else {
1120 : AttachedTenantConf::try_from(new_location_config)?
1121 : };
1122 :
1123 : let tenant = tenant_spawn(
1124 : self.conf,
1125 : tenant_shard_id,
1126 : &tenant_path,
1127 : self.resources.clone(),
1128 : attached_conf,
1129 : shard_identity,
1130 : None,
1131 : self.tenants,
1132 : spawn_mode,
1133 : ctx,
1134 : )?;
1135 :
1136 : TenantSlot::Attached(tenant)
1137 : }
1138 : };
1139 :
1140 : let attached_tenant = if let TenantSlot::Attached(tenant) = &new_slot {
1141 : Some(tenant.clone())
1142 : } else {
1143 : None
1144 : };
1145 :
1146 : match slot_guard.upsert(new_slot) {
1147 : Err(TenantSlotUpsertError::InternalError(e)) => {
1148 : Err(UpsertLocationError::Other(anyhow::anyhow!(e)))
1149 : }
1150 : Err(TenantSlotUpsertError::MapState(e)) => Err(UpsertLocationError::Unavailable(e)),
1151 : Err(TenantSlotUpsertError::ShuttingDown((new_slot, _completion))) => {
1152 : // If we just called tenant_spawn() on a new tenant, and can't insert it into our map, then
1153 : // we must not leak it: this would violate the invariant that after shutdown_all_tenants, all tenants
1154 : // are shut down.
1155 : //
1156 : // We must shut it down inline here.
1157 : match new_slot {
1158 : TenantSlot::InProgress(_) => {
1159 : // Unreachable because we never insert an InProgress
1160 : unreachable!()
1161 : }
1162 : TenantSlot::Attached(tenant) => {
1163 : let (_guard, progress) = utils::completion::channel();
1164 6 : info!("Shutting down just-spawned tenant, because tenant manager is shut down");
1165 : match tenant.shutdown(progress, false).await {
1166 : Ok(()) => {
1167 6 : info!("Finished shutting down just-spawned tenant");
1168 : }
1169 : Err(barrier) => {
1170 0 : info!("Shutdown already in progress, waiting for it to complete");
1171 : barrier.wait().await;
1172 : }
1173 : }
1174 : }
1175 : TenantSlot::Secondary(secondary_tenant) => {
1176 : secondary_tenant.shutdown().await;
1177 : }
1178 : }
1179 :
1180 : Err(UpsertLocationError::Unavailable(
1181 : TenantMapError::ShuttingDown,
1182 : ))
1183 : }
1184 : Ok(()) => Ok(attached_tenant),
1185 : }
1186 : }
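
    // A minimal, self-contained sketch (not from this module) of the bounded flush used in the
    // AttachedStale fast path above: wrap the flush future in `tokio::time::timeout`, propagate a
    // flush error, and merely log-and-continue when the deadline elapses. `flush_remote` is a
    // hypothetical stand-in future, not the real `Tenant::flush_remote`.
    async fn flush_with_deadline_sketch<F>(
        flush_remote: F,
        flush_timeout: std::time::Duration,
    ) -> anyhow::Result<()>
    where
        F: std::future::Future<Output = anyhow::Result<()>>,
    {
        match tokio::time::timeout(flush_timeout, flush_remote).await {
            Ok(Ok(())) => Ok(()), // flushed within the deadline
            Ok(Err(e)) => Err(e), // the flush itself failed: propagate
            Err(_elapsed) => {
                tracing::warn!(
                    timeout_ms = flush_timeout.as_millis() as u64,
                    "Timed out waiting for flush to remote storage, proceeding anyway"
                );
                Ok(())
            }
        }
    }
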
1187 :
1188 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1189 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1190 : /// dropped before re-attaching.
1191 : ///
1192 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1193 : /// where an issue is identified that would go away with a restart of the tenant.
1194 : ///
1195 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1196 : /// to respect the cancellation tokens used in normal shutdown().
1197 4 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1198 : pub(crate) async fn reset_tenant(
1199 : &self,
1200 : tenant_shard_id: TenantShardId,
1201 : drop_cache: bool,
1202 : ctx: &RequestContext,
1203 : ) -> anyhow::Result<()> {
1204 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1205 : let Some(old_slot) = slot_guard.get_old_value() else {
1206 : anyhow::bail!("Tenant not found when trying to reset");
1207 : };
1208 :
1209 : let Some(tenant) = old_slot.get_attached() else {
1210 : slot_guard.revert();
1211 : anyhow::bail!("Tenant is not in attached state");
1212 : };
1213 :
1214 : let (_guard, progress) = utils::completion::channel();
1215 : match tenant.shutdown(progress, false).await {
1216 : Ok(()) => {
1217 : slot_guard.drop_old_value()?;
1218 : }
1219 : Err(_barrier) => {
1220 : slot_guard.revert();
1221 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1222 : }
1223 : }
1224 :
1225 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1226 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1227 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1228 :
1229 : if drop_cache {
1230 1 : tracing::info!("Dropping local file cache");
1231 :
1232 : match tokio::fs::read_dir(&timelines_path).await {
1233 : Err(e) => {
1234 0 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1235 : }
1236 : Ok(mut entries) => {
1237 : while let Some(entry) = entries.next_entry().await? {
1238 : tokio::fs::remove_dir_all(entry.path()).await?;
1239 : }
1240 : }
1241 : }
1242 : }
1243 :
1244 : let shard_identity = config.shard;
1245 : let tenant = tenant_spawn(
1246 : self.conf,
1247 : tenant_shard_id,
1248 : &tenant_path,
1249 : self.resources.clone(),
1250 : AttachedTenantConf::try_from(config)?,
1251 : shard_identity,
1252 : None,
1253 : self.tenants,
1254 : SpawnMode::Normal,
1255 : ctx,
1256 : )?;
1257 :
1258 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1259 :
1260 : Ok(())
1261 : }
1262 :
1263 1257 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
1264 1257 : let locked = self.tenants.read().unwrap();
1265 1257 : match &*locked {
1266 0 : TenantsMap::Initializing => Vec::new(),
1267 1257 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1268 1257 : .values()
1269 1257 : .filter_map(|slot| {
1270 1035 : slot.get_attached()
1271 1035 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1272 1257 : })
1273 1257 : .collect(),
1274 : }
1275 1257 : }
1276 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1277 : // callback should be small and fast, as it will be called inside the global
1278 : // TenantsMap lock.
1279 1273 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1280 1273 : where
1281 1273 : // TODO: let the callback return a hint to drop out of the loop early
1282 1273 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1283 1273 : {
1284 1273 : let locked = self.tenants.read().unwrap();
1285 :
1286 1273 : let map = match &*locked {
1287 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1288 1273 : TenantsMap::Open(m) => m,
1289 : };
1290 :
1291 2346 : for (tenant_id, slot) in map {
1292 1073 : if let TenantSlot::Secondary(state) = slot {
1293 : // Only expose secondary tenants that are not currently shutting down
1294 12 : if !state.cancel.is_cancelled() {
1295 12 : func(tenant_id, state)
1296 0 : }
1297 1061 : }
1298 : }
1299 1273 : }
1300 :
1301 : /// Total list of all tenant slots: this includes attached, secondary, and InProgress.
1302 6 : pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
1303 6 : let locked = self.tenants.read().unwrap();
1304 6 : match &*locked {
1305 0 : TenantsMap::Initializing => Vec::new(),
1306 6 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
1307 6 : map.iter().map(|(k, v)| (*k, v.clone())).collect()
1308 : }
1309 : }
1310 6 : }
1311 :
1312 121 : pub(crate) async fn delete_tenant(
1313 121 : &self,
1314 121 : tenant_shard_id: TenantShardId,
1315 121 : activation_timeout: Duration,
1316 121 : ) -> Result<(), DeleteTenantError> {
1317 121 : super::span::debug_assert_current_span_has_tenant_id();
1318 : // We acquire a SlotGuard during this function to protect against concurrent
1319 : // changes while the ::prepare phase of DeleteTenantFlow executes, but then
1320 : // have to return the Tenant to the map while the background deletion runs.
1321 : //
1322 : // TODO: refactor deletion to happen outside the lifetime of a Tenant.
1323 : // Currently, deletion requires a reference to the tenants map in order to
1324 : // keep the Tenant in the map until deletion is complete, and then remove
1325 : // it at the end.
1326 : //
1327 : // See https://github.com/neondatabase/neon/issues/5080
1328 :
1329 105 : let slot_guard =
1330 121 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
1331 :
1332 : // unwrap is safe because we used MustExist mode when acquiring
1333 105 : let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
1334 105 : TenantSlot::Attached(tenant) => tenant.clone(),
1335 : _ => {
1336 : // Express "not attached" as equivalent to "not found"
1337 0 : return Err(DeleteTenantError::NotAttached);
1338 : }
1339 : };
1340 :
1341 105 : match tenant.current_state() {
1342 26 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1343 26 : // If a tenant is broken or stopping, DeleteTenantFlow can
1344 26 : // handle it: broken tenants proceed to delete, stopping tenants
1345 26 : // are checked for deletion already in progress.
1346 26 : }
1347 : _ => {
1348 79 : tenant
1349 79 : .wait_to_become_active(activation_timeout)
1350 2 : .await
1351 79 : .map_err(|e| match e {
1352 : GetActiveTenantError::WillNotBecomeActive(_) => {
1353 0 : DeleteTenantError::InvalidState(tenant.current_state())
1354 : }
1355 0 : GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
1356 0 : GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
1357 : GetActiveTenantError::WaitForActiveTimeout {
1358 0 : latest_state: _latest_state,
1359 0 : wait_time: _wait_time,
1360 0 : } => DeleteTenantError::InvalidState(tenant.current_state()),
1361 79 : })?;
1362 : }
1363 : }
1364 :
1365 105 : let result = DeleteTenantFlow::run(
1366 105 : self.conf,
1367 105 : self.resources.remote_storage.clone(),
1368 105 : &TENANTS,
1369 105 : tenant,
1370 105 : )
1371 527 : .await;
1372 :
1373 : // The Tenant goes back into the map in Stopping state; it will eventually be removed by DeleteTenantFlow
1374 105 : slot_guard.revert();
1375 105 : result
1376 121 : }
1377 :
1378 10 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.0))]
1379 : pub(crate) async fn shard_split(
1380 : &self,
1381 : tenant_shard_id: TenantShardId,
1382 : new_shard_count: ShardCount,
1383 : ctx: &RequestContext,
1384 : ) -> anyhow::Result<Vec<TenantShardId>> {
1385 : let tenant = get_tenant(tenant_shard_id, true)?;
1386 :
1387 : // Plan: identify what the new child shards will be
1388 : let effective_old_shard_count = std::cmp::max(tenant_shard_id.shard_count.0, 1);
1389 : if new_shard_count <= ShardCount(effective_old_shard_count) {
1390 : anyhow::bail!("Requested shard count is not an increase");
1391 : }
1392 : let expansion_factor = new_shard_count.0 / effective_old_shard_count;
1393 : if !expansion_factor.is_power_of_two() {
1394 : anyhow::bail!("Requested split is not a power of two");
1395 : }
1396 :
1397 : let parent_shard_identity = tenant.shard_identity;
1398 : let parent_tenant_conf = tenant.get_tenant_conf();
1399 : let parent_generation = tenant.generation;
1400 :
1401 : let child_shards = tenant_shard_id.split(new_shard_count);
1402 5 : tracing::info!(
1403 5 : "Shard {} splits into: {}",
1404 5 : tenant_shard_id.to_index(),
1405 5 : child_shards
1406 5 : .iter()
1407 10 : .map(|id| format!("{}", id.to_index()))
1408 5 : .join(",")
1409 5 : );
1410 :
1411 : // Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
1412 : if let Err(e) = tenant.split_prepare(&child_shards).await {
1413 : // If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
1414 : // have been left in a partially-shut-down state.
1415 0 : tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
1416 : self.reset_tenant(tenant_shard_id, false, ctx).await?;
1417 : return Err(e);
1418 : }
1419 :
1420 : self.resources.deletion_queue_client.flush_advisory();
1421 :
1422 : // Phase 2: Put the parent shard to InProgress and grab a reference to the parent Tenant
1423 : drop(tenant);
1424 : let mut parent_slot_guard =
1425 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1426 : let parent = match parent_slot_guard.get_old_value() {
1427 : Some(TenantSlot::Attached(t)) => t,
1428 : Some(TenantSlot::Secondary(_)) => anyhow::bail!("Tenant location in secondary mode"),
1429 : Some(TenantSlot::InProgress(_)) => {
1430 : // tenant_map_acquire_slot never returns InProgress, if a slot was InProgress
1431 : // it would return an error.
1432 : unreachable!()
1433 : }
1434 : None => {
1435 : // We don't actually need the parent shard to still be attached to do our work, but it's
1436 : // a weird enough situation that the caller probably didn't want us to continue working
1437 : // if they had detached the tenant they requested the split on.
1438 : anyhow::bail!("Detached parent shard in the middle of split!")
1439 : }
1440 : };
1441 :
1442 : // TODO: hardlink layers from the parent into the child shard directories so that they don't immediately re-download
1443 : // TODO: erase the dentries from the parent
1444 :
1445 : // Take a snapshot of where the parent's WAL ingest had got to: we will wait for
1446 : // child shards to reach this point.
1447 : let mut target_lsns = HashMap::new();
1448 : for timeline in parent.timelines.lock().unwrap().clone().values() {
1449 : target_lsns.insert(timeline.timeline_id, timeline.get_last_record_lsn());
1450 : }
1451 :
1452 :         // TODO: we should have the parent shard stop its WAL ingest here; it's a waste of resources
1453 :         // and could slow down the children trying to catch up.
1454 :
1455 : // Phase 3: Spawn the child shards
1456 : for child_shard in &child_shards {
1457 : let mut child_shard_identity = parent_shard_identity;
1458 : child_shard_identity.count = child_shard.shard_count;
1459 : child_shard_identity.number = child_shard.shard_number;
1460 :
1461 : let child_location_conf = LocationConf {
1462 : mode: LocationMode::Attached(AttachedLocationConfig {
1463 : generation: parent_generation,
1464 : attach_mode: AttachmentMode::Single,
1465 : }),
1466 : shard: child_shard_identity,
1467 : tenant_conf: parent_tenant_conf,
1468 : };
1469 :
1470 : self.upsert_location(
1471 : *child_shard,
1472 : child_location_conf,
1473 : None,
1474 : SpawnMode::Normal,
1475 : ctx,
1476 : )
1477 : .await?;
1478 : }
1479 :
1480 :         // Phase 4: wait for the child shards' WAL ingest to catch up to the target LSNs
1481 : for child_shard_id in &child_shards {
1482 : let child_shard = {
1483 : let locked = TENANTS.read().unwrap();
1484 : let peek_slot =
1485 : tenant_map_peek_slot(&locked, child_shard_id, TenantSlotPeekMode::Read)?;
1486 10 : peek_slot.and_then(|s| s.get_attached()).cloned()
1487 : };
1488 : if let Some(t) = child_shard {
1489 : let timelines = t.timelines.lock().unwrap().clone();
1490 : for timeline in timelines.values() {
1491 : let Some(target_lsn) = target_lsns.get(&timeline.timeline_id) else {
1492 : continue;
1493 : };
1494 :
1495 0 : tracing::info!(
1496 0 : "Waiting for child shard {}/{} to reach target lsn {}...",
1497 0 : child_shard_id,
1498 0 : timeline.timeline_id,
1499 0 : target_lsn
1500 0 : );
1501 : if let Err(e) = timeline.wait_lsn(*target_lsn, ctx).await {
1502 :                         // Failure here might mean shutdown; in any case this part is an optimization
1503 :                         // and we shouldn't hold up the split operation.
1504 0 : tracing::warn!(
1505 0 : "Failed to wait for timeline {} to reach lsn {target_lsn}: {e}",
1506 0 : timeline.timeline_id
1507 0 : );
1508 : } else {
1509 0 : tracing::info!(
1510 0 : "Child shard {}/{} reached target lsn {}",
1511 0 : child_shard_id,
1512 0 : timeline.timeline_id,
1513 0 : target_lsn
1514 0 : );
1515 : }
1516 : }
1517 : }
1518 : }
1519 :
1520 : // Phase 5: Shut down the parent shard.
1521 : let (_guard, progress) = completion::channel();
1522 : match parent.shutdown(progress, false).await {
1523 : Ok(()) => {}
1524 : Err(other) => {
1525 : other.wait().await;
1526 : }
1527 : }
1528 : parent_slot_guard.drop_old_value()?;
1529 :
1530 : // Phase 6: Release the InProgress on the parent shard
1531 : drop(parent_slot_guard);
1532 :
1533 : Ok(child_shards)
1534 : }
1535 : }
1536 :
1537 21 : #[derive(Debug, thiserror::Error)]
1538 : pub(crate) enum GetTenantError {
1539 : /// NotFound is a TenantId rather than TenantShardId, because this error type is used from
1540 : /// getters that use a TenantId and a ShardSelector, not just getters that target a specific shard.
1541 : #[error("Tenant {0} not found")]
1542 : NotFound(TenantId),
1543 :
1544 : #[error("Tenant {0} is not active")]
1545 : NotActive(TenantShardId),
1546 : /// Broken is logically a subset of NotActive, but a distinct error is useful as
1547 : /// NotActive is usually a retryable state for API purposes, whereas Broken
1548 : /// is a stuck error state
1549 : #[error("Tenant is broken: {0}")]
1550 : Broken(String),
1551 :
1552 : // Initializing or shutting down: cannot authoritatively say whether we have this tenant
1553 : #[error("Tenant map is not available: {0}")]
1554 : MapState(#[from] TenantMapError),
1555 : }
1556 :
1557 : /// Gets the tenant from the in-memory data, returning an error if it is absent or does not match the query.
1558 : /// `active_only = true` restricts the lookup to tenants that are ready for operations, returning an error for any other kind of tenant.
1559 : ///
1560 : /// This method is cancel-safe.
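     : ///
     : /// Illustrative call (a sketch only; `tenant_shard_id` is assumed to refer to an attached tenant):
     : ///
     : /// ```ignore
     : /// // With `active_only = true`, an `Ok` result implies the tenant is Active.
     : /// let tenant = get_tenant(tenant_shard_id, true)?;
     : /// assert!(matches!(tenant.current_state(), TenantState::Active));
     : /// ```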
1561 6272 : pub(crate) fn get_tenant(
1562 6272 : tenant_shard_id: TenantShardId,
1563 6272 : active_only: bool,
1564 6272 : ) -> Result<Arc<Tenant>, GetTenantError> {
1565 6272 : let locked = TENANTS.read().unwrap();
1566 :
1567 6272 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
1568 :
1569 6204 : match peek_slot {
1570 6204 : Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
1571 : TenantState::Broken {
1572 1 : reason,
1573 1 : backtrace: _,
1574 1 : } if active_only => Err(GetTenantError::Broken(reason)),
1575 5837 : TenantState::Active => Ok(Arc::clone(tenant)),
1576 : _ => {
1577 366 : if active_only {
1578 12 : Err(GetTenantError::NotActive(tenant_shard_id))
1579 : } else {
1580 354 : Ok(Arc::clone(tenant))
1581 : }
1582 : }
1583 : },
1584 0 : Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
1585 : None | Some(TenantSlot::Secondary(_)) => {
1586 68 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
1587 : }
1588 : }
1589 6272 : }
1590 :
1591 21 : #[derive(thiserror::Error, Debug)]
1592 : pub(crate) enum GetActiveTenantError {
1593 : /// We may time out either while TenantSlot is InProgress, or while the Tenant
1594 : /// is in a non-Active state
1595 : #[error(
1596 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
1597 : )]
1598 : WaitForActiveTimeout {
1599 : latest_state: Option<TenantState>,
1600 : wait_time: Duration,
1601 : },
1602 :
1603 : /// The TenantSlot is absent, or in secondary mode
1604 : #[error(transparent)]
1605 : NotFound(#[from] GetTenantError),
1606 :
1607 : /// Cancellation token fired while we were waiting
1608 : #[error("cancelled")]
1609 : Cancelled,
1610 :
1611 : /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
1612 : #[error("will not become active. Current state: {0}")]
1613 : WillNotBecomeActive(TenantState),
1614 : }
1615 :
1616 : /// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
1617 : /// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
1618 : /// then wait for up to `timeout` (minus however long we waited for the slot).
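     : ///
     : /// Illustrative call shape (a sketch; `shard_selector` and `cancel` are assumed to come from the caller):
     : ///
     : /// ```ignore
     : /// let tenant = get_active_tenant_with_timeout(
     : ///     tenant_id,
     : ///     shard_selector,
     : ///     Duration::from_secs(30),
     : ///     &cancel,
     : /// )
     : /// .await?;
     : /// ```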
1619 20472 : pub(crate) async fn get_active_tenant_with_timeout(
1620 20472 : tenant_id: TenantId,
1621 20472 : shard_selector: ShardSelector,
1622 20472 : timeout: Duration,
1623 20472 : cancel: &CancellationToken,
1624 20472 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1625 20472 : enum WaitFor {
1626 20472 : Barrier(utils::completion::Barrier),
1627 20472 : Tenant(Arc<Tenant>),
1628 20472 : }
1629 20472 :
1630 20472 : let wait_start = Instant::now();
1631 20472 : let deadline = wait_start + timeout;
1632 :
1633 127 : let (wait_for, tenant_shard_id) = {
1634 20472 : let locked = TENANTS.read().unwrap();
1635 :
1636 : // Resolve TenantId to TenantShardId
1637 20472 : let tenant_shard_id = locked
1638 20472 : .resolve_attached_shard(&tenant_id, shard_selector)
1639 20472 : .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1640 20472 : tenant_id,
1641 20472 : )))?;
1642 :
1643 20451 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
1644 20451 : .map_err(GetTenantError::MapState)?;
1645 20451 : match peek_slot {
1646 20451 : Some(TenantSlot::Attached(tenant)) => {
1647 20451 : match tenant.current_state() {
1648 : TenantState::Active => {
1649 : // Fast path: we don't need to do any async waiting.
1650 20324 : return Ok(tenant.clone());
1651 : }
1652 : _ => {
1653 127 : tenant.activate_now();
1654 127 : (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
1655 : }
1656 : }
1657 : }
1658 : Some(TenantSlot::Secondary(_)) => {
1659 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
1660 0 : tenant_shard_id,
1661 0 : )))
1662 : }
1663 0 : Some(TenantSlot::InProgress(barrier)) => {
1664 0 : (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
1665 : }
1666 : None => {
1667 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1668 0 : tenant_id,
1669 0 : )))
1670 : }
1671 : }
1672 : };
1673 :
1674 127 : let tenant = match wait_for {
1675 0 : WaitFor::Barrier(barrier) => {
1676 0 : tracing::debug!("Waiting for tenant InProgress state to pass...");
1677 0 : timeout_cancellable(
1678 0 : deadline.duration_since(Instant::now()),
1679 0 : cancel,
1680 0 : barrier.wait(),
1681 0 : )
1682 0 : .await
1683 0 : .map_err(|e| match e {
1684 0 : TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
1685 0 : latest_state: None,
1686 0 : wait_time: wait_start.elapsed(),
1687 0 : },
1688 0 : TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
1689 0 : })?;
1690 : {
1691 0 : let locked = TENANTS.read().unwrap();
1692 0 : let peek_slot =
1693 0 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
1694 0 : .map_err(GetTenantError::MapState)?;
1695 0 : match peek_slot {
1696 0 : Some(TenantSlot::Attached(tenant)) => tenant.clone(),
1697 : _ => {
1698 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
1699 0 : tenant_shard_id,
1700 0 : )))
1701 : }
1702 : }
1703 : }
1704 : }
1705 127 : WaitFor::Tenant(tenant) => tenant,
1706 : };
1707 :
1708 0 : tracing::debug!("Waiting for tenant to enter active state...");
1709 127 : tenant
1710 127 : .wait_to_become_active(deadline.duration_since(Instant::now()))
1711 379 : .await?;
1712 127 : Ok(tenant)
1713 20472 : }
1714 :
1715 0 : #[derive(Debug, thiserror::Error)]
1716 : pub(crate) enum DeleteTimelineError {
1717 : #[error("Tenant {0}")]
1718 : Tenant(#[from] GetTenantError),
1719 :
1720 : #[error("Timeline {0}")]
1721 : Timeline(#[from] crate::tenant::DeleteTimelineError),
1722 : }
1723 :
1724 0 : #[derive(Debug, thiserror::Error)]
1725 : pub(crate) enum TenantStateError {
1726 : #[error("Tenant {0} is stopping")]
1727 : IsStopping(TenantShardId),
1728 : #[error(transparent)]
1729 : SlotError(#[from] TenantSlotError),
1730 : #[error(transparent)]
1731 : SlotUpsertError(#[from] TenantSlotUpsertError),
1732 : #[error(transparent)]
1733 : Other(#[from] anyhow::Error),
1734 : }
1735 :
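     : /// Detach a tenant shard: shut it down and remove it from the in-memory map, rename its local
     : /// directory to a temporary path, and spawn a background task to delete that directory.
     : /// With `detach_ignored`, an ignored tenant (present on disk but not in memory) only has its
     : /// local files cleaned up.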
1736 94 : pub(crate) async fn detach_tenant(
1737 94 : conf: &'static PageServerConf,
1738 94 : tenant_shard_id: TenantShardId,
1739 94 : detach_ignored: bool,
1740 94 : deletion_queue_client: &DeletionQueueClient,
1741 94 : ) -> Result<(), TenantStateError> {
1742 94 : let tmp_path = detach_tenant0(
1743 94 : conf,
1744 94 : &TENANTS,
1745 94 : tenant_shard_id,
1746 94 : detach_ignored,
1747 94 : deletion_queue_client,
1748 94 : )
1749 396 : .await?;
1750 : // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
1751 : // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
1752 81 : let task_tenant_id = None;
1753 81 : task_mgr::spawn(
1754 81 : task_mgr::BACKGROUND_RUNTIME.handle(),
1755 81 : TaskKind::MgmtRequest,
1756 81 : task_tenant_id,
1757 81 : None,
1758 81 : "tenant_files_delete",
1759 81 : false,
1760 81 : async move {
1761 81 : fs::remove_dir_all(tmp_path.as_path())
1762 81 : .await
1763 81 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
1764 81 : },
1765 81 : );
1766 81 : Ok(())
1767 94 : }
1768 :
1769 94 : async fn detach_tenant0(
1770 94 : conf: &'static PageServerConf,
1771 94 : tenants: &std::sync::RwLock<TenantsMap>,
1772 94 : tenant_shard_id: TenantShardId,
1773 94 : detach_ignored: bool,
1774 94 : deletion_queue_client: &DeletionQueueClient,
1775 94 : ) -> Result<Utf8PathBuf, TenantStateError> {
1776 94 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
1777 81 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
1778 81 : safe_rename_tenant_dir(&local_tenant_directory)
1779 242 : .await
1780 81 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
1781 95 : };
1782 :
1783 94 : let removal_result = remove_tenant_from_memory(
1784 94 : tenants,
1785 94 : tenant_shard_id,
1786 94 : tenant_dir_rename_operation(tenant_shard_id),
1787 94 : )
1788 393 : .await;
1789 :
1790 : // Flush pending deletions, so that they have a good chance of passing validation
1791 : // before this tenant is potentially re-attached elsewhere.
1792 94 : deletion_queue_client.flush_advisory();
1793 94 :
1794 94 :     // Ignored tenants are not present in memory, so the removal-from-memory operation above bails with NotFound.
1795 94 :     // Before returning that error, check whether this is an ignored tenant being detached: in that case we only need to clean up its local files.
1796 94 : if detach_ignored
1797 : && matches!(
1798 11 : removal_result,
1799 : Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
1800 : )
1801 : {
1802 11 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1803 11 : if tenant_ignore_mark.exists() {
1804 1 : info!("Detaching an ignored tenant");
1805 1 : let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
1806 3 : .await
1807 1 : .with_context(|| {
1808 0 : format!("Ignored tenant {tenant_shard_id} local directory rename")
1809 1 : })?;
1810 1 : return Ok(tmp_path);
1811 10 : }
1812 83 : }
1813 :
1814 93 : removal_result
1815 94 : }
1816 :
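     : /// Load a tenant that is present on local disk but not currently in memory. This is a legacy
     : /// API (replaced by `/location_conf`): it clears any ignore mark, re-persists the tenant
     : /// config with the given generation, and spawns the tenant into the map as Attached.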
1817 4 : pub(crate) async fn load_tenant(
1818 4 : conf: &'static PageServerConf,
1819 4 : tenant_id: TenantId,
1820 4 : generation: Generation,
1821 4 : broker_client: storage_broker::BrokerClientChannel,
1822 4 : remote_storage: Option<GenericRemoteStorage>,
1823 4 : deletion_queue_client: DeletionQueueClient,
1824 4 : ctx: &RequestContext,
1825 4 : ) -> Result<(), TenantMapInsertError> {
1826 4 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
1827 4 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1828 :
1829 3 : let slot_guard =
1830 4 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
1831 3 : let tenant_path = conf.tenant_path(&tenant_shard_id);
1832 3 :
1833 3 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1834 3 : if tenant_ignore_mark.exists() {
1835 3 : std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
1836 0 : format!(
1837 0 : "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
1838 0 : )
1839 3 : })?;
1840 0 : }
1841 :
1842 3 : let resources = TenantSharedResources {
1843 3 : broker_client,
1844 3 : remote_storage,
1845 3 : deletion_queue_client,
1846 3 : };
1847 :
1848 3 : let mut location_conf =
1849 3 : Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
1850 3 : location_conf.attach_in_generation(generation);
1851 3 :
1852 6 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
1853 :
1854 3 : let shard_identity = location_conf.shard;
1855 3 : let new_tenant = tenant_spawn(
1856 3 : conf,
1857 3 : tenant_shard_id,
1858 3 : &tenant_path,
1859 3 : resources,
1860 3 : AttachedTenantConf::try_from(location_conf)?,
1861 3 : shard_identity,
1862 3 : None,
1863 3 : &TENANTS,
1864 3 : SpawnMode::Normal,
1865 3 : ctx,
1866 3 : )
1867 3 : .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
1868 :
1869 3 : slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
1870 3 : Ok(())
1871 4 : }
1872 :
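     : /// Ignore a tenant: shut it down, remove it from the in-memory map, and write an ignore mark
     : /// file in its local directory so that it is not loaded again until explicitly requested
     : /// (see [`load_tenant`], which removes the mark).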
1873 5 : pub(crate) async fn ignore_tenant(
1874 5 : conf: &'static PageServerConf,
1875 5 : tenant_id: TenantId,
1876 5 : ) -> Result<(), TenantStateError> {
1877 17 : ignore_tenant0(conf, &TENANTS, tenant_id).await
1878 5 : }
1879 :
1880 10 : #[instrument(skip_all, fields(shard_id))]
1881 : async fn ignore_tenant0(
1882 : conf: &'static PageServerConf,
1883 : tenants: &std::sync::RwLock<TenantsMap>,
1884 : tenant_id: TenantId,
1885 : ) -> Result<(), TenantStateError> {
1886 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
1887 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1888 : tracing::Span::current().record(
1889 : "shard_id",
1890 : tracing::field::display(tenant_shard_id.shard_slug()),
1891 : );
1892 :
1893 5 : remove_tenant_from_memory(tenants, tenant_shard_id, async {
1894 5 : let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1895 5 : fs::File::create(&ignore_mark_file)
1896 5 : .await
1897 5 : .context("Failed to create ignore mark file")
1898 5 : .and_then(|_| {
1899 5 : crashsafe::fsync_file_and_parent(&ignore_mark_file)
1900 5 : .context("Failed to fsync ignore mark file")
1901 5 : })
1902 5 : .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?;
1903 5 :             .with_context(|| format!("Failed to create ignore mark for tenant {tenant_shard_id}"))?;
1904 5 : })
1905 : .await
1906 : }
1907 :
1908 0 : #[derive(Debug, thiserror::Error)]
1909 : pub(crate) enum TenantMapListError {
1910 : #[error("tenant map is still initiailizing")]
1911 :     #[error("tenant map is still initializing")]
1912 : }
1913 :
1914 : ///
1915 : /// Get list of tenants, for the mgmt API
1916 : ///
1917 249 : pub(crate) async fn list_tenants(
1918 249 : ) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
1919 249 : let tenants = TENANTS.read().unwrap();
1920 249 : let m = match &*tenants {
1921 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
1922 249 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
1923 249 : };
1924 249 : Ok(m.iter()
1925 263 : .filter_map(|(id, tenant)| match tenant {
1926 261 : TenantSlot::Attached(tenant) => {
1927 261 : Some((*id, tenant.current_state(), tenant.generation()))
1928 : }
1929 2 : TenantSlot::Secondary(_) => None,
1930 0 : TenantSlot::InProgress(_) => None,
1931 263 : })
1932 249 : .collect())
1933 249 : }
1934 :
1935 0 : #[derive(Debug, thiserror::Error)]
1936 : pub(crate) enum TenantMapInsertError {
1937 : #[error(transparent)]
1938 : SlotError(#[from] TenantSlotError),
1939 : #[error(transparent)]
1940 : SlotUpsertError(#[from] TenantSlotUpsertError),
1941 : #[error(transparent)]
1942 : Other(#[from] anyhow::Error),
1943 : }
1944 :
1945 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
1946 : /// for a particular tenant ID.
1947 1 : #[derive(Debug, thiserror::Error)]
1948 : pub(crate) enum TenantSlotError {
1949 : /// When acquiring a slot with the expectation that the tenant already exists.
1950 : #[error("Tenant {0} not found")]
1951 : NotFound(TenantShardId),
1952 :
1953 : /// When acquiring a slot with the expectation that the tenant does not already exist.
1954 : #[error("tenant {0} already exists, state: {1:?}")]
1955 : AlreadyExists(TenantShardId, TenantState),
1956 :
1957 : // Tried to read a slot that is currently being mutated by another administrative
1958 : // operation.
1959 : #[error("tenant has a state change in progress, try again later")]
1960 : InProgress,
1961 :
1962 : #[error(transparent)]
1963 : MapState(#[from] TenantMapError),
1964 : }
1965 :
1966 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
1967 : /// to insert a new value.
1968 0 : #[derive(thiserror::Error)]
1969 : pub(crate) enum TenantSlotUpsertError {
1970 : /// An error where the slot is in an unexpected state, indicating a code bug
1971 : #[error("Internal error updating Tenant")]
1972 : InternalError(Cow<'static, str>),
1973 :
1974 : #[error(transparent)]
1975 : MapState(TenantMapError),
1976 :
1977 : // If we encounter TenantManager shutdown during upsert, we must carry the Completion
1978 : // from the SlotGuard, so that the caller can hold it while they clean up: otherwise
1979 : // TenantManager shutdown might race ahead before we're done cleaning up any Tenant that
1980 : // was protected by the SlotGuard.
1981 : #[error("Shutting down")]
1982 : ShuttingDown((TenantSlot, utils::completion::Completion)),
1983 : }
1984 :
1985 : impl std::fmt::Debug for TenantSlotUpsertError {
1986 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
1987 0 : match self {
1988 0 : Self::InternalError(reason) => write!(f, "Internal Error {reason}"),
1989 0 : Self::MapState(map_error) => write!(f, "Tenant map state: {map_error:?}"),
1990 0 : Self::ShuttingDown(_completion) => write!(f, "Tenant map shutting down"),
1991 : }
1992 0 : }
1993 : }
1994 :
1995 0 : #[derive(Debug, thiserror::Error)]
1996 : enum TenantSlotDropError {
1997 : /// It is only legal to drop a TenantSlot if its contents are fully shut down
1998 : #[error("Tenant was not shut down")]
1999 : NotShutdown,
2000 : }
2001 :
2002 : /// Errors that can happen any time we are walking the tenant map to try and acquire
2003 : /// the TenantSlot for a particular tenant.
2004 0 : #[derive(Debug, thiserror::Error)]
2005 : pub enum TenantMapError {
2006 : // Tried to read while initializing
2007 : #[error("tenant map is still initializing")]
2008 : StillInitializing,
2009 :
2010 : // Tried to read while shutting down
2011 : #[error("tenant map is shutting down")]
2012 : ShuttingDown,
2013 : }
2014 :
2015 : /// Guards a particular tenant_id's content in the TenantsMap. While this
2016 : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
2017 : /// for this tenant, which acts as a marker for any operations targeting
2018 : /// this tenant to retry later, or wait for the InProgress state to end.
2019 : ///
2020 : /// This structure enforces the important invariant that we do not have overlapping
2021 : /// tasks that will try to use local storage for the same tenant ID: we enforce that
2022 : /// the previous contents of a slot have been shut down before the slot can be
2023 : /// left empty or used for something else.
2024 : ///
2025 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
2026 : /// to provide a new value, or `revert` to put the slot back into its initial
2027 : /// state. If the SlotGuard is dropped without calling either of these, then
2028 : /// we will leave the slot empty if our `old_value` is already shut down, else
2029 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
2030 : ///
2031 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
2032 : /// `drop_old_value`. It is an error to call this without shutting down
2033 : /// the contents of `old_value`.
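     : ///
     : /// A typical lifecycle, as a sketch (error handling elided; see `load_tenant` for a real caller):
     : ///
     : /// ```ignore
     : /// let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
     : /// // ... construct or spawn the new Tenant while the InProgress marker blocks other operations ...
     : /// slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
     : /// ```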
2034 : pub struct SlotGuard {
2035 : tenant_shard_id: TenantShardId,
2036 : old_value: Option<TenantSlot>,
2037 : upserted: bool,
2038 :
2039 : /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
2040 : /// release any waiters as soon as this SlotGuard is dropped.
2041 : completion: utils::completion::Completion,
2042 : }
2043 :
2044 : impl SlotGuard {
2045 892 : fn new(
2046 892 : tenant_shard_id: TenantShardId,
2047 892 : old_value: Option<TenantSlot>,
2048 892 : completion: utils::completion::Completion,
2049 892 : ) -> Self {
2050 892 : Self {
2051 892 : tenant_shard_id,
2052 892 : old_value,
2053 892 : upserted: false,
2054 892 : completion,
2055 892 : }
2056 892 : }
2057 :
2058 : /// Get any value that was present in the slot before we acquired ownership
2059 : /// of it: in state transitions, this will be the old state.
2060 889 : fn get_old_value(&self) -> &Option<TenantSlot> {
2061 889 : &self.old_value
2062 889 : }
2063 :
2064 : /// Emplace a new value in the slot. This consumes the guard, and after
2065 : /// returning, the slot is no longer protected from concurrent changes.
2066 799 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
2067 799 : if !self.old_value_is_shutdown() {
2068 : // This is a bug: callers should never try to drop an old value without
2069 : // shutting it down
2070 0 : return Err(TenantSlotUpsertError::InternalError(
2071 0 : "Old TenantSlot value not shut down".into(),
2072 0 : ));
2073 799 : }
2074 :
2075 793 : let replaced = {
2076 799 : let mut locked = TENANTS.write().unwrap();
2077 799 :
2078 799 : if let TenantSlot::InProgress(_) = new_value {
2079 : // It is never expected to try and upsert InProgress via this path: it should
2080 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
2081 0 : return Err(TenantSlotUpsertError::InternalError(
2082 0 : "Attempt to upsert an InProgress state".into(),
2083 0 : ));
2084 799 : }
2085 :
2086 799 : let m = match &mut *locked {
2087 : TenantsMap::Initializing => {
2088 0 : return Err(TenantSlotUpsertError::MapState(
2089 0 : TenantMapError::StillInitializing,
2090 0 : ))
2091 : }
2092 : TenantsMap::ShuttingDown(_) => {
2093 6 : return Err(TenantSlotUpsertError::ShuttingDown((
2094 6 : new_value,
2095 6 : self.completion.clone(),
2096 6 : )));
2097 : }
2098 793 : TenantsMap::Open(m) => m,
2099 793 : };
2100 793 :
2101 793 : let replaced = m.insert(self.tenant_shard_id, new_value);
2102 793 : self.upserted = true;
2103 793 :
2104 793 : METRICS.tenant_slots.set(m.len() as u64);
2105 793 :
2106 793 : replaced
2107 : };
2108 :
2109 : // Sanity check: on an upsert we should always be replacing an InProgress marker
2110 793 : match replaced {
2111 : Some(TenantSlot::InProgress(_)) => {
2112 : // Expected case: we find our InProgress in the map: nothing should have
2113 : // replaced it because the code that acquires slots will not grant another
2114 : // one for the same TenantId.
2115 793 : Ok(())
2116 : }
2117 : None => {
2118 0 : METRICS.unexpected_errors.inc();
2119 0 : error!(
2120 0 : tenant_shard_id = %self.tenant_shard_id,
2121 0 : "Missing InProgress marker during tenant upsert, this is a bug."
2122 0 : );
2123 0 : Err(TenantSlotUpsertError::InternalError(
2124 0 : "Missing InProgress marker during tenant upsert".into(),
2125 0 : ))
2126 : }
2127 0 : Some(slot) => {
2128 0 : METRICS.unexpected_errors.inc();
2129 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
2130 0 : Err(TenantSlotUpsertError::InternalError(
2131 0 : "Unexpected contents of TenantSlot".into(),
2132 0 : ))
2133 : }
2134 : }
2135 799 : }
2136 :
2137 : /// Replace the InProgress slot with whatever was in the guard when we started
2138 105 : fn revert(mut self) {
2139 105 : if let Some(value) = self.old_value.take() {
2140 105 : match self.upsert(value) {
2141 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
2142 0 : // We already logged the error, nothing else we can do.
2143 0 : }
2144 : Err(
2145 : TenantSlotUpsertError::MapState(_) | TenantSlotUpsertError::ShuttingDown(_),
2146 0 : ) => {
2147 0 : // If the map is shutting down, we need not replace anything
2148 0 : }
2149 105 : Ok(()) => {}
2150 : }
2151 0 : }
2152 105 : }
2153 :
2154 : /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
2155 : /// rogue background tasks that would write to the local tenant directory that this guard
2156 : /// is responsible for protecting
2157 1056 : fn old_value_is_shutdown(&self) -> bool {
2158 1056 : match self.old_value.as_ref() {
2159 157 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
2160 25 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
2161 : Some(TenantSlot::InProgress(_)) => {
2162 : // A SlotGuard cannot be constructed for a slot that was already InProgress
2163 0 : unreachable!()
2164 : }
2165 874 : None => true,
2166 : }
2167 1056 : }
2168 :
2169 :     /// The guard holder is done with the old value of the slot: they are obliged to have
2170 :     /// already shut it down before we reach this point.
2171 166 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
2172 166 : if !self.old_value_is_shutdown() {
2173 0 : Err(TenantSlotDropError::NotShutdown)
2174 : } else {
2175 166 : self.old_value.take();
2176 166 : Ok(())
2177 : }
2178 166 : }
2179 : }
2180 :
2181 : impl Drop for SlotGuard {
2182 892 : fn drop(&mut self) {
2183 892 : if self.upserted {
2184 793 : return;
2185 99 : }
2186 99 :         // Our old value is already shut down, or it never existed: it is safe
2187 99 :         // for us to fully release the TenantSlot back into an empty state.
2188 99 :
2189 99 : let mut locked = TENANTS.write().unwrap();
2190 :
2191 99 : let m = match &mut *locked {
2192 : TenantsMap::Initializing => {
2193 : // There is no map, this should never happen.
2194 2 : return;
2195 : }
2196 : TenantsMap::ShuttingDown(_) => {
2197 : // When we transition to shutdown, InProgress elements are removed
2198 :                 // from the map, so we do not need to clean up our InProgress marker.
2199 : // See [`shutdown_all_tenants0`]
2200 6 : return;
2201 : }
2202 91 : TenantsMap::Open(m) => m,
2203 91 : };
2204 91 :
2205 91 : use std::collections::btree_map::Entry;
2206 91 : match m.entry(self.tenant_shard_id) {
2207 91 : Entry::Occupied(mut entry) => {
2208 91 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
2209 0 : METRICS.unexpected_errors.inc();
2210 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
2211 91 : }
2212 :
2213 91 : if self.old_value_is_shutdown() {
2214 91 : entry.remove();
2215 91 : } else {
2216 0 : entry.insert(self.old_value.take().unwrap());
2217 0 : }
2218 : }
2219 : Entry::Vacant(_) => {
2220 0 : METRICS.unexpected_errors.inc();
2221 0 : error!(
2222 0 : tenant_shard_id = %self.tenant_shard_id,
2223 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
2224 0 : );
2225 : }
2226 : }
2227 :
2228 91 : METRICS.tenant_slots.set(m.len() as u64);
2229 892 : }
2230 : }
2231 :
2232 : enum TenantSlotPeekMode {
2233 : /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
2234 : Read,
2235 : /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
2236 : Write,
2237 : }
2238 :
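     : /// Look up the current [`TenantSlot`] for a tenant shard without taking ownership of it.
     : /// The caller must already hold a read guard on the tenants map and pass it in.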
2239 28513 : fn tenant_map_peek_slot<'a>(
2240 28513 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
2241 28513 : tenant_shard_id: &TenantShardId,
2242 28513 : mode: TenantSlotPeekMode,
2243 28513 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
2244 28513 : match tenants.deref() {
2245 0 : TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
2246 12 : TenantsMap::ShuttingDown(m) => match mode {
2247 : TenantSlotPeekMode::Read => Ok(Some(
2248 : // When reading in ShuttingDown state, we must translate None results
2249 : // into a ShuttingDown error, because absence of a tenant shard ID in the map
2250 : // isn't a reliable indicator of the tenant being gone: it might have been
2251 : // InProgress when shutdown started, and cleaned up from that state such
2252 : // that it's now no longer in the map. Callers will have to wait until
2253 : // we next start up to get a proper answer. This avoids incorrect 404 API responses.
2254 0 : m.get(tenant_shard_id).ok_or(TenantMapError::ShuttingDown)?,
2255 : )),
2256 12 : TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
2257 : },
2258 28501 : TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
2259 : }
2260 28513 : }
2261 :
2262 : enum TenantSlotAcquireMode {
2263 : /// Acquire the slot irrespective of current state, or whether it already exists
2264 : Any,
2265 : /// Return an error if trying to acquire a slot and it doesn't already exist
2266 : MustExist,
2267 : /// Return an error if trying to acquire a slot and it already exists
2268 : MustNotExist,
2269 : }
2270 :
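     : /// Acquire a [`SlotGuard`] for a tenant shard in the global [`TENANTS`] map, leaving an
     : /// InProgress marker in its place while the guard is held.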
2271 840 : fn tenant_map_acquire_slot(
2272 840 : tenant_shard_id: &TenantShardId,
2273 840 : mode: TenantSlotAcquireMode,
2274 840 : ) -> Result<SlotGuard, TenantSlotError> {
2275 840 : tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
2276 840 : }
2277 :
2278 941 : fn tenant_map_acquire_slot_impl(
2279 941 : tenant_shard_id: &TenantShardId,
2280 941 : tenants: &std::sync::RwLock<TenantsMap>,
2281 941 : mode: TenantSlotAcquireMode,
2282 941 : ) -> Result<SlotGuard, TenantSlotError> {
2283 941 : use TenantSlotAcquireMode::*;
2284 941 : METRICS.tenant_slot_writes.inc();
2285 941 :
2286 941 : let mut locked = tenants.write().unwrap();
2287 941 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug());
2288 941 : let _guard = span.enter();
2289 :
2290 941 : let m = match &mut *locked {
2291 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
2292 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
2293 941 : TenantsMap::Open(m) => m,
2294 941 : };
2295 941 :
2296 941 : use std::collections::btree_map::Entry;
2297 941 :
2298 941 : let entry = m.entry(*tenant_shard_id);
2299 941 :
2300 941 : match entry {
2301 633 : Entry::Vacant(v) => match mode {
2302 : MustExist => {
2303 28 : tracing::debug!("Vacant && MustExist: return NotFound");
2304 28 : Err(TenantSlotError::NotFound(*tenant_shard_id))
2305 : }
2306 : _ => {
2307 605 : let (completion, barrier) = utils::completion::channel();
2308 605 : v.insert(TenantSlot::InProgress(barrier));
2309 605 : tracing::debug!("Vacant, inserted InProgress");
2310 605 : Ok(SlotGuard::new(*tenant_shard_id, None, completion))
2311 : }
2312 : },
2313 308 : Entry::Occupied(mut o) => {
2314 308 : // Apply mode-driven checks
2315 308 : match (o.get(), mode) {
2316 : (TenantSlot::InProgress(_), _) => {
2317 20 : tracing::debug!("Occupied, failing for InProgress");
2318 20 : Err(TenantSlotError::InProgress)
2319 : }
2320 1 : (slot, MustNotExist) => match slot {
2321 1 : TenantSlot::Attached(tenant) => {
2322 1 : tracing::debug!("Attached && MustNotExist, return AlreadyExists");
2323 1 : Err(TenantSlotError::AlreadyExists(
2324 1 : *tenant_shard_id,
2325 1 : tenant.current_state(),
2326 1 : ))
2327 : }
2328 : _ => {
2329 : // FIXME: the AlreadyExists error assumes that we have a Tenant
2330 : // to get the state from
2331 0 : tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
2332 0 : Err(TenantSlotError::AlreadyExists(
2333 0 : *tenant_shard_id,
2334 0 : TenantState::Broken {
2335 0 : reason: "Present but not attached".to_string(),
2336 0 : backtrace: "".to_string(),
2337 0 : },
2338 0 : ))
2339 : }
2340 : },
2341 : _ => {
2342 : // Happy case: the slot was not in any state that violated our mode
2343 287 : let (completion, barrier) = utils::completion::channel();
2344 287 : let old_value = o.insert(TenantSlot::InProgress(barrier));
2345 287 : tracing::debug!("Occupied, replaced with InProgress");
2346 287 : Ok(SlotGuard::new(
2347 287 : *tenant_shard_id,
2348 287 : Some(old_value),
2349 287 : completion,
2350 287 : ))
2351 : }
2352 : }
2353 : }
2354 : }
2355 941 : }
2356 :
2357 : /// Stops and removes the tenant from memory; bails if it is already [`TenantState::Stopping`].
2358 : /// Other tenant resources can be removed manually via the `tenant_cleanup` future.
2359 : /// If the cleanup fails, the tenant stays in memory in [`TenantState::Broken`] state, and another
2360 : /// removal operation is needed to get rid of it.
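     : ///
     : /// Illustrative call shape (a sketch; compare `ignore_tenant0` for a real caller):
     : ///
     : /// ```ignore
     : /// remove_tenant_from_memory(&tenants, tenant_shard_id, async {
     : ///     // clean up local files for this tenant; runs after the tenant has been shut down
     : ///     anyhow::Ok(())
     : /// })
     : /// .await?;
     : /// ```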
2361 101 : async fn remove_tenant_from_memory<V, F>(
2362 101 : tenants: &std::sync::RwLock<TenantsMap>,
2363 101 : tenant_shard_id: TenantShardId,
2364 101 : tenant_cleanup: F,
2365 101 : ) -> Result<V, TenantStateError>
2366 101 : where
2367 101 : F: std::future::Future<Output = anyhow::Result<V>>,
2368 101 : {
2369 87 : let mut slot_guard =
2370 101 : tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
2371 :
2372 : // allow pageserver shutdown to await for our completion
2373 87 : let (_guard, progress) = completion::channel();
2374 :
2375 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2376 : // concurrent API request doing something else for the same tenant ID.
2377 87 : let attached_tenant = match slot_guard.get_old_value() {
2378 78 : Some(TenantSlot::Attached(tenant)) => {
2379 78 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2380 78 : let freeze_and_flush = false;
2381 78 :
2382 78 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
2383 78 : // that we can continue safely to cleanup.
2384 166 : match tenant.shutdown(progress, freeze_and_flush).await {
2385 78 : Ok(()) => {}
2386 0 : Err(_other) => {
2387 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2388 0 : // wait for it but return an error right away because these are distinct requests.
2389 0 : slot_guard.revert();
2390 0 : return Err(TenantStateError::IsStopping(tenant_shard_id));
2391 : }
2392 : }
2393 78 : Some(tenant)
2394 : }
2395 9 : Some(TenantSlot::Secondary(secondary_state)) => {
2396 9 : tracing::info!("Shutting down in secondary mode");
2397 9 : secondary_state.shutdown().await;
2398 9 : None
2399 : }
2400 : Some(TenantSlot::InProgress(_)) => {
2401 : // Acquiring a slot guarantees its old value was not InProgress
2402 0 : unreachable!();
2403 : }
2404 0 : None => None,
2405 : };
2406 :
2407 87 : match tenant_cleanup
2408 246 : .await
2409 87 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2410 : {
2411 87 : Ok(hook_value) => {
2412 87 : // Success: drop the old TenantSlot::Attached.
2413 87 : slot_guard
2414 87 : .drop_old_value()
2415 87 : .expect("We just called shutdown");
2416 87 :
2417 87 : Ok(hook_value)
2418 : }
2419 0 : Err(e) => {
2420 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2421 0 : if let Some(attached_tenant) = attached_tenant {
2422 0 : attached_tenant.set_broken(e.to_string()).await;
2423 0 : }
2424 : // Leave the broken tenant in the map
2425 0 : slot_guard.revert();
2426 0 :
2427 0 : Err(TenantStateError::Other(e))
2428 : }
2429 : }
2430 101 : }
2431 :
2432 : use {
2433 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
2434 : utils::http::error::ApiError,
2435 : };
2436 :
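     : /// Run a single GC iteration on one timeline right away, outside the regular GC schedule.
     : /// Returns a oneshot receiver that resolves with the [`GcResult`] once the spawned GC task
     : /// has finished.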
2437 324 : pub(crate) async fn immediate_gc(
2438 324 : tenant_shard_id: TenantShardId,
2439 324 : timeline_id: TimelineId,
2440 324 : gc_req: TimelineGcRequest,
2441 324 : cancel: CancellationToken,
2442 324 : ctx: &RequestContext,
2443 324 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
2444 324 : let guard = TENANTS.read().unwrap();
2445 :
2446 324 : let tenant = guard
2447 324 : .get(&tenant_shard_id)
2448 324 : .map(Arc::clone)
2449 324 : .with_context(|| format!("tenant {tenant_shard_id}"))
2450 324 : .map_err(|e| ApiError::NotFound(e.into()))?;
2451 :
2452 323 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
2453 323 : // Use tenant's pitr setting
2454 323 : let pitr = tenant.get_pitr_interval();
2455 323 :
2456 323 : // Run in task_mgr to avoid race with tenant_detach operation
2457 323 : let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
2458 323 : let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
2459 323 : // TODO: spawning is redundant now, need to hold the gate
2460 323 : task_mgr::spawn(
2461 323 : &tokio::runtime::Handle::current(),
2462 323 : TaskKind::GarbageCollector,
2463 323 : Some(tenant_shard_id),
2464 323 : Some(timeline_id),
2465 323 : &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
2466 323 : false,
2467 323 : async move {
2468 0 : fail::fail_point!("immediate_gc_task_pre");
2469 :
2470 : #[allow(unused_mut)]
2471 323 : let mut result = tenant
2472 323 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
2473 323 : .instrument(info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
2474 326 : .await;
2475 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
2476 : // better once the types support it.
2477 :
2478 : #[cfg(feature = "testing")]
2479 : {
2480 323 : if let Ok(result) = result.as_mut() {
2481 :                 // Why not FuturesUnordered? It seems it would need very much the same task structure,
2482 :                 // but would only run on a single task.
2483 322 : let mut js = tokio::task::JoinSet::new();
2484 1080 : for layer in std::mem::take(&mut result.doomed_layers) {
2485 1080 : js.spawn(layer.wait_drop());
2486 1080 : }
2487 322 : tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
2488 1402 : while let Some(res) = js.join_next().await {
2489 1080 : res.expect("wait_drop should not panic");
2490 1080 : }
2491 1 : }
2492 :
2493 323 : let timeline = tenant.get_timeline(timeline_id, false).ok();
2494 323 : let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
2495 :
2496 323 : if let Some(rtc) = rtc {
2497 :                 // layer drops schedule actions on the remote timeline client to actually do the
2498 :                 // deletions; we don't care about a shutdown error here, just exit fast.
2499 322 : drop(rtc.wait_completion().await);
2500 1 : }
2501 : }
2502 :
2503 323 : match task_done.send(result) {
2504 323 : Ok(_) => (),
2505 0 : Err(result) => error!("failed to send gc result: {result:?}"),
2506 : }
2507 323 : Ok(())
2508 323 : }
2509 323 : );
2510 323 :
2511 323 : // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
2512 323 : drop(guard);
2513 323 :
2514 323 : Ok(wait_task_done)
2515 324 : }
2516 :
2517 : #[cfg(test)]
2518 : mod tests {
2519 : use std::collections::BTreeMap;
2520 : use std::sync::Arc;
2521 : use tracing::Instrument;
2522 :
2523 : use crate::tenant::mgr::TenantSlot;
2524 :
2525 : use super::{super::harness::TenantHarness, TenantsMap};
2526 :
2527 2 : #[tokio::test(start_paused = true)]
2528 2 : async fn shutdown_awaits_in_progress_tenant() {
2529 2 : // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
2530 2 : // wait for it to complete before proceeding.
2531 2 :
2532 2 : let h = TenantHarness::create("shutdown_awaits_in_progress_tenant").unwrap();
2533 2 : let (t, _ctx) = h.load().await;
2534 2 :
2535 2 : // harness loads it to active, which is forced and nothing is running on the tenant
2536 2 :         // The harness loads the tenant to Active; activation is forced and nothing is running on the tenant.
2537 2 : let id = t.tenant_shard_id();
2538 2 :
2539 2 : // tenant harness configures the logging and we cannot escape it
2540 2 : let span = h.span();
2541 2 : let _e = span.enter();
2542 2 :
2543 2 : let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
2544 2 : let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
2545 2 :
2546 2 : // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
2547 2 : // permit it to proceed: that will stick the tenant in InProgress
2548 2 :
2549 2 : let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
2550 2 : let (until_cleanup_started, cleanup_started) = utils::completion::channel();
2551 2 : let mut remove_tenant_from_memory_task = {
2552 2 : let jh = tokio::spawn({
2553 2 : let tenants = tenants.clone();
2554 2 : async move {
2555 2 : let cleanup = async move {
2556 2 : drop(until_cleanup_started);
2557 2 : can_complete_cleanup.wait().await;
2558 2 : anyhow::Ok(())
2559 2 : };
2560 2 : super::remove_tenant_from_memory(&tenants, id, cleanup).await
2561 2 : }
2562 2 : .instrument(h.span())
2563 2 : });
2564 2 :
2565 2 : // now the long cleanup should be in place, with the stopping state
2566 2 : cleanup_started.wait().await;
2567 2 : jh
2568 2 : };
2569 2 :
2570 2 : let mut shutdown_task = {
2571 2 : let (until_shutdown_started, shutdown_started) = utils::completion::channel();
2572 2 :
2573 2 : let shutdown_task = tokio::spawn(async move {
2574 2 : drop(until_shutdown_started);
2575 4 : super::shutdown_all_tenants0(&tenants).await;
2576 2 : });
2577 2 :
2578 2 : shutdown_started.wait().await;
2579 2 : shutdown_task
2580 2 : };
2581 2 :
2582 2 : let long_time = std::time::Duration::from_secs(15);
2583 4 : tokio::select! {
2584 4 : _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
2585 4 : _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
2586 4 : _ = tokio::time::sleep(long_time) => {},
2587 4 : }
2588 2 :
2589 2 : drop(until_cleanup_completed);
2590 2 :
2591 2 : // Now that we allow it to proceed, shutdown should complete immediately
2592 2 : remove_tenant_from_memory_task.await.unwrap().unwrap();
2593 2 : shutdown_task.await.unwrap();
2594 2 : }
2595 : }
|