TLA Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
5 : use pageserver_api::key::Key;
6 : use pageserver_api::shard::{ShardIdentity, ShardNumber, TenantShardId};
7 : use rand::{distributions::Alphanumeric, Rng};
8 : use std::borrow::Cow;
9 : use std::collections::{BTreeMap, HashMap};
10 : use std::ops::Deref;
11 : use std::sync::Arc;
12 : use std::time::{Duration, Instant};
13 : use tokio::fs;
14 : use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
15 :
16 : use anyhow::Context;
17 : use once_cell::sync::Lazy;
18 : use tokio::task::JoinSet;
19 : use tokio_util::sync::CancellationToken;
20 : use tracing::*;
21 :
22 : use remote_storage::GenericRemoteStorage;
23 : use utils::crashsafe;
24 :
25 : use crate::config::PageServerConf;
26 : use crate::context::{DownloadBehavior, RequestContext};
27 : use crate::control_plane_client::{
28 : ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
29 : };
30 : use crate::deletion_queue::DeletionQueueClient;
31 : use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
32 : use crate::task_mgr::{self, TaskKind};
33 : use crate::tenant::config::{
34 : AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt,
35 : };
36 : use crate::tenant::delete::DeleteTenantFlow;
37 : use crate::tenant::span::debug_assert_current_span_has_tenant_id;
38 : use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
39 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
40 :
41 : use utils::crashsafe::path_with_suffix_extension;
42 : use utils::fs_ext::PathExt;
43 : use utils::generation::Generation;
44 : use utils::id::{TenantId, TimelineId};
45 :
46 : use super::delete::DeleteTenantError;
47 : use super::secondary::SecondaryTenant;
48 : use super::TenantSharedResources;
49 :
50 : /// A tenant that appears in TenantsMap is either
51 : /// - `Attached`: has a full Tenant object, is eligible to service
52 : /// reads and ingest WAL.
53 : /// - `Secondary`: is only keeping a local cache warm.
54 : ///
55 : /// Secondary is a totally distinct state rather than being a mode of a `Tenant`, because
56 : /// that way we avoid having to carefully switch a tenant's ingestion etc. on and off during
57 : /// its lifetime, and we can preserve some important safety invariants like `Tenant` always
58 : /// having a properly acquired generation (Secondary doesn't need a generation).
59 : pub(crate) enum TenantSlot {
60 : Attached(Arc<Tenant>),
61 : Secondary(Arc<SecondaryTenant>),
62 : /// In this state, other administrative operations acting on the TenantId should
63 : /// block, or return a retry indicator equivalent to HTTP 503.
64 : InProgress(utils::completion::Barrier),
65 : }
66 :
67 : impl std::fmt::Debug for TenantSlot {
68 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 0 : match self {
70 0 : Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
71 0 : Self::Secondary(_) => write!(f, "Secondary"),
72 0 : Self::InProgress(_) => write!(f, "InProgress"),
73 : }
74 0 : }
75 : }
76 :
77 : impl TenantSlot {
78 : /// Return the `Tenant` in this slot if attached, else None
79 CBC 1231 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
80 1231 : match self {
81 1208 : Self::Attached(t) => Some(t),
82 11 : Self::Secondary(_) => None,
83 12 : Self::InProgress(_) => None,
84 : }
85 1231 : }
86 : }
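
// A minimal sketch (not part of this module) of how a caller is expected to treat the
// three slot states documented above: serve from `Attached`, treat `Secondary` as not
// servable, and surface a retryable error (the HTTP 503 equivalent) for `InProgress`.
// `DispatchError` and `dispatch` are hypothetical names used only for illustration.
enum DispatchError {
    RetryLater,
}

fn dispatch(slot: &TenantSlot) -> Result<Option<Arc<Tenant>>, DispatchError> {
    match slot {
        // Attached: a full Tenant is available for reads and WAL ingest.
        TenantSlot::Attached(tenant) => Ok(Some(Arc::clone(tenant))),
        // Secondary: only a warm local cache; nothing to serve from here.
        TenantSlot::Secondary(_) => Ok(None),
        // InProgress: an administrative operation owns the slot; ask the caller to retry.
        TenantSlot::InProgress(_) => Err(DispatchError::RetryLater),
    }
}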
87 :
88 : /// The tenants known to the pageserver.
89 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
90 : pub(crate) enum TenantsMap {
91 : /// [`init_tenant_mgr`] is not done yet.
92 : Initializing,
93 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
94 : /// New tenants can be added using [`tenant_map_acquire_slot`].
95 : Open(BTreeMap<TenantShardId, TenantSlot>),
96 : /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
97 : /// Existing tenants are still accessible, but no new tenants can be created.
98 : ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
99 : }
100 :
101 : pub(crate) enum TenantsMapRemoveResult {
102 : Occupied(TenantSlot),
103 : Vacant,
104 : InProgress(utils::completion::Barrier),
105 : }
106 :
107 : /// When resolving a TenantId to a shard, we may be looking for the 0th
108 : /// shard, or we might be looking for whichever shard holds a particular page.
109 : pub(crate) enum ShardSelector {
110 : /// Only return the 0th shard, if it is present. If a non-0th shard is present,
111 : /// ignore it.
112 : Zero,
113 : /// Pick the first shard we find for the TenantId
114 : First,
115 : /// Pick the shard that holds this key
116 : Page(Key),
117 : }
118 :
119 : impl TenantsMap {
120 : /// Convenience function for typical usage, where we want to get a `Tenant` object, for
121 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
122 : /// None is returned.
123 322 : pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
124 322 : match self {
125 UBC 0 : TenantsMap::Initializing => None,
126 CBC 322 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
127 322 : m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
128 : }
129 : }
130 322 : }
131 :
132 : /// A page service client sends a TenantId, and to look up the correct Tenant we must
133 : /// resolve this to a fully qualified TenantShardId.
134 8067 : fn resolve_attached_shard(
135 8067 : &self,
136 8067 : tenant_id: &TenantId,
137 8067 : selector: ShardSelector,
138 8067 : ) -> Option<TenantShardId> {
139 8067 : let mut want_shard = None;
140 8067 : match self {
141 UBC 0 : TenantsMap::Initializing => None,
142 CBC 8067 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
143 8067 : for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
144 : // Ignore all slots that don't contain an attached tenant
145 8060 : let tenant = match &slot.1 {
146 7553 : TenantSlot::Attached(t) => t,
147 507 : _ => continue,
148 : };
149 :
150 592 : match selector {
151 6961 : ShardSelector::First => return Some(*slot.0),
152 592 : ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
153 592 : return Some(*slot.0)
154 : }
155 UBC 0 : ShardSelector::Page(key) => {
156 0 : // First slot we see for this tenant, calculate the expected shard number
157 0 : // for the key: we will use this for checking if this and subsequent
158 0 : // slots contain the key, rather than recalculating the hash each time.
159 0 : if want_shard.is_none() {
160 0 : want_shard = Some(tenant.shard_identity.get_shard_number(&key));
161 0 : }
162 :
163 0 : if Some(tenant.shard_identity.number) == want_shard {
164 0 : return Some(*slot.0);
165 0 : }
166 : }
167 0 : _ => continue,
168 : }
169 : }
170 :
171 : // Fall through: we didn't find an acceptable shard
172 CBC 514 : None
173 : }
174 : }
175 8067 : }
176 :
177 : /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
178 : ///
179 : /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
180 : /// slot if the enclosed tenant is shut down.
181 57 : pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult {
182 57 : use std::collections::btree_map::Entry;
183 57 : match self {
184 UBC 0 : TenantsMap::Initializing => TenantsMapRemoveResult::Vacant,
185 CBC 57 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) {
186 57 : Entry::Occupied(entry) => match entry.get() {
187 1 : TenantSlot::InProgress(barrier) => {
188 1 : TenantsMapRemoveResult::InProgress(barrier.clone())
189 : }
190 56 : _ => TenantsMapRemoveResult::Occupied(entry.remove()),
191 : },
192 UBC 0 : Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant,
193 : },
194 : }
195 CBC 57 : }
196 :
197 57 : pub(crate) fn len(&self) -> usize {
198 57 : match self {
199 UBC 0 : TenantsMap::Initializing => 0,
200 CBC 57 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
201 : }
202 57 : }
203 : }
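
// Illustrative only: how the lookups above compose on the page-service path. A
// client-supplied TenantId plus a Key is first resolved to a concrete TenantShardId,
// which is then used to fetch the attached `Tenant`. Assumes the caller already holds
// the map read lock; `lookup_for_key` is a hypothetical helper, not part of this module.
fn lookup_for_key(tenants: &TenantsMap, tenant_id: TenantId, key: Key) -> Option<Arc<Tenant>> {
    let tenant_shard_id = tenants.resolve_attached_shard(&tenant_id, ShardSelector::Page(key))?;
    tenants.get(&tenant_shard_id).cloned()
}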
204 :
205 : /// This is "safe" in that it won't leave behind a partially deleted directory
206 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before we start deleting
207 : /// the contents.
208 : ///
209 : /// This is pageserver-specific, as it relies on future processes after a crash to check
210 : /// for TEMP_FILE_SUFFIX when loading things.
211 11 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
212 31 : let tmp_path = safe_rename_tenant_dir(path).await?;
213 11 : fs::remove_dir_all(tmp_path).await
214 11 : }
215 :
216 76 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
217 76 : let parent = path
218 76 : .as_ref()
219 76 : .parent()
220 76 : // It is invalid to call this function with a relative path. Tenant directories
221 76 : // should always have a parent.
222 76 : .ok_or(std::io::Error::new(
223 76 : std::io::ErrorKind::InvalidInput,
224 76 : "Path must be absolute",
225 76 : ))?;
226 76 : let rand_suffix = rand::thread_rng()
227 76 : .sample_iter(&Alphanumeric)
228 76 : .take(8)
229 76 : .map(char::from)
230 76 : .collect::<String>()
231 76 : + TEMP_FILE_SUFFIX;
232 76 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
233 76 : fs::rename(path.as_ref(), &tmp_path).await?;
234 76 : fs::File::open(parent).await?.sync_all().await?;
235 76 : Ok(tmp_path)
236 76 : }
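
// A sketch of the recovery half of this scheme: a later pageserver process treats any
// directory whose name carries TEMP_FILE_SUFFIX as a partially deleted leftover and can
// remove it outright (the real startup path does this via `crate::is_temporary`; see
// `load_tenant_config` below). Hypothetical helper with minimal error handling.
async fn remove_leftover_temp_dirs(parent: &Utf8Path) -> std::io::Result<()> {
    let mut entries = fs::read_dir(parent).await?;
    while let Some(entry) = entries.next_entry().await? {
        if entry.file_name().to_string_lossy().ends_with(TEMP_FILE_SUFFIX) {
            // Safe to delete: the rename above happened before any contents were removed.
            fs::remove_dir_all(entry.path()).await?;
        }
    }
    Ok(())
}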
237 :
238 : static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
239 558 : Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
240 :
241 : /// The TenantManager is responsible for storing and mutating the collection of all tenants
242 : /// that this pageserver process has state for. Every Tenant and SecondaryTenant instance
243 : /// lives inside the TenantManager.
244 : ///
245 : /// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
246 : /// the same tenant twice concurrently, or trying to configure the same tenant into secondary
247 : /// and attached modes concurrently.
248 : pub struct TenantManager {
249 : conf: &'static PageServerConf,
250 : // TODO: currently this is a &'static pointing to TENANTS. When we finish refactoring
251 : // out of that static variable, the TenantManager can own this.
252 : // See https://github.com/neondatabase/neon/issues/5796
253 : tenants: &'static std::sync::RwLock<TenantsMap>,
254 : resources: TenantSharedResources,
255 : }
256 :
257 1 : fn emergency_generations(
258 1 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
259 1 : ) -> HashMap<TenantShardId, Generation> {
260 1 : tenant_confs
261 1 : .iter()
262 1 : .filter_map(|(tid, lc)| {
263 1 : let lc = match lc {
264 1 : Ok(lc) => lc,
265 UBC 0 : Err(_) => return None,
266 : };
267 CBC 1 : let gen = match &lc.mode {
268 1 : LocationMode::Attached(alc) => Some(alc.generation),
269 UBC 0 : LocationMode::Secondary(_) => None,
270 : };
271 :
272 CBC 1 : gen.map(|g| (*tid, g))
273 1 : })
274 1 : .collect()
275 1 : }
276 :
277 557 : async fn init_load_generations(
278 557 : conf: &'static PageServerConf,
279 557 : tenant_confs: &HashMap<TenantShardId, anyhow::Result<LocationConf>>,
280 557 : resources: &TenantSharedResources,
281 557 : cancel: &CancellationToken,
282 557 : ) -> anyhow::Result<Option<HashMap<TenantShardId, Generation>>> {
283 557 : let generations = if conf.control_plane_emergency_mode {
284 1 : error!(
285 1 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
286 1 : );
287 1 : emergency_generations(tenant_confs)
288 556 : } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
289 555 : info!("Calling control plane API to re-attach tenants");
290 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
291 1686 : match client.re_attach().await {
292 555 : Ok(tenants) => tenants,
293 : Err(RetryForeverError::ShuttingDown) => {
294 UBC 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
295 : }
296 : }
297 : } else {
298 CBC 1 : info!("Control plane API not configured, tenant generations are disabled");
299 1 : return Ok(None);
300 : };
301 :
302 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
303 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
304 : // the queue. Sequential processing of te queue ensures that recovery is done before any new tenant deletions
305 : // are processed, even though we don't block on recovery completing here.
306 : //
307 : // Must only do this if remote storage is enabled, otherwise deletion queue
308 : // is not running and channel push will fail.
309 556 : if resources.remote_storage.is_some() {
310 556 : resources
311 556 : .deletion_queue_client
312 556 : .recover(generations.clone())?;
313 UBC 0 : }
314 :
315 CBC 556 : Ok(Some(generations))
316 557 : }
317 :
318 : /// Given a directory discovered in the pageserver's tenants/ directory, attempt
319 : /// to load a tenant config from it.
320 : ///
321 : /// If the file is missing, return Ok(None).
322 221 : fn load_tenant_config(
323 221 : conf: &'static PageServerConf,
324 221 : dentry: Utf8DirEntry,
325 221 : ) -> anyhow::Result<Option<(TenantShardId, anyhow::Result<LocationConf>)>> {
326 221 : let tenant_dir_path = dentry.path().to_path_buf();
327 221 : if crate::is_temporary(&tenant_dir_path) {
328 UBC 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
329 : // No need to use safe_remove_tenant_dir_all because this is already
330 : // a temporary path
331 0 : if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
332 0 : error!(
333 0 : "Failed to remove temporary directory '{}': {:?}",
334 0 : tenant_dir_path, e
335 0 : );
336 0 : }
337 0 : return Ok(None);
338 CBC 221 : }
339 :
340 : // This case happens if we crash during attachment before writing a config into the dir
341 221 : let is_empty = tenant_dir_path
342 221 : .is_empty_dir()
343 221 : .with_context(|| format!("Failed to check whether {tenant_dir_path:?} is an empty dir"))?;
344 221 : if is_empty {
345 2 : info!("removing empty tenant directory {tenant_dir_path:?}");
346 2 : if let Err(e) = std::fs::remove_dir(&tenant_dir_path) {
347 UBC 0 : error!(
348 0 : "Failed to remove empty tenant directory '{}': {e:#}",
349 0 : tenant_dir_path
350 0 : )
351 CBC 2 : }
352 2 : return Ok(None);
353 219 : }
354 219 :
355 219 : let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
356 219 : if tenant_ignore_mark_file.exists() {
357 1 : info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
358 1 : return Ok(None);
359 218 : }
360 :
361 218 : let tenant_shard_id = match tenant_dir_path
362 218 : .file_name()
363 218 : .unwrap_or_default()
364 218 : .parse::<TenantShardId>()
365 : {
366 218 : Ok(id) => id,
367 : Err(_) => {
368 UBC 0 : warn!("Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",);
369 0 : return Ok(None);
370 : }
371 : };
372 :
373 CBC 218 : Ok(Some((
374 218 : tenant_shard_id,
375 218 : Tenant::load_tenant_config(conf, &tenant_shard_id),
376 218 : )))
377 221 : }
378 :
379 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
380 : /// and load configurations for the tenants we found.
381 : ///
382 : /// Do this in parallel, because we expect 10k+ tenants, so serial execution can take
383 : /// seconds even on reasonably fast drives.
384 557 : async fn init_load_tenant_configs(
385 557 : conf: &'static PageServerConf,
386 557 : ) -> anyhow::Result<HashMap<TenantShardId, anyhow::Result<LocationConf>>> {
387 557 : let tenants_dir = conf.tenants_path();
388 :
389 557 : let dentries = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<Utf8DirEntry>> {
390 557 : let dir_entries = tenants_dir
391 557 : .read_dir_utf8()
392 557 : .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
393 :
394 557 : Ok(dir_entries.collect::<Result<Vec<_>, std::io::Error>>()?)
395 557 : })
396 557 : .await??;
397 :
398 557 : let mut configs = HashMap::new();
399 557 :
400 557 : let mut join_set = JoinSet::new();
401 778 : for dentry in dentries {
402 221 : join_set.spawn_blocking(move || load_tenant_config(conf, dentry));
403 221 : }
404 :
405 778 : while let Some(r) = join_set.join_next().await {
406 221 : if let Some((tenant_id, tenant_config)) = r?? {
407 218 : configs.insert(tenant_id, tenant_config);
408 218 : }
409 : }
410 :
411 557 : Ok(configs)
412 557 : }
413 :
414 : /// Initialize repositories with locally available timelines.
415 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
416 : /// are scheduled for download and added to the tenant once download is completed.
417 557 : #[instrument(skip_all)]
418 : pub async fn init_tenant_mgr(
419 : conf: &'static PageServerConf,
420 : resources: TenantSharedResources,
421 : init_order: InitializationOrder,
422 : cancel: CancellationToken,
423 : ) -> anyhow::Result<TenantManager> {
424 : let mut tenants = BTreeMap::new();
425 :
426 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
427 :
428 : // Scan local filesystem for attached tenants
429 : let tenant_configs = init_load_tenant_configs(conf).await?;
430 :
431 : // Determine which tenants are to be attached
432 : let tenant_generations =
433 : init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
434 :
435 557 : tracing::info!(
436 557 : "Attaching {} tenants at startup, warming up {} at a time",
437 557 : tenant_configs.len(),
438 557 : conf.concurrent_tenant_warmup.initial_permits()
439 557 : );
440 : TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
441 :
442 : // Construct `Tenant` objects and start them running
443 : for (tenant_shard_id, location_conf) in tenant_configs {
444 : let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
445 :
446 : let mut location_conf = match location_conf {
447 : Ok(l) => l,
448 : Err(e) => {
449 UBC 0 : warn!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Marking tenant broken, failed to {e:#}");
450 :
451 : tenants.insert(
452 : tenant_shard_id,
453 : TenantSlot::Attached(Tenant::create_broken_tenant(
454 : conf,
455 : tenant_shard_id,
456 : format!("{}", e),
457 : )),
458 : );
459 : continue;
460 : }
461 : };
462 :
463 : let generation = if let Some(generations) = &tenant_generations {
464 : // We have a generation map: treat it as the authority for whether
465 : // this tenant is really attached.
466 : if let Some(gen) = generations.get(&tenant_shard_id) {
467 : *gen
468 : } else {
469 : match &location_conf.mode {
470 : LocationMode::Secondary(secondary_config) => {
471 : // We do not require the control plane's permission for secondary mode
472 : // tenants, because they do no remote writes and hence require no
473 : // generation number
474 CBC 5 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
475 : tenants.insert(
476 : tenant_shard_id,
477 : TenantSlot::Secondary(SecondaryTenant::new(
478 : tenant_shard_id,
479 : secondary_config,
480 : )),
481 : );
482 : }
483 : LocationMode::Attached(_) => {
484 : // TODO: augment re-attach API to enable the control plane to
485 : // instruct us about secondary attachments. That way, instead of throwing
486 : // away local state, we can gracefully fall back to secondary here, if the control
487 : // plane tells us so.
488 : // (https://github.com/neondatabase/neon/issues/5377)
489 11 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Detaching tenant, control plane omitted it in re-attach response");
490 : if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
491 UBC 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
492 0 : "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
493 0 : );
494 : }
495 : }
496 : };
497 :
498 : continue;
499 : }
500 : } else {
501 : // Legacy mode: no generation information, any tenant present
502 : // on local disk may activate
503 0 : info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Starting tenant in legacy mode, no generation",);
504 : Generation::none()
505 : };
506 :
507 : // Presence of a generation number implies attachment: attach the tenant
508 : // if it wasn't already, and apply the generation number.
509 : location_conf.attach_in_generation(generation);
510 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
511 :
512 : let shard_identity = location_conf.shard;
513 : match tenant_spawn(
514 : conf,
515 : tenant_shard_id,
516 : &tenant_dir_path,
517 : resources.clone(),
518 : AttachedTenantConf::try_from(location_conf)?,
519 : shard_identity,
520 : Some(init_order.clone()),
521 : &TENANTS,
522 : SpawnMode::Normal,
523 : &ctx,
524 : ) {
525 : Ok(tenant) => {
526 : tenants.insert(tenant_shard_id, TenantSlot::Attached(tenant));
527 : }
528 : Err(e) => {
529 0 : error!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Failed to start tenant: {e:#}");
530 : }
531 : }
532 : }
533 :
534 CBC 557 : info!("Processed {} local tenants at startup", tenants.len());
535 :
536 : let mut tenants_map = TENANTS.write().unwrap();
537 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
538 : METRICS.tenant_slots.set(tenants.len() as u64);
539 : *tenants_map = TenantsMap::Open(tenants);
540 :
541 : Ok(TenantManager {
542 : conf,
543 : tenants: &TENANTS,
544 : resources,
545 : })
546 : }
547 :
548 : /// Wrapper for Tenant::spawn that checks invariants before running, and inserts
549 : /// a broken tenant in the map if Tenant::spawn fails.
550 : #[allow(clippy::too_many_arguments)]
551 736 : pub(crate) fn tenant_spawn(
552 736 : conf: &'static PageServerConf,
553 736 : tenant_shard_id: TenantShardId,
554 736 : tenant_path: &Utf8Path,
555 736 : resources: TenantSharedResources,
556 736 : location_conf: AttachedTenantConf,
557 736 : shard_identity: ShardIdentity,
558 736 : init_order: Option<InitializationOrder>,
559 736 : tenants: &'static std::sync::RwLock<TenantsMap>,
560 736 : mode: SpawnMode,
561 736 : ctx: &RequestContext,
562 736 : ) -> anyhow::Result<Arc<Tenant>> {
563 736 : anyhow::ensure!(
564 736 : tenant_path.is_dir(),
565 UBC 0 : "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
566 : );
567 : anyhow::ensure!(
568 CBC 736 : !crate::is_temporary(tenant_path),
569 UBC 0 : "Cannot load tenant from temporary path {tenant_path:?}"
570 : );
571 : anyhow::ensure!(
572 CBC 736 : !tenant_path.is_empty_dir().with_context(|| {
573 UBC 0 : format!("Failed to check whether {tenant_path:?} is an empty dir")
574 CBC 736 : })?,
575 UBC 0 : "Cannot load tenant from empty directory {tenant_path:?}"
576 : );
577 :
578 CBC 736 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
579 736 : anyhow::ensure!(
580 736 : !conf.tenant_ignore_mark_file_path(&tenant_shard_id).exists(),
581 UBC 0 : "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
582 : );
583 :
584 CBC 736 : info!(
585 736 : tenant_id = %tenant_shard_id.tenant_id,
586 736 : shard_id = %tenant_shard_id.shard_slug(),
587 736 : generation = ?location_conf.location.generation,
588 736 : attach_mode = ?location_conf.location.attach_mode,
589 736 : "Attaching tenant"
590 736 : );
591 736 : let tenant = match Tenant::spawn(
592 736 : conf,
593 736 : tenant_shard_id,
594 736 : resources,
595 736 : location_conf,
596 736 : shard_identity,
597 736 : init_order,
598 736 : tenants,
599 736 : mode,
600 736 : ctx,
601 736 : ) {
602 736 : Ok(tenant) => tenant,
603 UBC 0 : Err(e) => {
604 0 : error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
605 0 : Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
606 : }
607 : };
608 :
609 CBC 736 : Ok(tenant)
610 736 : }
611 :
612 : ///
613 : /// Shut down all tenants. This runs as part of pageserver shutdown.
614 : ///
615 : /// NB: We leave the tenants in the map, so that they remain accessible through
616 : /// the management API until we shut it down. If we removed the shut-down tenants
617 : /// from the tenants map, the management API would return 404 for these tenants,
618 : /// because TenantsMap::get() would then return `None` for them.
619 : /// That could easily be misinterpreted by the control plane, the consumer of the
620 : /// management API. For example, it could attach the tenant on a different pageserver.
621 : /// We would then be in split-brain once this pageserver restarts.
622 159 : #[instrument(skip_all)]
623 : pub(crate) async fn shutdown_all_tenants() {
624 : shutdown_all_tenants0(&TENANTS).await
625 : }
626 :
627 160 : async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
628 160 : use utils::completion;
629 160 :
630 160 : let mut join_set = JoinSet::new();
631 :
632 : // Atomically, 1. create the shutdown tasks and 2. prevent creation of new tenants.
633 160 : let (total_in_progress, total_attached) = {
634 160 : let mut m = tenants.write().unwrap();
635 160 : match &mut *m {
636 : TenantsMap::Initializing => {
637 UBC 0 : *m = TenantsMap::ShuttingDown(BTreeMap::default());
638 0 : info!("tenants map is empty");
639 0 : return;
640 : }
641 CBC 160 : TenantsMap::Open(tenants) => {
642 160 : let mut shutdown_state = BTreeMap::new();
643 160 : let mut total_in_progress = 0;
644 160 : let mut total_attached = 0;
645 :
646 185 : for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
647 185 : match v {
648 179 : TenantSlot::Attached(t) => {
649 179 : shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
650 179 : join_set.spawn(
651 179 : async move {
652 179 : let freeze_and_flush = true;
653 :
654 179 : let res = {
655 179 : let (_guard, shutdown_progress) = completion::channel();
656 408 : t.shutdown(shutdown_progress, freeze_and_flush).await
657 : };
658 :
659 179 : if let Err(other_progress) = res {
660 : // join the other shutdown already in progress
661 UBC 0 : other_progress.wait().await;
662 CBC 179 : }
663 :
664 : // we cannot afford per tenant logging here, because if s3 is degraded, we are
665 : // going to log too many lines
666 179 : debug!("tenant successfully stopped");
667 179 : }
668 179 : .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug())),
669 : );
670 :
671 179 : total_attached += 1;
672 : }
673 5 : TenantSlot::Secondary(state) => {
674 5 : // We don't need to wait for this individually per-tenant: the
675 5 : // downloader task will be waited on eventually, this cancel
676 5 : // is just to encourage it to drop out if it is doing work
677 5 : // for this tenant right now.
678 5 : state.cancel.cancel();
679 5 :
680 5 : shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
681 5 : }
682 1 : TenantSlot::InProgress(notify) => {
683 1 : // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
684 1 : // wait for their notifications to fire in this function.
685 1 : join_set.spawn(async move {
686 1 : notify.wait().await;
687 1 : });
688 1 :
689 1 : total_in_progress += 1;
690 1 : }
691 : }
692 : }
693 160 : *m = TenantsMap::ShuttingDown(shutdown_state);
694 160 : (total_in_progress, total_attached)
695 : }
696 : TenantsMap::ShuttingDown(_) => {
697 UBC 0 : error!("already shutting down, this function isn't supposed to be called more than once");
698 0 : return;
699 : }
700 : }
701 : };
702 :
703 CBC 160 : let started_at = std::time::Instant::now();
704 :
705 160 : info!(
706 160 : "Waiting for {} InProgress tenants and {} Attached tenants to shut down",
707 160 : total_in_progress, total_attached
708 160 : );
709 :
710 160 : let total = join_set.len();
711 160 : let mut panicked = 0;
712 160 : let mut buffering = true;
713 160 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
714 160 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
715 :
716 347 : while !join_set.is_empty() {
717 187 : tokio::select! {
718 180 : Some(joined) = join_set.join_next() => {
719 : match joined {
720 : Ok(()) => {}
721 : Err(join_error) if join_error.is_cancelled() => {
722 : unreachable!("we are not cancelling any of the tasks");
723 : }
724 : Err(join_error) if join_error.is_panic() => {
725 : // cannot really do anything, as this panic is likely a bug
726 : panicked += 1;
727 : }
728 : Err(join_error) => {
729 UBC 0 : warn!("unknown kind of JoinError: {join_error}");
730 : }
731 : }
732 : if !buffering {
733 : // buffer so that every 500ms since the first update (or starting) we'll log
734 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
735 : // are not able to log *then*.
736 : buffering = true;
737 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
738 : }
739 : },
740 : _ = &mut buffered, if buffering => {
741 : buffering = false;
742 CBC 7 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
743 : }
744 : }
745 : }
746 :
747 160 : if panicked > 0 {
748 UBC 0 : warn!(
749 0 : panicked,
750 0 : total, "observed panicks while shutting down tenants"
751 0 : );
752 CBC 160 : }
753 :
754 : // caller will log how long we took
755 160 : }
756 :
757 431 : pub(crate) async fn create_tenant(
758 431 : conf: &'static PageServerConf,
759 431 : tenant_conf: TenantConfOpt,
760 431 : tenant_shard_id: TenantShardId,
761 431 : generation: Generation,
762 431 : resources: TenantSharedResources,
763 431 : ctx: &RequestContext,
764 431 : ) -> Result<Arc<Tenant>, TenantMapInsertError> {
765 431 : let location_conf = LocationConf::attached_single(tenant_conf, generation);
766 431 : info!("Creating tenant at location {location_conf:?}");
767 :
768 431 : let slot_guard =
769 431 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
770 861 : let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_shard_id).await?;
771 :
772 430 : let shard_identity = location_conf.shard;
773 430 : let created_tenant = tenant_spawn(
774 430 : conf,
775 430 : tenant_shard_id,
776 430 : &tenant_path,
777 430 : resources,
778 430 : AttachedTenantConf::try_from(location_conf)?,
779 430 : shard_identity,
780 430 : None,
781 430 : &TENANTS,
782 430 : SpawnMode::Create,
783 430 : ctx,
784 UBC 0 : )?;
785 : // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
786 : // See https://github.com/neondatabase/neon/issues/4233
787 :
788 CBC 430 : let created_tenant_id = created_tenant.tenant_id();
789 430 : debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id);
790 :
791 430 : slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?;
792 :
793 430 : Ok(created_tenant)
794 431 : }
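
// The slot-guard protocol used above, reduced to a skeleton for clarity. This is a
// schematic of the ordering only, not a second implementation: acquire the slot (other
// callers now observe InProgress), perform the fallible work, then publish the new value
// with `upsert`, or call `revert()` to restore the previous contents as `delete_tenant`
// does. `slot_guard_skeleton` is a hypothetical name.
fn slot_guard_skeleton(tenant_shard_id: TenantShardId) -> Result<(), TenantMapInsertError> {
    // 1. Reserve the slot; a concurrent create/attach of the same shard would fail here.
    let slot_guard =
        tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;

    // 2. Fallible work goes here (directory creation, tenant_spawn, ...). No other
    //    operation can act on this TenantShardId's slot while the guard is held.

    // 3. Publish the result, e.g. `slot_guard.upsert(TenantSlot::Attached(tenant))?;`.
    drop(slot_guard);
    Ok(())
}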
795 :
796 UBC 0 : #[derive(Debug, thiserror::Error)]
797 : pub(crate) enum SetNewTenantConfigError {
798 : #[error(transparent)]
799 : GetTenant(#[from] GetTenantError),
800 : #[error(transparent)]
801 : Persist(anyhow::Error),
802 : }
803 :
804 CBC 30 : pub(crate) async fn set_new_tenant_config(
805 30 : conf: &'static PageServerConf,
806 30 : new_tenant_conf: TenantConfOpt,
807 30 : tenant_id: TenantId,
808 30 : ) -> Result<(), SetNewTenantConfigError> {
809 30 : // Legacy API: does not support sharding
810 30 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
811 :
812 30 : info!("configuring tenant {tenant_id}");
813 30 : let tenant = get_tenant(tenant_shard_id, true)?;
814 :
815 : // This is a legacy API that only operates on attached tenants: the preferred
816 : // API to use is the location_config/ endpoint, which lets the caller provide
817 : // the full LocationConf.
818 30 : let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation);
819 30 :
820 30 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf)
821 60 : .await
822 30 : .map_err(SetNewTenantConfigError::Persist)?;
823 30 : tenant.set_new_tenant_config(new_tenant_conf);
824 30 : Ok(())
825 30 : }
826 :
827 : impl TenantManager {
828 : /// Convenience function so that anyone with a TenantManager can get at the global configuration, without
829 : /// having to pass it around everywhere as a separate object.
830 1121 : pub(crate) fn get_conf(&self) -> &'static PageServerConf {
831 1121 : self.conf
832 1121 : }
833 :
834 : /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or does not fit the query.
835 : /// `active_only = true` restricts the query to tenants that are ready for operations, erroring on other kinds of tenants.
836 850 : pub(crate) fn get_attached_tenant_shard(
837 850 : &self,
838 850 : tenant_shard_id: TenantShardId,
839 850 : active_only: bool,
840 850 : ) -> Result<Arc<Tenant>, GetTenantError> {
841 850 : let locked = self.tenants.read().unwrap();
842 :
843 850 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
844 :
845 849 : match peek_slot {
846 849 : Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
847 : TenantState::Broken {
848 UBC 0 : reason,
849 0 : backtrace: _,
850 0 : } if active_only => Err(GetTenantError::Broken(reason)),
851 CBC 848 : TenantState::Active => Ok(Arc::clone(tenant)),
852 : _ => {
853 1 : if active_only {
854 UBC 0 : Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
855 : } else {
856 CBC 1 : Ok(Arc::clone(tenant))
857 : }
858 : }
859 : },
860 : Some(TenantSlot::InProgress(_)) => {
861 UBC 0 : Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
862 : }
863 : None | Some(TenantSlot::Secondary(_)) => {
864 CBC 1 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
865 : }
866 : }
867 850 : }
868 :
869 4 : pub(crate) fn get_secondary_tenant_shard(
870 4 : &self,
871 4 : tenant_shard_id: TenantShardId,
872 4 : ) -> Option<Arc<SecondaryTenant>> {
873 4 : let locked = self.tenants.read().unwrap();
874 4 :
875 4 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
876 4 : .ok()
877 4 : .flatten();
878 :
879 4 : match peek_slot {
880 4 : Some(TenantSlot::Secondary(s)) => Some(s.clone()),
881 UBC 0 : _ => None,
882 : }
883 CBC 4 : }
884 :
885 117 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
886 : pub(crate) async fn upsert_location(
887 : &self,
888 : tenant_shard_id: TenantShardId,
889 : new_location_config: LocationConf,
890 : flush: Option<Duration>,
891 : ctx: &RequestContext,
892 : ) -> Result<(), anyhow::Error> {
893 : debug_assert_current_span_has_tenant_id();
894 117 : info!("configuring tenant location to state {new_location_config:?}");
895 :
896 : enum FastPathModified {
897 : Attached(Arc<Tenant>),
898 : Secondary(Arc<SecondaryTenant>),
899 : }
900 :
901 : // Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
902 : // then we do not need to set the slot to InProgress, we can just call into the
903 : // existing tenant.
904 : let fast_path_taken = {
905 : let locked = self.tenants.read().unwrap();
906 : let peek_slot =
907 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
908 : match (&new_location_config.mode, peek_slot) {
909 : (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
910 : if attach_conf.generation == tenant.generation {
911 : // A transition from Attached to Attached in the same generation, we may
912 : // take our fast path and just provide the updated configuration
913 : // to the tenant.
914 : tenant.set_new_location_config(AttachedTenantConf::try_from(
915 : new_location_config.clone(),
916 : )?);
917 :
918 : Some(FastPathModified::Attached(tenant.clone()))
919 : } else {
920 : // Different generations, fall through to general case
921 : None
922 : }
923 : }
924 : (
925 : LocationMode::Secondary(secondary_conf),
926 : Some(TenantSlot::Secondary(secondary_tenant)),
927 : ) => {
928 : secondary_tenant.set_config(secondary_conf);
929 : Some(FastPathModified::Secondary(secondary_tenant.clone()))
930 : }
931 : _ => {
932 : // Not an Attached->Attached transition, fall through to general case
933 : None
934 : }
935 : }
936 : };
937 :
938 : // Fast-path continued: having dropped out of the self.tenants lock, do the async
939 : // phase of writing config and/or waiting for flush, before returning.
940 : match fast_path_taken {
941 : Some(FastPathModified::Attached(tenant)) => {
942 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
943 : .await
944 : .map_err(SetNewTenantConfigError::Persist)?;
945 :
946 : // Transition to AttachedStale means we may well hold a valid generation
947 : // still, and have been requested to go stale as part of a migration. If
948 : // the caller set `flush`, then flush to remote storage.
949 : if let LocationMode::Attached(AttachedLocationConfig {
950 : generation: _,
951 : attach_mode: AttachmentMode::Stale,
952 : }) = &new_location_config.mode
953 : {
954 : if let Some(flush_timeout) = flush {
955 : match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
956 : Ok(Err(e)) => {
957 : return Err(e);
958 : }
959 : Ok(Ok(_)) => return Ok(()),
960 : Err(_) => {
961 UBC 0 : tracing::warn!(
962 0 : timeout_ms = flush_timeout.as_millis(),
963 0 : "Timed out waiting for flush to remote storage, proceeding anyway."
964 0 : )
965 : }
966 : }
967 : }
968 : }
969 :
970 : return Ok(());
971 : }
972 : Some(FastPathModified::Secondary(_secondary_tenant)) => {
973 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
974 : .await
975 : .map_err(SetNewTenantConfigError::Persist)?;
976 :
977 : return Ok(());
978 : }
979 : None => {
980 : // Proceed with the general case procedure, where we will shutdown & remove any existing
981 : // slot contents and replace with a fresh one
982 : }
983 : };
984 :
985 : // General case for upserts to TenantsMap, excluding the case above: we will substitute an
986 : // InProgress value into the slot while we make whatever changes are required. The state for
987 : // the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
988 : // the state is ill-defined while we're in transition. Transitions are async, but fast: we do
989 : // not do significant I/O, and shutdowns should be prompt via cancellation tokens.
990 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
991 :
992 : match slot_guard.get_old_value() {
993 : Some(TenantSlot::Attached(tenant)) => {
994 : // The case where we keep a Tenant alive was covered above in the special case
995 : // for Attached->Attached transitions in the same generation. By this point,
996 : // if we see an attached tenant we know it will be discarded and should be
997 : // shut down.
998 : let (_guard, progress) = utils::completion::channel();
999 :
1000 : match tenant.get_attach_mode() {
1001 : AttachmentMode::Single | AttachmentMode::Multi => {
1002 : // Before we leave our state as the presumed holder of the latest generation,
1003 : // flush any outstanding deletions to reduce the risk of leaking objects.
1004 : self.resources.deletion_queue_client.flush_advisory()
1005 : }
1006 : AttachmentMode::Stale => {
1007 : // If we're stale, there's no point trying to flush deletions
1008 : }
1009 : };
1010 :
1011 CBC 34 : info!("Shutting down attached tenant");
1012 : match tenant.shutdown(progress, false).await {
1013 : Ok(()) => {}
1014 : Err(barrier) => {
1015 UBC 0 : info!("Shutdown already in progress, waiting for it to complete");
1016 : barrier.wait().await;
1017 : }
1018 : }
1019 : slot_guard.drop_old_value().expect("We just shut it down");
1020 : }
1021 : Some(TenantSlot::Secondary(state)) => {
1022 CBC 16 : info!("Shutting down secondary tenant");
1023 : state.shutdown().await;
1024 : }
1025 : Some(TenantSlot::InProgress(_)) => {
1026 : // This should never happen: acquire_slot should error out
1027 : // if the contents of a slot were InProgress.
1028 : anyhow::bail!("Acquired an InProgress slot, this is a bug.")
1029 : }
1030 : None => {
1031 : // Slot was vacant, nothing needs shutting down.
1032 : }
1033 : }
1034 :
1035 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1036 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1037 :
1038 : // Directory structure is the same for attached and secondary modes:
1039 : // create it if it doesn't exist. Timeline load/creation expects the
1040 : // timelines/ subdir to already exist.
1041 : //
1042 : // Does not need to be fsync'd because local storage is just a cache.
1043 : tokio::fs::create_dir_all(&timelines_path)
1044 : .await
1045 UBC 0 : .with_context(|| format!("Creating {timelines_path}"))?;
1046 :
1047 : // Before activating either secondary or attached mode, persist the
1048 : // configuration, so that on restart we will re-attach (or re-start
1049 : // secondary) on the tenant.
1050 : Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
1051 : .await
1052 : .map_err(SetNewTenantConfigError::Persist)?;
1053 :
1054 : let new_slot = match &new_location_config.mode {
1055 : LocationMode::Secondary(secondary_config) => {
1056 : TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id, secondary_config))
1057 : }
1058 : LocationMode::Attached(_attach_config) => {
1059 : let shard_identity = new_location_config.shard;
1060 : let tenant = tenant_spawn(
1061 : self.conf,
1062 : tenant_shard_id,
1063 : &tenant_path,
1064 : self.resources.clone(),
1065 : AttachedTenantConf::try_from(new_location_config)?,
1066 : shard_identity,
1067 : None,
1068 : self.tenants,
1069 : SpawnMode::Normal,
1070 : ctx,
1071 : )?;
1072 :
1073 : TenantSlot::Attached(tenant)
1074 : }
1075 : };
1076 :
1077 : slot_guard.upsert(new_slot)?;
1078 :
1079 : Ok(())
1080 : }
1081 :
1082 : /// Resetting a tenant is equivalent to detaching it, then attaching it again with the same
1083 : /// LocationConf that was last used to attach it. Optionally, the local file cache may be
1084 : /// dropped before re-attaching.
1085 : ///
1086 : /// This is not part of a tenant's normal lifecycle: it is used for debug/support, in situations
1087 : /// where an issue is identified that would go away with a restart of the tenant.
1088 : ///
1089 : /// This does not have any special "force" shutdown of a tenant: it relies on the tenant's tasks
1090 : /// to respect the cancellation tokens used in normal shutdown().
1091 0 : #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %drop_cache))]
1092 : pub(crate) async fn reset_tenant(
1093 : &self,
1094 : tenant_shard_id: TenantShardId,
1095 : drop_cache: bool,
1096 : ctx: RequestContext,
1097 : ) -> anyhow::Result<()> {
1098 : let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
1099 : let Some(old_slot) = slot_guard.get_old_value() else {
1100 : anyhow::bail!("Tenant not found when trying to reset");
1101 : };
1102 :
1103 : let Some(tenant) = old_slot.get_attached() else {
1104 : slot_guard.revert();
1105 : anyhow::bail!("Tenant is not in attached state");
1106 : };
1107 :
1108 : let (_guard, progress) = utils::completion::channel();
1109 : match tenant.shutdown(progress, false).await {
1110 : Ok(()) => {
1111 : slot_guard.drop_old_value()?;
1112 : }
1113 : Err(_barrier) => {
1114 : slot_guard.revert();
1115 : anyhow::bail!("Cannot reset Tenant, already shutting down");
1116 : }
1117 : }
1118 :
1119 : let tenant_path = self.conf.tenant_path(&tenant_shard_id);
1120 : let timelines_path = self.conf.timelines_path(&tenant_shard_id);
1121 : let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
1122 :
1123 : if drop_cache {
1124 CBC 1 : tracing::info!("Dropping local file cache");
1125 :
1126 : match tokio::fs::read_dir(&timelines_path).await {
1127 : Err(e) => {
1128 UBC 0 : tracing::warn!("Failed to list timelines while dropping cache: {}", e);
1129 : }
1130 : Ok(mut entries) => {
1131 : while let Some(entry) = entries.next_entry().await? {
1132 : tokio::fs::remove_dir_all(entry.path()).await?;
1133 : }
1134 : }
1135 : }
1136 : }
1137 :
1138 : let shard_identity = config.shard;
1139 : let tenant = tenant_spawn(
1140 : self.conf,
1141 : tenant_shard_id,
1142 : &tenant_path,
1143 : self.resources.clone(),
1144 : AttachedTenantConf::try_from(config)?,
1145 : shard_identity,
1146 : None,
1147 : self.tenants,
1148 : SpawnMode::Normal,
1149 : &ctx,
1150 : )?;
1151 :
1152 : slot_guard.upsert(TenantSlot::Attached(tenant))?;
1153 :
1154 : Ok(())
1155 : }
1156 :
1157 CBC 1109 : pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
1158 1109 : let locked = self.tenants.read().unwrap();
1159 1109 : match &*locked {
1160 UBC 0 : TenantsMap::Initializing => Vec::new(),
1161 CBC 1109 : TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
1162 1109 : .values()
1163 1109 : .filter_map(|slot| {
1164 908 : slot.get_attached()
1165 908 : .and_then(|t| if t.is_active() { Some(t.clone()) } else { None })
1166 1109 : })
1167 1109 : .collect(),
1168 : }
1169 1109 : }
1170 : // Do some synchronous work for all tenant slots in Secondary state. The provided
1171 : // callback should be small and fast, as it will be called inside the global
1172 : // TenantsMap lock.
1173 1109 : pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
1174 1109 : where
1175 1109 : // TODO: let the callback return a hint to drop out of the loop early
1176 1109 : F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
1177 1109 : {
1178 1109 : let locked = self.tenants.read().unwrap();
1179 :
1180 1109 : let map = match &*locked {
1181 UBC 0 : TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
1182 CBC 1109 : TenantsMap::Open(m) => m,
1183 : };
1184 :
1185 2017 : for (tenant_id, slot) in map {
1186 908 : if let TenantSlot::Secondary(state) = slot {
1187 : // Only expose secondary tenants that are not currently shutting down
1188 11 : if !state.cancel.is_cancelled() {
1189 11 : func(tenant_id, state)
1190 UBC 0 : }
1191 CBC 897 : }
1192 : }
1193 1109 : }
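
    // Usage sketch for the callback API above: gather what you need while the callback
    // runs (it executes inside the global TenantsMap lock) and do any heavier work
    // afterwards, outside the lock. `list_secondary_shard_ids` is a hypothetical method
    // shown only for illustration.
    fn list_secondary_shard_ids(&self) -> Vec<TenantShardId> {
        let mut shard_ids = Vec::new();
        self.foreach_secondary_tenants(|tenant_shard_id, _secondary| {
            // Keep the callback small and fast: just record the id.
            shard_ids.push(*tenant_shard_id);
        });
        shard_ids
    }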
1194 :
1195 91 : pub(crate) async fn delete_tenant(
1196 91 : &self,
1197 91 : tenant_shard_id: TenantShardId,
1198 91 : activation_timeout: Duration,
1199 91 : ) -> Result<(), DeleteTenantError> {
1200 : // We acquire a SlotGuard during this function to protect against concurrent
1201 : // changes while the ::prepare phase of DeleteTenantFlow executes, but then
1202 : // have to return the Tenant to the map while the background deletion runs.
1203 : //
1204 : // TODO: refactor deletion to happen outside the lifetime of a Tenant.
1205 : // Currently, deletion requires a reference to the tenants map in order to
1206 : // keep the Tenant in the map until deletion is complete, and then remove
1207 : // it at the end.
1208 : //
1209 : // See https://github.com/neondatabase/neon/issues/5080
1210 :
1211 90 : let slot_guard =
1212 91 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
1213 :
1214 : // unwrap is safe because we used MustExist mode when acquiring
1215 90 : let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
1216 90 : TenantSlot::Attached(tenant) => tenant.clone(),
1217 : _ => {
1218 : // Express "not attached" as equivalent to "not found"
1219 UBC 0 : return Err(DeleteTenantError::NotAttached);
1220 : }
1221 : };
1222 :
1223 CBC 90 : match tenant.current_state() {
1224 26 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1225 26 : // If a tenant is broken or stopping, DeleteTenantFlow can
1226 26 : // handle it: broken tenants proceed to delete, stopping tenants
1227 26 : // are checked for deletion already in progress.
1228 26 : }
1229 : _ => {
1230 64 : tenant
1231 64 : .wait_to_become_active(activation_timeout)
1232 2 : .await
1233 64 : .map_err(|e| match e {
1234 : GetActiveTenantError::WillNotBecomeActive(_) => {
1235 UBC 0 : DeleteTenantError::InvalidState(tenant.current_state())
1236 : }
1237 0 : GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
1238 0 : GetActiveTenantError::NotFound(_) => DeleteTenantError::NotAttached,
1239 : GetActiveTenantError::WaitForActiveTimeout {
1240 0 : latest_state: _latest_state,
1241 0 : wait_time: _wait_time,
1242 0 : } => DeleteTenantError::InvalidState(tenant.current_state()),
1243 CBC 64 : })?;
1244 : }
1245 : }
1246 :
1247 90 : let result = DeleteTenantFlow::run(
1248 90 : self.conf,
1249 90 : self.resources.remote_storage.clone(),
1250 90 : &TENANTS,
1251 90 : tenant,
1252 90 : )
1253 446 : .await;
1254 :
1255 : // The Tenant goes back into the map in Stopping state, it will eventually be removed by DeleteTenantFLow
1256 90 : slot_guard.revert();
1257 90 : result
1258 91 : }
1259 : }
1260 :
1261 2056 : #[derive(Debug, thiserror::Error)]
1262 : pub(crate) enum GetTenantError {
1263 : #[error("Tenant {0} not found")]
1264 : NotFound(TenantId),
1265 : #[error("Tenant {0} is not active")]
1266 : NotActive(TenantId),
1267 : /// Broken is logically a subset of NotActive, but a distinct error is useful as
1268 : /// NotActive is usually a retryable state for API purposes, whereas Broken
1269 : /// is a stuck error state
1270 : #[error("Tenant is broken: {0}")]
1271 : Broken(String),
1272 :
1273 : // Initializing or shutting down: cannot authoritatively say whether we have this tenant
1274 : #[error("Tenant map is not available: {0}")]
1275 : MapState(#[from] TenantMapError),
1276 : }
1277 :
1278 : /// Gets the tenant from the in-memory data, erroring if it's absent or does not fit the query.
1279 : /// `active_only = true` restricts the query to tenants that are ready for operations, erroring on other kinds of tenants.
1280 : ///
1281 : /// This method is cancel-safe.
1282 6149 : pub(crate) fn get_tenant(
1283 6149 : tenant_shard_id: TenantShardId,
1284 6149 : active_only: bool,
1285 6149 : ) -> Result<Arc<Tenant>, GetTenantError> {
1286 6149 : let locked = TENANTS.read().unwrap();
1287 :
1288 6149 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
1289 :
1290 6084 : match peek_slot {
1291 6084 : Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
1292 : TenantState::Broken {
1293 1 : reason,
1294 1 : backtrace: _,
1295 1 : } if active_only => Err(GetTenantError::Broken(reason)),
1296 5729 : TenantState::Active => Ok(Arc::clone(tenant)),
1297 : _ => {
1298 354 : if active_only {
1299 6 : Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
1300 : } else {
1301 348 : Ok(Arc::clone(tenant))
1302 : }
1303 : }
1304 : },
1305 : Some(TenantSlot::InProgress(_)) => {
1306 UBC 0 : Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
1307 : }
1308 : None | Some(TenantSlot::Secondary(_)) => {
1309 CBC 65 : Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
1310 : }
1311 : }
1312 6149 : }
1313 :
1314 2056 : #[derive(thiserror::Error, Debug)]
1315 : pub(crate) enum GetActiveTenantError {
1316 : /// We may time out either while TenantSlot is InProgress, or while the Tenant
1317 : /// is in a non-Active state
1318 : #[error(
1319 : "Timed out waiting {wait_time:?} for tenant active state. Latest state: {latest_state:?}"
1320 : )]
1321 : WaitForActiveTimeout {
1322 : latest_state: Option<TenantState>,
1323 : wait_time: Duration,
1324 : },
1325 :
1326 : /// The TenantSlot is absent, or in secondary mode
1327 : #[error(transparent)]
1328 : NotFound(#[from] GetTenantError),
1329 :
1330 : /// Cancellation token fired while we were waiting
1331 : #[error("cancelled")]
1332 : Cancelled,
1333 :
1334 : /// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
1335 : #[error("will not become active. Current state: {0}")]
1336 : WillNotBecomeActive(TenantState),
1337 : }
1338 :
1339 : /// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
1340 : /// state, then wait for up to `timeout`. If the [`Tenant`] is not currently in [`TenantState::Active`],
1341 : /// then wait for up to `timeout` (minus however long we waited for the slot).
1342 8067 : pub(crate) async fn get_active_tenant_with_timeout(
1343 8067 : tenant_id: TenantId,
1344 8067 : shard_selector: ShardSelector,
1345 8067 : timeout: Duration,
1346 8067 : cancel: &CancellationToken,
1347 8067 : ) -> Result<Arc<Tenant>, GetActiveTenantError> {
1348 8067 : enum WaitFor {
1349 8067 : Barrier(utils::completion::Barrier),
1350 8067 : Tenant(Arc<Tenant>),
1351 8067 : }
1352 8067 :
1353 8067 : let wait_start = Instant::now();
1354 8067 : let deadline = wait_start + timeout;
1355 :
1356 141 : let (wait_for, tenant_shard_id) = {
1357 8067 : let locked = TENANTS.read().unwrap();
1358 :
1359 : // Resolve TenantId to TenantShardId
1360 8067 : let tenant_shard_id = locked
1361 8067 : .resolve_attached_shard(&tenant_id, shard_selector)
1362 8067 : .ok_or(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1363 8067 : tenant_id,
1364 8067 : )))?;
1365 :
1366 7553 : let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
1367 7553 : .map_err(GetTenantError::MapState)?;
1368 7553 : match peek_slot {
1369 7553 : Some(TenantSlot::Attached(tenant)) => {
1370 7553 : match tenant.current_state() {
1371 : TenantState::Active => {
1372 : // Fast path: we don't need to do any async waiting.
1373 7412 : return Ok(tenant.clone());
1374 : }
1375 : _ => {
1376 141 : tenant.activate_now();
1377 141 : (WaitFor::Tenant(tenant.clone()), tenant_shard_id)
1378 : }
1379 : }
1380 : }
1381 : Some(TenantSlot::Secondary(_)) => {
1382 UBC 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
1383 0 : tenant_id,
1384 0 : )))
1385 : }
1386 0 : Some(TenantSlot::InProgress(barrier)) => {
1387 0 : (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
1388 : }
1389 : None => {
1390 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
1391 0 : tenant_id,
1392 0 : )))
1393 : }
1394 : }
1395 : };
1396 :
1397 CBC 141 : let tenant = match wait_for {
1398 UBC 0 : WaitFor::Barrier(barrier) => {
1399 0 : tracing::debug!("Waiting for tenant InProgress state to pass...");
1400 0 : timeout_cancellable(
1401 0 : deadline.duration_since(Instant::now()),
1402 0 : cancel,
1403 0 : barrier.wait(),
1404 0 : )
1405 0 : .await
1406 0 : .map_err(|e| match e {
1407 0 : TimeoutCancellableError::Timeout => GetActiveTenantError::WaitForActiveTimeout {
1408 0 : latest_state: None,
1409 0 : wait_time: wait_start.elapsed(),
1410 0 : },
1411 0 : TimeoutCancellableError::Cancelled => GetActiveTenantError::Cancelled,
1412 0 : })?;
1413 : {
1414 0 : let locked = TENANTS.read().unwrap();
1415 0 : let peek_slot =
1416 0 : tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
1417 0 : .map_err(GetTenantError::MapState)?;
1418 0 : match peek_slot {
1419 0 : Some(TenantSlot::Attached(tenant)) => tenant.clone(),
1420 : _ => {
1421 0 : return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
1422 0 : tenant_id,
1423 0 : )))
1424 : }
1425 : }
1426 : }
1427 : }
1428 CBC 141 : WaitFor::Tenant(tenant) => tenant,
1429 : };
1430 :
1431 UBC 0 : tracing::debug!("Waiting for tenant to enter active state...");
1432 CBC 141 : tenant
1433 141 : .wait_to_become_active(deadline.duration_since(Instant::now()))
1434 427 : .await?;
1435 141 : Ok(tenant)
1436 8067 : }
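
// A hedged usage sketch of the function above from a request path: resolve whichever
// shard holds `key`, waiting up to a caller-chosen timeout for activation and honoring
// the connection's cancellation token. The 30-second figure and the `tenant_for_page`
// name are illustrative assumptions, not taken from this module.
async fn tenant_for_page(
    tenant_id: TenantId,
    key: Key,
    cancel: &CancellationToken,
) -> Result<Arc<Tenant>, GetActiveTenantError> {
    get_active_tenant_with_timeout(
        tenant_id,
        ShardSelector::Page(key),
        Duration::from_secs(30),
        cancel,
    )
    .await
}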
1437 :
1438 UBC 0 : #[derive(Debug, thiserror::Error)]
1439 : pub(crate) enum DeleteTimelineError {
1440 : #[error("Tenant {0}")]
1441 : Tenant(#[from] GetTenantError),
1442 :
1443 : #[error("Timeline {0}")]
1444 : Timeline(#[from] crate::tenant::DeleteTimelineError),
1445 : }
1446 :
1447 0 : #[derive(Debug, thiserror::Error)]
1448 : pub(crate) enum TenantStateError {
1449 : #[error("Tenant {0} is stopping")]
1450 : IsStopping(TenantId),
1451 : #[error(transparent)]
1452 : SlotError(#[from] TenantSlotError),
1453 : #[error(transparent)]
1454 : SlotUpsertError(#[from] TenantSlotUpsertError),
1455 : #[error(transparent)]
1456 : Other(#[from] anyhow::Error),
1457 : }
1458 :
1459 CBC 77 : pub(crate) async fn detach_tenant(
1460 77 : conf: &'static PageServerConf,
1461 77 : tenant_shard_id: TenantShardId,
1462 77 : detach_ignored: bool,
1463 77 : deletion_queue_client: &DeletionQueueClient,
1464 77 : ) -> Result<(), TenantStateError> {
1465 77 : let tmp_path = detach_tenant0(
1466 77 : conf,
1467 77 : &TENANTS,
1468 77 : tenant_shard_id,
1469 77 : detach_ignored,
1470 77 : deletion_queue_client,
1471 77 : )
1472 326 : .await?;
1473 : // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
1474 : // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
1475 65 : let task_tenant_id = None;
1476 65 : task_mgr::spawn(
1477 65 : task_mgr::BACKGROUND_RUNTIME.handle(),
1478 65 : TaskKind::MgmtRequest,
1479 65 : task_tenant_id,
1480 65 : None,
1481 65 : "tenant_files_delete",
1482 65 : false,
1483 65 : async move {
1484 65 : fs::remove_dir_all(tmp_path.as_path())
1485 65 : .await
1486 65 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
1487 65 : },
1488 65 : );
1489 65 : Ok(())
1490 77 : }
1491 :
1492 77 : async fn detach_tenant0(
1493 77 : conf: &'static PageServerConf,
1494 77 : tenants: &std::sync::RwLock<TenantsMap>,
1495 77 : tenant_shard_id: TenantShardId,
1496 77 : detach_ignored: bool,
1497 77 : deletion_queue_client: &DeletionQueueClient,
1498 77 : ) -> Result<Utf8PathBuf, TenantStateError> {
1499 77 : let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
1500 65 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
1501 65 : safe_rename_tenant_dir(&local_tenant_directory)
1502 195 : .await
1503 77 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
1504 65 : };
1505 :
1506 77 : let removal_result = remove_tenant_from_memory(
1507 77 : tenants,
1508 77 : tenant_shard_id,
1509 77 : tenant_dir_rename_operation(tenant_shard_id),
1510 77 : )
1511 323 : .await;
1512 :
1513 : // Flush pending deletions, so that they have a good chance of passing validation
1514 : // before this tenant is potentially re-attached elsewhere.
1515 77 : deletion_queue_client.flush_advisory();
1516 77 :
1517 77 : // Ignored tenants are not present in memory and will make the removal-from-memory operation bail.
1518 77 : // Before returning the error, check whether this is the ignored-tenant case: then we only need to clean up its local files.
1519 77 : if detach_ignored
1520 : && matches!(
1521 10 : removal_result,
1522 : Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
1523 : )
1524 : {
1525 10 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1526 10 : if tenant_ignore_mark.exists() {
1527 1 : info!("Detaching an ignored tenant");
1528 1 : let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
1529 3 : .await
1530 1 : .with_context(|| {
1531 UBC 0 : format!("Ignored tenant {tenant_shard_id} local directory rename")
1532 CBC 1 : })?;
1533 1 : return Ok(tmp_path);
1534 9 : }
1535 67 : }
1536 :
1537 76 : removal_result
1538 77 : }
1539 :
1540 5 : pub(crate) async fn load_tenant(
1541 5 : conf: &'static PageServerConf,
1542 5 : tenant_id: TenantId,
1543 5 : generation: Generation,
1544 5 : broker_client: storage_broker::BrokerClientChannel,
1545 5 : remote_storage: Option<GenericRemoteStorage>,
1546 5 : deletion_queue_client: DeletionQueueClient,
1547 5 : ctx: &RequestContext,
1548 5 : ) -> Result<(), TenantMapInsertError> {
1549 5 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
1550 5 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1551 :
1552 4 : let slot_guard =
1553 5 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
1554 4 : let tenant_path = conf.tenant_path(&tenant_shard_id);
1555 4 :
1556 4 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1557 4 : if tenant_ignore_mark.exists() {
1558 4 : std::fs::remove_file(&tenant_ignore_mark).with_context(|| {
1559 UBC 0 : format!(
1560 0 : "Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"
1561 0 : )
1562 CBC 4 : })?;
1563 UBC 0 : }
1564 :
1565 CBC 4 : let resources = TenantSharedResources {
1566 4 : broker_client,
1567 4 : remote_storage,
1568 4 : deletion_queue_client,
1569 4 : };
1570 :
1571 4 : let mut location_conf =
1572 4 : Tenant::load_tenant_config(conf, &tenant_shard_id).map_err(TenantMapInsertError::Other)?;
1573 4 : location_conf.attach_in_generation(generation);
1574 4 :
1575 8 : Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;
1576 :
1577 4 : let shard_identity = location_conf.shard;
1578 4 : let new_tenant = tenant_spawn(
1579 4 : conf,
1580 4 : tenant_shard_id,
1581 4 : &tenant_path,
1582 4 : resources,
1583 4 : AttachedTenantConf::try_from(location_conf)?,
1584 4 : shard_identity,
1585 4 : None,
1586 4 : &TENANTS,
1587 4 : SpawnMode::Normal,
1588 4 : ctx,
1589 4 : )
1590 4 : .with_context(|| format!("Failed to schedule tenant processing in path {tenant_path:?}"))?;
1591 :
1592 4 : slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
1593 4 : Ok(())
1594 5 : }
1595 :
1596 6 : pub(crate) async fn ignore_tenant(
1597 6 : conf: &'static PageServerConf,
1598 6 : tenant_id: TenantId,
1599 6 : ) -> Result<(), TenantStateError> {
1600 21 : ignore_tenant0(conf, &TENANTS, tenant_id).await
1601 6 : }
1602 :
1603 6 : async fn ignore_tenant0(
1604 6 : conf: &'static PageServerConf,
1605 6 : tenants: &std::sync::RwLock<TenantsMap>,
1606 6 : tenant_id: TenantId,
1607 6 : ) -> Result<(), TenantStateError> {
1608 6 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
1609 6 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1610 6 :
1611 6 : remove_tenant_from_memory(tenants, tenant_shard_id, async {
1612 6 : let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_shard_id);
1613 6 : fs::File::create(&ignore_mark_file)
1614 6 : .await
1615 6 : .context("Failed to create ignore mark file")
1616 6 : .and_then(|_| {
1617 6 : crashsafe::fsync_file_and_parent(&ignore_mark_file)
1618 6 : .context("Failed to fsync ignore mark file")
1619 6 : })
1620 6 : .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?;
1621 6 : Ok(())
1622 6 : })
1623 21 : .await
1624 6 : }
1625 :
1626 UBC 0 : #[derive(Debug, thiserror::Error)]
1627 : pub(crate) enum TenantMapListError {
1628 : #[error("tenant map is still initializing")]
1629 : Initializing,
1630 : }
1631 :
1632 : ///
1633 : /// Get list of tenants, for the mgmt API
1634 : ///
1635 CBC 239 : pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>, TenantMapListError>
1636 239 : {
1637 239 : let tenants = TENANTS.read().unwrap();
1638 239 : let m = match &*tenants {
1639 UBC 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
1640 CBC 239 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
1641 239 : };
1642 239 : Ok(m.iter()
1643 240 : .filter_map(|(id, tenant)| match tenant {
1644 240 : TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
1645 UBC 0 : TenantSlot::Secondary(_) => None,
1646 0 : TenantSlot::InProgress(_) => None,
1647 CBC 240 : })
1648 239 : .collect())
1649 239 : }
1650 :
1651 : /// Execute Attach mgmt API command.
1652 : ///
1653 : /// Downloading all the tenant data is performed in the background; this merely
1654 : /// spawns the background task and returns quickly.
1655 44 : pub(crate) async fn attach_tenant(
1656 44 : conf: &'static PageServerConf,
1657 44 : tenant_id: TenantId,
1658 44 : generation: Generation,
1659 44 : tenant_conf: TenantConfOpt,
1660 44 : resources: TenantSharedResources,
1661 44 : ctx: &RequestContext,
1662 44 : ) -> Result<(), TenantMapInsertError> {
1663 44 : // This is a legacy API (replaced by `/location_conf`). It does not support sharding
1664 44 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
1665 :
1666 31 : let slot_guard =
1667 44 : tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
1668 31 : let location_conf = LocationConf::attached_single(tenant_conf, generation);
1669 58 : let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_shard_id).await?;
1670 : // TODO: tenant directory remains on disk if we bail out from here on.
1671 : // See https://github.com/neondatabase/neon/issues/4233
1672 :
1673 29 : let shard_identity = location_conf.shard;
1674 29 : let attached_tenant = tenant_spawn(
1675 29 : conf,
1676 29 : tenant_shard_id,
1677 29 : &tenant_dir,
1678 29 : resources,
1679 29 : AttachedTenantConf::try_from(location_conf)?,
1680 29 : shard_identity,
1681 29 : None,
1682 29 : &TENANTS,
1683 29 : SpawnMode::Normal,
1684 29 : ctx,
1685 UBC 0 : )?;
1686 : // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
1687 : // See https://github.com/neondatabase/neon/issues/4233
1688 :
1689 CBC 29 : let attached_tenant_id = attached_tenant.tenant_id();
1690 29 : if tenant_id != attached_tenant_id {
1691 UBC 0 : return Err(TenantMapInsertError::Other(anyhow::anyhow!(
1692 0 : "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
1693 0 : )));
1694 CBC 29 : }
1695 29 :
1696 29 : slot_guard.upsert(TenantSlot::Attached(attached_tenant))?;
1697 29 : Ok(())
1698 44 : }
1699 :
1700 UBC 0 : #[derive(Debug, thiserror::Error)]
1701 : pub(crate) enum TenantMapInsertError {
1702 : #[error(transparent)]
1703 : SlotError(#[from] TenantSlotError),
1704 : #[error(transparent)]
1705 : SlotUpsertError(#[from] TenantSlotUpsertError),
1706 : #[error(transparent)]
1707 : Other(#[from] anyhow::Error),
1708 : }
1709 :
1710 : /// Superset of TenantMapError: issues that can occur when acquiring a slot
1711 : /// for a particular tenant ID.
1712 CBC 14 : #[derive(Debug, thiserror::Error)]
1713 : pub enum TenantSlotError {
1714 : /// When acquiring a slot with the expectation that the tenant already exists.
1715 : #[error("Tenant {0} not found")]
1716 : NotFound(TenantShardId),
1717 :
1718 : /// When acquiring a slot with the expectation that the tenant does not already exist.
1719 : #[error("tenant {0} already exists, state: {1:?}")]
1720 : AlreadyExists(TenantShardId, TenantState),
1721 :
1722 : #[error("tenant {0} already exists in but is not attached")]
1723 : Conflict(TenantShardId),
1724 :
1725 : // Tried to read a slot that is currently being mutated by another administrative
1726 : // operation.
1727 : #[error("tenant has a state change in progress, try again later")]
1728 : InProgress,
1729 :
1730 : #[error(transparent)]
1731 : MapState(#[from] TenantMapError),
1732 : }
1733 :
1734 : /// Superset of TenantMapError: issues that can occur when using a SlotGuard
1735 : /// to insert a new value.
1736 UBC 0 : #[derive(Debug, thiserror::Error)]
1737 : pub enum TenantSlotUpsertError {
1738 : /// An error where the slot is in an unexpected state, indicating a code bug
1739 : #[error("Internal error updating Tenant")]
1740 : InternalError(Cow<'static, str>),
1741 :
1742 : #[error(transparent)]
1743 : MapState(#[from] TenantMapError),
1744 : }
1745 :
1746 0 : #[derive(Debug, thiserror::Error)]
1747 : enum TenantSlotDropError {
1748 : /// It is only legal to drop a TenantSlot if its contents are fully shut down
1749 : #[error("Tenant was not shut down")]
1750 : NotShutdown,
1751 : }
1752 :
1753 : /// Errors that can happen any time we are walking the tenant map to try and acquire
1754 : /// the TenantSlot for a particular tenant.
1755 0 : #[derive(Debug, thiserror::Error)]
1756 : pub enum TenantMapError {
1757 : // Tried to read while initializing
1758 : #[error("tenant map is still initializing")]
1759 : StillInitializing,
1760 :
1761 : // Tried to read while shutting down
1762 : #[error("tenant map is shutting down")]
1763 : ShuttingDown,
1764 : }
1765 :
1766 : /// Guards a particular tenant_id's content in the TenantsMap. While this
1767 : /// structure exists, the TenantsMap will contain a [`TenantSlot::InProgress`]
1768 : /// for this tenant, which acts as a marker for any operations targeting
1769 : /// this tenant to retry later, or wait for the InProgress state to end.
1770 : ///
1771 : /// This structure enforces the important invariant that we do not have overlapping
1772 : /// tasks that will try to use local storage for the same tenant ID: we enforce that
1773 : /// the previous contents of a slot have been shut down before the slot can be
1774 : /// left empty or used for something else.
1775 : ///
1776 : /// Holders of a SlotGuard should explicitly dispose of it, using either `upsert`
1777 : /// to provide a new value, or `revert` to put the slot back into its initial
1778 : /// state. If the SlotGuard is dropped without calling either of these, then
1779 : /// we will leave the slot empty if our `old_value` is already shut down, else
1780 : /// we will replace the slot with `old_value` (equivalent to doing a revert).
1781 : ///
1782 : /// The `old_value` may be dropped before the SlotGuard is dropped, by calling
1783 : /// `drop_old_value`. It is an error to call this without shutting down
1784 : /// the contents of `old_value`.
1785 : pub struct SlotGuard {
1786 : tenant_shard_id: TenantShardId,
1787 : old_value: Option<TenantSlot>,
1788 : upserted: bool,
1789 :
1790 : /// [`TenantSlot::InProgress`] carries the corresponding Barrier: it will
1791 : /// release any waiters as soon as this SlotGuard is dropped.
1792 : _completion: utils::completion::Completion,
1793 : }
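
// --- Added illustrative sketch (not part of the original source): the SlotGuard
// lifecycle described in the doc comment above, written against the crate-private
// helpers defined later in this module (`tenant_map_acquire_slot`, `upsert`,
// `revert`). The function name is hypothetical, error handling is simplified, and
// the `Tenant::shutdown` call follows the same shape as `remove_tenant_from_memory`
// below.
async fn replace_tenant_sketch(
    tenant_shard_id: TenantShardId,
    new_tenant: Arc<Tenant>,
) -> anyhow::Result<()> {
    // Acquiring the slot installs a TenantSlot::InProgress marker in TENANTS,
    // so concurrent administrative operations either back off or wait on its Barrier.
    let slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;

    // Whatever occupied the slot before must be fully shut down before the slot
    // may be emptied or reused (see `old_value_is_shutdown`).
    if let Some(TenantSlot::Attached(old)) = slot_guard.get_old_value() {
        let (_completion, progress) = utils::completion::channel();
        if old.shutdown(progress, false).await.is_err() {
            // Someone else is already stopping this tenant: put the old value back.
            slot_guard.revert();
            anyhow::bail!("tenant {tenant_shard_id} is already stopping");
        }
    }

    // Emplace the replacement; this consumes the guard and overwrites the
    // InProgress marker with the new Attached slot.
    slot_guard.upsert(TenantSlot::Attached(new_tenant))?;
    Ok(())
}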
1794 :
1795 : impl SlotGuard {
1796 CBC 720 : fn new(
1797 720 : tenant_shard_id: TenantShardId,
1798 720 : old_value: Option<TenantSlot>,
1799 720 : completion: utils::completion::Completion,
1800 720 : ) -> Self {
1801 720 : Self {
1802 720 : tenant_shard_id,
1803 720 : old_value,
1804 720 : upserted: false,
1805 720 : _completion: completion,
1806 720 : }
1807 720 : }
1808 :
1809 : /// Get any value that was present in the slot before we acquired ownership
1810 : /// of it: in state transitions, this will be the old state.
1811 254 : fn get_old_value(&self) -> &Option<TenantSlot> {
1812 254 : &self.old_value
1813 254 : }
1814 :
1815 : /// Emplace a new value in the slot. This consumes the guard, and after
1816 : /// returning, the slot is no longer protected from concurrent changes.
1817 646 : fn upsert(mut self, new_value: TenantSlot) -> Result<(), TenantSlotUpsertError> {
1818 646 : if !self.old_value_is_shutdown() {
1819 : // This is a bug: callers should never try to drop an old value without
1820 : // shutting it down
1821 UBC 0 : return Err(TenantSlotUpsertError::InternalError(
1822 0 : "Old TenantSlot value not shut down".into(),
1823 0 : ));
1824 CBC 646 : }
1825 :
1826 646 : let replaced = {
1827 646 : let mut locked = TENANTS.write().unwrap();
1828 646 :
1829 646 : if let TenantSlot::InProgress(_) = new_value {
1830 : // It is never expected to try and upsert InProgress via this path: it should
1831 : // only be written via the tenant_map_acquire_slot path. If we hit this it's a bug.
1832 UBC 0 : return Err(TenantSlotUpsertError::InternalError(
1833 0 : "Attempt to upsert an InProgress state".into(),
1834 0 : ));
1835 CBC 646 : }
1836 :
1837 646 : let m = match &mut *locked {
1838 UBC 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
1839 : TenantsMap::ShuttingDown(_) => {
1840 0 : return Err(TenantMapError::ShuttingDown.into());
1841 : }
1842 CBC 646 : TenantsMap::Open(m) => m,
1843 646 : };
1844 646 :
1845 646 : let replaced = m.insert(self.tenant_shard_id, new_value);
1846 646 : self.upserted = true;
1847 646 :
1848 646 : METRICS.tenant_slots.set(m.len() as u64);
1849 646 :
1850 646 : replaced
1851 : };
1852 :
1853 : // Sanity check: on an upsert we should always be replacing an InProgress marker
1854 646 : match replaced {
1855 : Some(TenantSlot::InProgress(_)) => {
1856 : // Expected case: we find our InProgress in the map: nothing should have
1857 : // replaced it because the code that acquires slots will not grant another
1858 : // one for the same TenantId.
1859 646 : Ok(())
1860 : }
1861 : None => {
1862 UBC 0 : METRICS.unexpected_errors.inc();
1863 0 : error!(
1864 0 : tenant_shard_id = %self.tenant_shard_id,
1865 0 : "Missing InProgress marker during tenant upsert, this is a bug."
1866 0 : );
1867 0 : Err(TenantSlotUpsertError::InternalError(
1868 0 : "Missing InProgress marker during tenant upsert".into(),
1869 0 : ))
1870 : }
1871 0 : Some(slot) => {
1872 0 : METRICS.unexpected_errors.inc();
1873 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
1874 0 : Err(TenantSlotUpsertError::InternalError(
1875 0 : "Unexpected contents of TenantSlot".into(),
1876 0 : ))
1877 : }
1878 : }
1879 CBC 646 : }
1880 :
1881 : /// Replace the InProgress slot with whatever was in the guard when we started
1882 90 : fn revert(mut self) {
1883 90 : if let Some(value) = self.old_value.take() {
1884 90 : match self.upsert(value) {
1885 UBC 0 : Err(TenantSlotUpsertError::InternalError(_)) => {
1886 0 : // We already logged the error, nothing else we can do.
1887 0 : }
1888 0 : Err(TenantSlotUpsertError::MapState(_)) => {
1889 0 : // If the map is shutting down, we need not replace anything
1890 0 : }
1891 CBC 90 : Ok(()) => {}
1892 : }
1893 UBC 0 : }
1894 CBC 90 : }
1895 :
1896 : /// We may never drop our old value until it is cleanly shut down: otherwise we might leave
1897 : /// rogue background tasks that would write to the local tenant directory that this guard
1898 : /// is responsible for protecting
1899 826 : fn old_value_is_shutdown(&self) -> bool {
1900 826 : match self.old_value.as_ref() {
1901 102 : Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
1902 21 : Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
1903 : Some(TenantSlot::InProgress(_)) => {
1904 : // A SlotGuard cannot be constructed for a slot that was already InProgress
1905 UBC 0 : unreachable!()
1906 : }
1907 CBC 703 : None => true,
1908 : }
1909 826 : }
1910 :
1911 : /// The guard holder is done with the old value of the slot: they are obliged to already
1912 : /// shut it down before we reach this point.
1913 107 : fn drop_old_value(&mut self) -> Result<(), TenantSlotDropError> {
1914 107 : if !self.old_value_is_shutdown() {
1915 UBC 0 : Err(TenantSlotDropError::NotShutdown)
1916 : } else {
1917 CBC 107 : self.old_value.take();
1918 107 : Ok(())
1919 : }
1920 107 : }
1921 : }
1922 :
1923 : impl Drop for SlotGuard {
1924 720 : fn drop(&mut self) {
1925 720 : if self.upserted {
1926 646 : return;
1927 74 : }
1928 74 : // Our old value is already shutdown, or it never existed: it is safe
1929 74 : // for us to fully release the TenantSlot back into an empty state
1930 74 :
1931 74 : let mut locked = TENANTS.write().unwrap();
1932 :
1933 74 : let m = match &mut *locked {
1934 : TenantsMap::Initializing => {
1935 : // There is no map, this should never happen.
1936 1 : return;
1937 : }
1938 : TenantsMap::ShuttingDown(_) => {
1939 : // When we transition to shutdown, InProgress elements are removed
1940 : // from the map, so we do not need to clean up our InProgress marker.
1941 : // See [`shutdown_all_tenants0`]
1942 UBC 0 : return;
1943 : }
1944 CBC 73 : TenantsMap::Open(m) => m,
1945 73 : };
1946 73 :
1947 73 : use std::collections::btree_map::Entry;
1948 73 : match m.entry(self.tenant_shard_id) {
1949 73 : Entry::Occupied(mut entry) => {
1950 73 : if !matches!(entry.get(), TenantSlot::InProgress(_)) {
1951 UBC 0 : METRICS.unexpected_errors.inc();
1952 0 : error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
1953 CBC 73 : }
1954 :
1955 73 : if self.old_value_is_shutdown() {
1956 73 : entry.remove();
1957 73 : } else {
1958 UBC 0 : entry.insert(self.old_value.take().unwrap());
1959 0 : }
1960 : }
1961 : Entry::Vacant(_) => {
1962 0 : METRICS.unexpected_errors.inc();
1963 0 : error!(
1964 0 : tenant_shard_id = %self.tenant_shard_id,
1965 0 : "Missing InProgress marker during SlotGuard drop, this is a bug."
1966 0 : );
1967 : }
1968 : }
1969 :
1970 CBC 73 : METRICS.tenant_slots.set(m.len() as u64);
1971 720 : }
1972 : }
1973 :
1974 : enum TenantSlotPeekMode {
1975 : /// In Read mode, peek will be permitted to see the slots even if the pageserver is shutting down
1976 : Read,
1977 : /// In Write mode, trying to peek at a slot while the pageserver is shutting down is an error
1978 : Write,
1979 : }
1980 :
1981 14673 : fn tenant_map_peek_slot<'a>(
1982 14673 : tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
1983 14673 : tenant_shard_id: &TenantShardId,
1984 14673 : mode: TenantSlotPeekMode,
1985 14673 : ) -> Result<Option<&'a TenantSlot>, TenantMapError> {
1986 14673 : let m = match tenants.deref() {
1987 UBC 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing),
1988 0 : TenantsMap::ShuttingDown(m) => match mode {
1989 0 : TenantSlotPeekMode::Read => m,
1990 : TenantSlotPeekMode::Write => {
1991 0 : return Err(TenantMapError::ShuttingDown);
1992 : }
1993 : },
1994 CBC 14673 : TenantsMap::Open(m) => m,
1995 : };
1996 :
1997 14673 : Ok(m.get(tenant_shard_id))
1998 14673 : }
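
// --- Added illustrative sketch (not part of the original source): a read-only
// status probe built on `tenant_map_peek_slot`. The function name is hypothetical;
// Read mode is chosen so the probe keeps working while the pageserver is shutting
// down, mirroring how the read paths earlier in this module peek at a slot.
fn tenant_is_attached_sketch(tenant_shard_id: &TenantShardId) -> Result<bool, TenantMapError> {
    // Hold the read lock only for the duration of the peek; the returned
    // reference borrows from the lock guard.
    let locked = TENANTS.read().unwrap();
    let peek_slot = tenant_map_peek_slot(&locked, tenant_shard_id, TenantSlotPeekMode::Read)?;
    Ok(matches!(peek_slot, Some(TenantSlot::Attached(_))))
}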
1999 :
2000 : enum TenantSlotAcquireMode {
2001 : /// Acquire the slot irrespective of current state, or whether it already exists
2002 : Any,
2003 : /// Return an error if trying to acquire a slot and it doesn't already exist
2004 : MustExist,
2005 : /// Return an error if trying to acquire a slot and it already exists
2006 : MustNotExist,
2007 : }
2008 :
2009 664 : fn tenant_map_acquire_slot(
2010 664 : tenant_shard_id: &TenantShardId,
2011 664 : mode: TenantSlotAcquireMode,
2012 664 : ) -> Result<SlotGuard, TenantSlotError> {
2013 664 : tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
2014 664 : }
2015 :
2016 748 : fn tenant_map_acquire_slot_impl(
2017 748 : tenant_shard_id: &TenantShardId,
2018 748 : tenants: &std::sync::RwLock<TenantsMap>,
2019 748 : mode: TenantSlotAcquireMode,
2020 748 : ) -> Result<SlotGuard, TenantSlotError> {
2021 748 : use TenantSlotAcquireMode::*;
2022 748 : METRICS.tenant_slot_writes.inc();
2023 748 :
2024 748 : let mut locked = tenants.write().unwrap();
2025 748 : let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard = %tenant_shard_id.shard_slug());
2026 748 : let _guard = span.enter();
2027 :
2028 748 : let m = match &mut *locked {
2029 UBC 0 : TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
2030 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
2031 CBC 748 : TenantsMap::Open(m) => m,
2032 748 : };
2033 748 :
2034 748 : use std::collections::btree_map::Entry;
2035 748 :
2036 748 : let entry = m.entry(*tenant_shard_id);
2037 748 :
2038 748 : match entry {
2039 521 : Entry::Vacant(v) => match mode {
2040 : MustExist => {
2041 14 : tracing::debug!("Vacant && MustExist: return NotFound");
2042 14 : Err(TenantSlotError::NotFound(*tenant_shard_id))
2043 : }
2044 : _ => {
2045 507 : let (completion, barrier) = utils::completion::channel();
2046 507 : v.insert(TenantSlot::InProgress(barrier));
2047 507 : tracing::debug!("Vacant, inserted InProgress");
2048 507 : Ok(SlotGuard::new(*tenant_shard_id, None, completion))
2049 : }
2050 : },
2051 227 : Entry::Occupied(mut o) => {
2052 227 : // Apply mode-driven checks
2053 227 : match (o.get(), mode) {
2054 : (TenantSlot::InProgress(_), _) => {
2055 UBC 0 : tracing::debug!("Occupied, failing for InProgress");
2056 0 : Err(TenantSlotError::InProgress)
2057 : }
2058 CBC 14 : (slot, MustNotExist) => match slot {
2059 14 : TenantSlot::Attached(tenant) => {
2060 14 : tracing::debug!("Attached && MustNotExist, return AlreadyExists");
2061 14 : Err(TenantSlotError::AlreadyExists(
2062 14 : *tenant_shard_id,
2063 14 : tenant.current_state(),
2064 14 : ))
2065 : }
2066 : _ => {
2067 : // FIXME: the AlreadyExists error assumes that we have a Tenant
2068 : // to get the state from
2069 UBC 0 : tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
2070 0 : Err(TenantSlotError::AlreadyExists(
2071 0 : *tenant_shard_id,
2072 0 : TenantState::Broken {
2073 0 : reason: "Present but not attached".to_string(),
2074 0 : backtrace: "".to_string(),
2075 0 : },
2076 0 : ))
2077 : }
2078 : },
2079 : _ => {
2080 : // Happy case: the slot was not in any state that violated our mode
2081 CBC 213 : let (completion, barrier) = utils::completion::channel();
2082 213 : let old_value = o.insert(TenantSlot::InProgress(barrier));
2083 213 : tracing::debug!("Occupied, replaced with InProgress");
2084 213 : Ok(SlotGuard::new(
2085 213 : *tenant_shard_id,
2086 213 : Some(old_value),
2087 213 : completion,
2088 213 : ))
2089 : }
2090 : }
2091 : }
2092 : }
2093 748 : }
2094 :
2095 : /// Stops and removes the tenant from memory if it is not already [`TenantState::Stopping`]; bails otherwise.
2096 : /// Allows removing other tenant resources manually via the `tenant_cleanup` future.
2097 : /// If the cleanup fails, the tenant stays in memory in the [`TenantState::Broken`] state, and another removal
2098 : /// operation is needed to remove it.
2099 84 : async fn remove_tenant_from_memory<V, F>(
2100 84 : tenants: &std::sync::RwLock<TenantsMap>,
2101 84 : tenant_shard_id: TenantShardId,
2102 84 : tenant_cleanup: F,
2103 84 : ) -> Result<V, TenantStateError>
2104 84 : where
2105 84 : F: std::future::Future<Output = anyhow::Result<V>>,
2106 84 : {
2107 : use utils::completion;
2108 :
2109 71 : let mut slot_guard =
2110 84 : tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
2111 :
2112 : // allow pageserver shutdown to await for our completion
2113 71 : let (_guard, progress) = completion::channel();
2114 :
2115 : // The SlotGuard allows us to manipulate the Tenant object without fear of some
2116 : // concurrent API request doing something else for the same tenant ID.
2117 71 : let attached_tenant = match slot_guard.get_old_value() {
2118 66 : Some(TenantSlot::Attached(tenant)) => {
2119 66 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
2120 66 : let freeze_and_flush = false;
2121 66 :
2122 66 : // shutdown is sure to transition the tenant to Stopping and to wait for all tasks to complete, so
2123 66 : // that we can safely continue to cleanup.
2124 146 : match tenant.shutdown(progress, freeze_and_flush).await {
2125 66 : Ok(()) => {}
2126 UBC 0 : Err(_other) => {
2127 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
2128 0 : // wait for it but return an error right away because these are distinct requests.
2129 0 : slot_guard.revert();
2130 0 : return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id));
2131 : }
2132 : }
2133 CBC 66 : Some(tenant)
2134 : }
2135 5 : Some(TenantSlot::Secondary(secondary_state)) => {
2136 5 : tracing::info!("Shutting down in secondary mode");
2137 5 : secondary_state.shutdown().await;
2138 5 : None
2139 : }
2140 : Some(TenantSlot::InProgress(_)) => {
2141 : // Acquiring a slot guarantees its old value was not InProgress
2142 UBC 0 : unreachable!();
2143 : }
2144 0 : None => None,
2145 : };
2146 :
2147 CBC 71 : match tenant_cleanup
2148 199 : .await
2149 71 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
2150 : {
2151 71 : Ok(hook_value) => {
2152 71 : // Success: drop the old TenantSlot::Attached.
2153 71 : slot_guard
2154 71 : .drop_old_value()
2155 71 : .expect("We just called shutdown");
2156 71 :
2157 71 : Ok(hook_value)
2158 : }
2159 UBC 0 : Err(e) => {
2160 : // If we had a Tenant, set it to Broken and put it back in the TenantsMap
2161 0 : if let Some(attached_tenant) = attached_tenant {
2162 0 : attached_tenant.set_broken(e.to_string()).await;
2163 0 : }
2164 : // Leave the broken tenant in the map
2165 0 : slot_guard.revert();
2166 0 :
2167 0 : Err(TenantStateError::Other(e))
2168 : }
2169 : }
2170 CBC 84 : }
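
// --- Added illustrative sketch (not part of the original source): how a caller
// supplies the `tenant_cleanup` future to `remove_tenant_from_memory`, mirroring
// `detach_tenant0` above. The function name is hypothetical; `conf.tenant_path`
// and `safe_rename_tenant_dir` are used exactly as in `detach_tenant0`.
async fn evict_tenant_sketch(
    conf: &'static PageServerConf,
    tenant_shard_id: TenantShardId,
) -> Result<Utf8PathBuf, TenantStateError> {
    remove_tenant_from_memory(&TENANTS, tenant_shard_id, async move {
        // This cleanup runs only after the tenant has been shut down and its
        // slot replaced with an InProgress marker, so touching local files is safe.
        let local_tenant_directory = conf.tenant_path(&tenant_shard_id);
        safe_rename_tenant_dir(&local_tenant_directory)
            .await
            .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
    })
    .await
}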
2171 :
2172 : use {
2173 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
2174 : utils::http::error::ApiError,
2175 : };
2176 :
2177 322 : pub(crate) async fn immediate_gc(
2178 322 : tenant_shard_id: TenantShardId,
2179 322 : timeline_id: TimelineId,
2180 322 : gc_req: TimelineGcRequest,
2181 322 : cancel: CancellationToken,
2182 322 : ctx: &RequestContext,
2183 322 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
2184 322 : let guard = TENANTS.read().unwrap();
2185 :
2186 322 : let tenant = guard
2187 322 : .get(&tenant_shard_id)
2188 322 : .map(Arc::clone)
2189 322 : .with_context(|| format!("tenant {tenant_shard_id}"))
2190 322 : .map_err(|e| ApiError::NotFound(e.into()))?;
2191 :
2192 321 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
2193 321 : // Use tenant's pitr setting
2194 321 : let pitr = tenant.get_pitr_interval();
2195 321 :
2196 321 : // Run in task_mgr to avoid race with tenant_detach operation
2197 321 : let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
2198 321 : let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
2199 321 : // TODO: spawning is redundant now, need to hold the gate
2200 321 : task_mgr::spawn(
2201 321 : &tokio::runtime::Handle::current(),
2202 321 : TaskKind::GarbageCollector,
2203 321 : Some(tenant_shard_id),
2204 321 : Some(timeline_id),
2205 321 : &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"),
2206 321 : false,
2207 321 : async move {
2208 UBC 0 : fail::fail_point!("immediate_gc_task_pre");
2209 :
2210 : #[allow(unused_mut)]
2211 CBC 321 : let mut result = tenant
2212 321 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &cancel, &ctx)
2213 321 : .instrument(info_span!("manual_gc", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))
2214 274 : .await;
2215 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
2216 : // better once the types support it.
2217 :
2218 : #[cfg(feature = "testing")]
2219 : {
2220 321 : if let Ok(result) = result.as_mut() {
2221 : // why not FuturesUnordered? it seems it would need much the same task structure
2222 : // but would only run on a single task.
2223 320 : let mut js = tokio::task::JoinSet::new();
2224 702 : for layer in std::mem::take(&mut result.doomed_layers) {
2225 702 : js.spawn(layer.wait_drop());
2226 702 : }
2227 320 : tracing::info!(total = js.len(), "starting to wait for the gc'd layers to be dropped");
2228 1022 : while let Some(res) = js.join_next().await {
2229 702 : res.expect("wait_drop should not panic");
2230 702 : }
2231 1 : }
2232 :
2233 321 : let timeline = tenant.get_timeline(timeline_id, false).ok();
2234 321 : let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
2235 :
2236 321 : if let Some(rtc) = rtc {
2237 : // layer drops schedule actions on remote timeline client to actually do the
2238 : // deletions; we don't care about a shutdown error here, just exit fast
2239 320 : drop(rtc.wait_completion().await);
2240 1 : }
2241 : }
2242 :
2243 321 : match task_done.send(result) {
2244 321 : Ok(_) => (),
2245 UBC 0 : Err(result) => error!("failed to send gc result: {result:?}"),
2246 : }
2247 CBC 321 : Ok(())
2248 321 : }
2249 321 : );
2250 321 :
2251 321 : // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
2252 321 : drop(guard);
2253 321 :
2254 321 : Ok(wait_task_done)
2255 322 : }
2256 :
2257 : #[cfg(test)]
2258 : mod tests {
2259 : use pageserver_api::shard::TenantShardId;
2260 : use std::collections::BTreeMap;
2261 : use std::sync::Arc;
2262 : use tracing::{info_span, Instrument};
2263 :
2264 : use crate::tenant::mgr::TenantSlot;
2265 :
2266 : use super::{super::harness::TenantHarness, TenantsMap};
2267 :
2268 1 : #[tokio::test(start_paused = true)]
2269 1 : async fn shutdown_awaits_in_progress_tenant() {
2270 : // Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
2271 : // wait for it to complete before proceeding.
2272 :
2273 1 : let (t, _ctx) = TenantHarness::create("shutdown_awaits_in_progress_tenant")
2274 1 : .unwrap()
2275 1 : .load()
2276 1 : .await;
2277 :
2278 : // harness loads it to active, which is forced and nothing is running on the tenant
2279 :
2280 1 : let id = TenantShardId::unsharded(t.tenant_id());
2281 :
2282 : // tenant harness configures the logging and we cannot escape it
2283 1 : let _e = info_span!("testing", tenant_id = %id).entered();
2284 1 :
2285 1 : let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
2286 1 : let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
2287 1 :
2288 1 : // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
2289 1 : // permit it to proceed: that will stick the tenant in InProgress
2290 1 :
2291 1 : let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
2292 1 : let (until_cleanup_started, cleanup_started) = utils::completion::channel();
2293 1 : let mut remove_tenant_from_memory_task = {
2294 1 : let jh = tokio::spawn({
2295 1 : let tenants = tenants.clone();
2296 1 : async move {
2297 1 : let cleanup = async move {
2298 1 : drop(until_cleanup_started);
2299 1 : can_complete_cleanup.wait().await;
2300 1 : anyhow::Ok(())
2301 1 : };
2302 1 : super::remove_tenant_from_memory(&tenants, id, cleanup).await
2303 1 : }
2304 1 : .instrument(info_span!("foobar", tenant_id = %id))
2305 : });
2306 :
2307 : // now the long cleanup should be in place, with the stopping state
2308 1 : cleanup_started.wait().await;
2309 1 : jh
2310 : };
2311 :
2312 1 : let mut shutdown_task = {
2313 1 : let (until_shutdown_started, shutdown_started) = utils::completion::channel();
2314 1 :
2315 1 : let shutdown_task = tokio::spawn(async move {
2316 1 : drop(until_shutdown_started);
2317 2 : super::shutdown_all_tenants0(&tenants).await;
2318 1 : });
2319 1 :
2320 1 : shutdown_started.wait().await;
2321 1 : shutdown_task
2322 1 : };
2323 1 :
2324 1 : let long_time = std::time::Duration::from_secs(15);
2325 2 : tokio::select! {
2326 2 : _ = &mut shutdown_task => unreachable!("shutdown should block on remove_tenant_from_memory completing"),
2327 2 : _ = &mut remove_tenant_from_memory_task => unreachable!("remove_tenant_from_memory_task should not complete until explicitly unblocked"),
2328 2 : _ = tokio::time::sleep(long_time) => {},
2329 2 : }
2330 :
2331 1 : drop(until_cleanup_completed);
2332 1 :
2333 1 : // Now that we allow it to proceed, shutdown should complete immediately
2334 1 : remove_tenant_from_memory_task.await.unwrap().unwrap();
2335 1 : shutdown_task.await.unwrap();
2336 : }
2337 : }
|