TLA Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use camino::{Utf8Path, Utf8PathBuf};
5 : use rand::{distributions::Alphanumeric, Rng};
6 : use std::collections::{hash_map, HashMap};
7 : use std::sync::Arc;
8 : use tokio::fs;
9 :
10 : use anyhow::Context;
11 : use once_cell::sync::Lazy;
12 : use tokio::sync::RwLock;
13 : use tokio::task::JoinSet;
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::*;
16 :
17 : use remote_storage::GenericRemoteStorage;
18 : use utils::crashsafe;
19 :
20 : use crate::config::PageServerConf;
21 : use crate::context::{DownloadBehavior, RequestContext};
22 : use crate::control_plane_client::{
23 : ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
24 : };
25 : use crate::deletion_queue::DeletionQueueClient;
26 : use crate::task_mgr::{self, TaskKind};
27 : use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
28 : use crate::tenant::delete::DeleteTenantFlow;
29 : use crate::tenant::{
30 : create_tenant_files, AttachMarkerMode, AttachedTenantConf, CreateTenantFilesMode, Tenant,
31 : TenantState,
32 : };
33 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
34 :
35 : use utils::crashsafe::path_with_suffix_extension;
36 : use utils::fs_ext::PathExt;
37 : use utils::generation::Generation;
38 : use utils::id::{TenantId, TimelineId};
39 :
40 : use super::delete::DeleteTenantError;
41 : use super::timeline::delete::DeleteTimelineFlow;
42 : use super::TenantSharedResources;
43 :
44 : /// For a tenant that appears in TenantsMap, it may either be
45 : /// - `Attached`: has a full Tenant object, is eligible to service
46 : /// reads and ingest WAL.
47 : /// - `Secondary`: is only keeping a local cache warm.
48 : ///
49 : /// Secondary is a totally distinct state rather than a mode of a `Tenant`, because
50 : /// that way we avoid having to carefully switch a tenant's ingestion etc. on and off during
51 : /// its lifetime, and we can preserve some important safety invariants, like `Tenant` always
52 : /// having a properly acquired generation (Secondary doesn't need a generation).
53 CBC 156 : #[derive(Clone)]
54 : pub(crate) enum TenantSlot {
55 : Attached(Arc<Tenant>),
56 : Secondary,
57 : }
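// Illustrative sketch (not part of the original source, hence no coverage data): how
// calling code might branch on a `TenantSlot`. Only `Attached` slots carry a `Tenant`
// object; a `Secondary` slot has no generation and cannot service reads or ingest WAL.
#[allow(dead_code)]
fn slot_can_serve_reads(slot: &TenantSlot) -> bool {
    match slot {
        TenantSlot::Attached(_tenant) => true,
        TenantSlot::Secondary => false,
    }
}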
58 :
59 : impl TenantSlot {
60 : /// Return the `Tenant` in this slot if attached, else None
61 10296 : fn get_attached(&self) -> Option<&Arc<Tenant>> {
62 10296 : match self {
63 10296 : Self::Attached(t) => Some(t),
64 UBC 0 : Self::Secondary => None,
65 : }
66 CBC 10296 : }
67 :
68 : /// Consume self and return the `Tenant` that was in this slot if attached, else None
69 98 : fn into_attached(self) -> Option<Arc<Tenant>> {
70 98 : match self {
71 98 : Self::Attached(t) => Some(t),
72 UBC 0 : Self::Secondary => None,
73 : }
74 CBC 98 : }
75 : }
76 :
77 : /// The tenants known to the pageserver.
78 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
79 : pub(crate) enum TenantsMap {
80 : /// [`init_tenant_mgr`] is not done yet.
81 : Initializing,
82 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
83 : /// New tenants can be added using [`tenant_map_insert`].
84 : Open(HashMap<TenantId, TenantSlot>),
85 : /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
86 : /// Existing tenants are still accessible, but no new tenants can be created.
87 : ShuttingDown(HashMap<TenantId, TenantSlot>),
88 : }
89 :
90 : impl TenantsMap {
91 : /// Convenience function for typical usage, where we want to get a `Tenant` object for
92 : /// working with attached tenants. If the TenantId is in the map but in Secondary state,
93 : /// None is returned.
94 10368 : pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
95 10368 : match self {
96 UBC 0 : TenantsMap::Initializing => None,
97 CBC 10368 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
98 10368 : m.get(tenant_id).and_then(TenantSlot::get_attached)
99 : }
100 : }
101 10368 : }
102 :
103 : /// Get the contents of the map at this tenant ID, even if it is in secondary state.
104 46 : pub(crate) fn get_slot(&self, tenant_id: &TenantId) -> Option<&TenantSlot> {
105 46 : match self {
106 UBC 0 : TenantsMap::Initializing => None,
107 CBC 46 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
108 : }
109 46 : }
110 98 : pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
111 98 : match self {
112 UBC 0 : TenantsMap::Initializing => None,
113 CBC 97 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
114 98 : m.remove(tenant_id).and_then(TenantSlot::into_attached)
115 : }
116 : }
117 98 : }
118 : }
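// Illustrative sketch (not part of the original source, hence no coverage data): `get`
// only yields a `Tenant` for attached slots, while `get_slot` also surfaces `Secondary`
// entries. A hypothetical caller that needs to distinguish the cases could do:
#[allow(dead_code)]
fn describe_tenant_slot(map: &TenantsMap, tenant_id: &TenantId) -> &'static str {
    match map.get_slot(tenant_id) {
        None => "not known to this pageserver (or map still initializing)",
        Some(TenantSlot::Secondary) => "known, but only keeping a local cache warm",
        Some(TenantSlot::Attached(_)) => "attached: eligible to service reads and ingest WAL",
    }
}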
119 :
120 : /// This is "safe" in that it won't leave behind a partially deleted directory
121 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting to delete
122 : /// the contents.
123 : ///
124 : /// This is pageserver-specific, as it relies on future processes after a crash to check
125 : /// for TEMP_FILE_SUFFIX when loading things.
126 2 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
127 6 : let tmp_path = safe_rename_tenant_dir(path).await?;
128 2 : fs::remove_dir_all(tmp_path).await
129 2 : }
130 :
131 37 : async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
132 37 : let parent = path
133 37 : .as_ref()
134 37 : .parent()
135 37 : // It is invalid to call this function with a relative path. Tenant directories
136 37 : // should always have a parent.
137 37 : .ok_or(std::io::Error::new(
138 37 : std::io::ErrorKind::InvalidInput,
139 37 : "Path must be absolute",
140 37 : ))?;
141 37 : let rand_suffix = rand::thread_rng()
142 37 : .sample_iter(&Alphanumeric)
143 37 : .take(8)
144 37 : .map(char::from)
145 37 : .collect::<String>()
146 37 : + TEMP_FILE_SUFFIX;
147 37 : let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
148 37 : fs::rename(path.as_ref(), &tmp_path).await?;
149 37 : fs::File::open(parent).await?.sync_all().await?;
150 37 : Ok(tmp_path)
151 37 : }
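// Illustrative sketch (not part of the original source, hence no coverage data): the
// recovery side of the rename-before-delete convention above. After a crash, a leftover
// directory is recognizable by its TEMP_FILE_SUFFIX and can simply be removed;
// `init_load_tenant_configs` below does exactly this via `crate::is_temporary`.
#[allow(dead_code)]
async fn remove_leftover_temp_dir(path: &Utf8Path) -> std::io::Result<()> {
    debug_assert!(path.as_str().ends_with(TEMP_FILE_SUFFIX));
    fs::remove_dir_all(path).await
}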
152 :
153 560 : static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::Initializing));
154 :
155 : /// Create a directory, including parents. This does no fsyncs and makes
156 : /// no guarantees about the persistence of the resulting metadata: it is
157 : /// intended for creating directories that are used as a cache.
158 UBC 0 : async fn unsafe_create_dir_all(path: &Utf8PathBuf) -> std::io::Result<()> {
159 0 : let mut dirs_to_create = Vec::new();
160 0 : let mut path: &Utf8Path = path.as_ref();
161 :
162 : // Figure out which directories we need to create.
163 : loop {
164 0 : let meta = tokio::fs::metadata(path).await;
165 0 : match meta {
166 0 : Ok(metadata) if metadata.is_dir() => break,
167 : Ok(_) => {
168 0 : return Err(std::io::Error::new(
169 0 : std::io::ErrorKind::AlreadyExists,
170 0 : format!("non-directory found in path: {path}"),
171 0 : ));
172 : }
173 0 : Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {}
174 0 : Err(e) => return Err(e),
175 : }
176 :
177 0 : dirs_to_create.push(path);
178 0 :
179 0 : match path.parent() {
180 0 : Some(parent) => path = parent,
181 : None => {
182 0 : return Err(std::io::Error::new(
183 0 : std::io::ErrorKind::InvalidInput,
184 0 : format!("can't find parent of path '{path}'"),
185 0 : ));
186 : }
187 : }
188 : }
189 :
190 : // Create directories from parent to child.
191 0 : for &path in dirs_to_create.iter().rev() {
192 0 : tokio::fs::create_dir(path).await?;
193 : }
194 :
195 0 : Ok(())
196 0 : }
197 :
198 CBC 1 : fn emergency_generations(
199 1 : tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
200 1 : ) -> HashMap<TenantId, Generation> {
201 1 : tenant_confs
202 1 : .iter()
203 1 : .filter_map(|(tid, lc)| {
204 1 : let lc = match lc {
205 1 : Ok(lc) => lc,
206 UBC 0 : Err(_) => return None,
207 : };
208 CBC 1 : let gen = match &lc.mode {
209 1 : LocationMode::Attached(alc) => Some(alc.generation),
210 UBC 0 : LocationMode::Secondary(_) => None,
211 : };
212 :
213 CBC 1 : gen.map(|g| (*tid, g))
214 1 : })
215 1 : .collect()
216 1 : }
217 :
218 560 : async fn init_load_generations(
219 560 : conf: &'static PageServerConf,
220 560 : tenant_confs: &HashMap<TenantId, anyhow::Result<LocationConf>>,
221 560 : resources: &TenantSharedResources,
222 560 : cancel: &CancellationToken,
223 560 : ) -> anyhow::Result<Option<HashMap<TenantId, Generation>>> {
224 560 : let generations = if conf.control_plane_emergency_mode {
225 1 : error!(
226 1 : "Emergency mode! Tenants will be attached unsafely using their last known generation"
227 1 : );
228 1 : emergency_generations(tenant_confs)
229 559 : } else if let Some(client) = ControlPlaneClient::new(conf, cancel) {
230 30 : info!("Calling control plane API to re-attach tenants");
231 : // If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
232 88 : match client.re_attach().await {
233 30 : Ok(tenants) => tenants,
234 : Err(RetryForeverError::ShuttingDown) => {
235 UBC 0 : anyhow::bail!("Shut down while waiting for control plane re-attach response")
236 : }
237 : }
238 : } else {
239 CBC 529 : info!("Control plane API not configured, tenant generations are disabled");
240 529 : return Ok(None);
241 : };
242 :
243 : // The deletion queue needs to know about the startup attachment state to decide which (if any) stored
244 : // deletion list entries may still be valid. We provide that by pushing a recovery operation into
245 : // the queue. Sequential processing of the queue ensures that recovery is done before any new tenant deletions
246 : // are processed, even though we don't block on recovery completing here.
247 : //
248 : // Must only do this if remote storage is enabled, otherwise deletion queue
249 : // is not running and channel push will fail.
250 31 : if resources.remote_storage.is_some() {
251 31 : resources
252 31 : .deletion_queue_client
253 31 : .recover(generations.clone())?;
254 UBC 0 : }
255 :
256 CBC 31 : Ok(Some(generations))
257 560 : }
258 :
259 : /// Initial stage of load: walk the local tenants directory, clean up any temp files,
260 : /// and load configurations for the tenants we found.
261 560 : async fn init_load_tenant_configs(
262 560 : conf: &'static PageServerConf,
263 560 : ) -> anyhow::Result<HashMap<TenantId, anyhow::Result<LocationConf>>> {
264 560 : let tenants_dir = conf.tenants_path();
265 :
266 560 : let mut dir_entries = tenants_dir
267 560 : .read_dir_utf8()
268 560 : .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
269 :
270 560 : let mut configs = HashMap::new();
271 :
272 : loop {
273 775 : match dir_entries.next() {
274 560 : None => break,
275 215 : Some(Ok(dentry)) => {
276 215 : let tenant_dir_path = dentry.path().to_path_buf();
277 215 : if crate::is_temporary(&tenant_dir_path) {
278 UBC 0 : info!("Found temporary tenant directory, removing: {tenant_dir_path}");
279 : // No need to use safe_remove_tenant_dir_all because this is already
280 : // a temporary path
281 0 : if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
282 0 : error!(
283 0 : "Failed to remove temporary directory '{}': {:?}",
284 0 : tenant_dir_path, e
285 0 : );
286 0 : }
287 0 : continue;
288 CBC 215 : }
289 :
290 : // This case happens if we:
291 : // * crash during attach before creating the attach marker file
292 : // * crash during tenant delete before removing tenant directory
293 215 : let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
294 UBC 0 : format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
295 CBC 215 : })?;
296 215 : if is_empty {
297 2 : info!("removing empty tenant directory {tenant_dir_path:?}");
298 2 : if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
299 UBC 0 : error!(
300 0 : "Failed to remove empty tenant directory '{}': {e:#}",
301 0 : tenant_dir_path
302 0 : )
303 CBC 2 : }
304 2 : continue;
305 213 : }
306 213 :
307 213 : let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
308 213 : if tenant_ignore_mark_file.exists() {
309 1 : info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
310 1 : continue;
311 212 : }
312 :
313 212 : let tenant_id = match tenant_dir_path
314 212 : .file_name()
315 212 : .unwrap_or_default()
316 212 : .parse::<TenantId>()
317 : {
318 212 : Ok(id) => id,
319 : Err(_) => {
320 UBC 0 : warn!(
321 0 : "Invalid tenant path (garbage in our repo directory?): {tenant_dir_path}",
322 0 : );
323 0 : continue;
324 : }
325 : };
326 :
327 CBC 212 : configs.insert(tenant_id, Tenant::load_tenant_config(conf, &tenant_id));
328 : }
329 UBC 0 : Some(Err(e)) => {
330 0 : // An error listing the top level directory indicates serious problem
331 0 : // with local filesystem: we will fail to load, and fail to start.
332 0 : anyhow::bail!(e);
333 : }
334 : }
335 : }
336 CBC 560 : Ok(configs)
337 560 : }
338 :
339 : /// Initialize repositories with locally available timelines.
340 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
341 : /// are scheduled for download and added to the tenant once download is completed.
342 1680 : #[instrument(skip_all)]
343 : pub async fn init_tenant_mgr(
344 : conf: &'static PageServerConf,
345 : resources: TenantSharedResources,
346 : init_order: InitializationOrder,
347 : cancel: CancellationToken,
348 : ) -> anyhow::Result<()> {
349 : let mut tenants = HashMap::new();
350 :
351 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
352 :
353 : // Scan local filesystem for attached tenants
354 : let tenant_configs = init_load_tenant_configs(conf).await?;
355 :
356 : // Determine which tenants are to be attached
357 : let tenant_generations =
358 : init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
359 :
360 : // Construct `Tenant` objects and start them running
361 : for (tenant_id, location_conf) in tenant_configs {
362 : let tenant_dir_path = conf.tenant_path(&tenant_id);
363 :
364 : let mut location_conf = match location_conf {
365 : Ok(l) => l,
366 : Err(e) => {
367 UBC 0 : warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");
368 :
369 : tenants.insert(
370 : tenant_id,
371 : TenantSlot::Attached(Tenant::create_broken_tenant(
372 : conf,
373 : tenant_id,
374 : format!("{}", e),
375 : )),
376 : );
377 : continue;
378 : }
379 : };
380 :
381 : let generation = if let Some(generations) = &tenant_generations {
382 : // We have a generation map: treat it as the authority for whether
383 : // this tenant is really attached.
384 : if let Some(gen) = generations.get(&tenant_id) {
385 : *gen
386 : } else {
387 : match &location_conf.mode {
388 : LocationMode::Secondary(_) => {
389 : // We do not require the control plane's permission for secondary mode
390 : // tenants, because they do no remote writes and hence require no
391 : // generation number
392 0 : info!(%tenant_id, "Loaded tenant in secondary mode");
393 : tenants.insert(tenant_id, TenantSlot::Secondary);
394 : }
395 : LocationMode::Attached(_) => {
396 : // TODO: augment re-attach API to enable the control plane to
397 : // instruct us about secondary attachments. That way, instead of throwing
398 : // away local state, we can gracefully fall back to secondary here, if the control
399 : // plane tells us so.
400 : // (https://github.com/neondatabase/neon/issues/5377)
401 CBC 2 : info!(%tenant_id, "Detaching tenant, control plane omitted it in re-attach response");
402 : if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
403 UBC 0 : error!(%tenant_id,
404 0 : "Failed to remove detached tenant directory '{tenant_dir_path}': {e:?}",
405 0 : );
406 : }
407 : }
408 : };
409 :
410 : continue;
411 : }
412 : } else {
413 : // Legacy mode: no generation information, any tenant present
414 : // on local disk may activate
415 CBC 197 : info!(%tenant_id, "Starting tenant in legacy mode, no generation",);
416 : Generation::none()
417 : };
418 :
419 : // Presence of a generation number implies attachment: attach the tenant
420 : // if it wasn't already, and apply the generation number.
421 : location_conf.attach_in_generation(generation);
422 : Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
423 :
424 : match schedule_local_tenant_processing(
425 : conf,
426 : tenant_id,
427 : &tenant_dir_path,
428 : AttachedTenantConf::try_from(location_conf)?,
429 : resources.clone(),
430 : Some(init_order.clone()),
431 : &TENANTS,
432 : &ctx,
433 : ) {
434 : Ok(tenant) => {
435 : tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
436 : }
437 : Err(e) => {
438 UBC 0 : error!(%tenant_id, "Failed to start tenant: {e:#}");
439 : }
440 : }
441 : }
442 :
443 CBC 560 : info!("Processed {} local tenants at startup", tenants.len());
444 :
445 : let mut tenants_map = TENANTS.write().await;
446 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
447 : *tenants_map = TenantsMap::Open(tenants);
448 : Ok(())
449 : }
450 :
451 : #[allow(clippy::too_many_arguments)]
452 706 : pub(crate) fn schedule_local_tenant_processing(
453 706 : conf: &'static PageServerConf,
454 706 : tenant_id: TenantId,
455 706 : tenant_path: &Utf8Path,
456 706 : location_conf: AttachedTenantConf,
457 706 : resources: TenantSharedResources,
458 706 : init_order: Option<InitializationOrder>,
459 706 : tenants: &'static tokio::sync::RwLock<TenantsMap>,
460 706 : ctx: &RequestContext,
461 706 : ) -> anyhow::Result<Arc<Tenant>> {
462 706 : anyhow::ensure!(
463 706 : tenant_path.is_dir(),
464 UBC 0 : "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
465 : );
466 CBC 706 : anyhow::ensure!(
467 706 : !crate::is_temporary(tenant_path),
468 UBC 0 : "Cannot load tenant from temporary path {tenant_path:?}"
469 : );
470 : anyhow::ensure!(
471 CBC 706 : !tenant_path.is_empty_dir().with_context(|| {
472 UBC 0 : format!("Failed to check whether {tenant_path:?} is an empty dir")
473 CBC 706 : })?,
474 UBC 0 : "Cannot load tenant from empty directory {tenant_path:?}"
475 : );
476 :
477 CBC 706 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
478 706 : anyhow::ensure!(
479 706 : !conf.tenant_ignore_mark_file_path(&tenant_id).exists(),
480 UBC 0 : "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
481 : );
482 :
483 CBC 706 : let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
484 48 : info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
485 48 : if resources.remote_storage.is_none() {
486 UBC 0 : warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
487 0 : Tenant::create_broken_tenant(
488 0 : conf,
489 0 : tenant_id,
490 0 : "attaching mark file present but no remote storage configured".to_string(),
491 0 : )
492 : } else {
493 CBC 48 : match Tenant::spawn_attach(
494 48 : conf,
495 48 : tenant_id,
496 48 : resources,
497 48 : location_conf,
498 48 : tenants,
499 48 : AttachMarkerMode::Expect,
500 48 : ctx,
501 48 : ) {
502 48 : Ok(tenant) => tenant,
503 UBC 0 : Err(e) => {
504 0 : error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
505 0 : Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
506 : }
507 : }
508 : }
509 : } else {
510 CBC 658 : info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
511 : // Start loading the tenant into memory. It will initially be in Loading state.
512 658 : Tenant::spawn_load(
513 658 : conf,
514 658 : tenant_id,
515 658 : location_conf,
516 658 : resources,
517 658 : init_order,
518 658 : tenants,
519 658 : ctx,
520 658 : )
521 : };
522 706 : Ok(tenant)
523 706 : }
524 :
525 : ///
526 : /// Shut down all tenants. This runs as part of pageserver shutdown.
527 : ///
528 : /// NB: We leave the tenants in the map, so that they remain accessible through
529 : /// the management API until we shut it down. If we removed the shut-down tenants
530 : /// from the tenants map, the management API would return 404 for these tenants,
531 : /// because TenantsMap::get() would then return `None`.
532 : /// That could easily be misinterpreted by the control plane, the consumer of the
533 : /// management API. For example, it could attach the tenant on a different pageserver.
534 : /// We would then be in split-brain once this pageserver restarts.
535 432 : #[instrument(skip_all)]
536 : pub(crate) async fn shutdown_all_tenants() {
537 : shutdown_all_tenants0(&TENANTS).await
538 : }
539 :
540 145 : async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
541 : use utils::completion;
542 :
543 : // Prevent new tenants from being created.
544 145 : let tenants_to_shut_down = {
545 145 : let mut m = tenants.write().await;
546 145 : match &mut *m {
547 : TenantsMap::Initializing => {
548 UBC 0 : *m = TenantsMap::ShuttingDown(HashMap::default());
549 0 : info!("tenants map is empty");
550 0 : return;
551 : }
552 CBC 145 : TenantsMap::Open(tenants) => {
553 145 : let tenants_clone = tenants.clone();
554 145 : *m = TenantsMap::ShuttingDown(std::mem::take(tenants));
555 145 : tenants_clone
556 : }
557 : TenantsMap::ShuttingDown(_) => {
558 : // TODO: it is possible that detach and shutdown happen at the same time. as a
559 : // result, during shutdown we do not wait for detach.
560 UBC 0 : error!("already shutting down, this function isn't supposed to be called more than once");
561 0 : return;
562 : }
563 : }
564 : };
565 :
566 CBC 145 : let started_at = std::time::Instant::now();
567 145 : let mut join_set = JoinSet::new();
568 301 : for (tenant_id, tenant) in tenants_to_shut_down {
569 156 : join_set.spawn(
570 156 : async move {
571 156 : let freeze_and_flush = true;
572 :
573 156 : let res = {
574 156 : let (_guard, shutdown_progress) = completion::channel();
575 156 : match tenant {
576 156 : TenantSlot::Attached(t) => {
577 464 : t.shutdown(shutdown_progress, freeze_and_flush).await
578 : }
579 : TenantSlot::Secondary => {
580 : // TODO: once secondary mode downloads are implemented,
581 : // ensure they have all stopped before we reach this point.
582 UBC 0 : Ok(())
583 : }
584 : }
585 : };
586 :
587 CBC 156 : if let Err(other_progress) = res {
588 : // join the other shutdown already in progress
589 1 : other_progress.wait().await;
590 155 : }
591 :
592 : // we cannot afford per-tenant logging here, because if S3 is degraded, we are
593 : // going to log too many lines
594 :
595 156 : debug!("tenant successfully stopped");
596 156 : }
597 156 : .instrument(info_span!("shutdown", %tenant_id)),
598 : );
599 : }
600 :
601 145 : let total = join_set.len();
602 145 : let mut panicked = 0;
603 145 : let mut buffering = true;
604 145 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
605 145 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
606 :
607 306 : while !join_set.is_empty() {
608 161 : tokio::select! {
609 156 : Some(joined) = join_set.join_next() => {
610 : match joined {
611 : Ok(()) => {}
612 : Err(join_error) if join_error.is_cancelled() => {
613 : unreachable!("we are not cancelling any of the futures");
614 : }
615 : Err(join_error) if join_error.is_panic() => {
616 : // cannot really do anything, as this panic is likely a bug
617 : panicked += 1;
618 : }
619 : Err(join_error) => {
620 UBC 0 : warn!("unknown kind of JoinError: {join_error}");
621 : }
622 : }
623 : if !buffering {
624 : // buffer so that every 500ms since the first update (or starting) we'll log
625 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
626 : // are not able to log *then*.
627 : buffering = true;
628 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
629 : }
630 : },
631 : _ = &mut buffered, if buffering => {
632 : buffering = false;
633 CBC 5 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
634 : }
635 : }
636 : }
637 :
638 145 : if panicked > 0 {
639 UBC 0 : warn!(
640 0 : panicked,
641 0 : total, "observed panics while shutting down tenants"
642 0 : );
643 CBC 145 : }
644 :
645 : // caller will log how long we took
646 145 : }
647 :
648 450 : pub(crate) async fn create_tenant(
649 450 : conf: &'static PageServerConf,
650 450 : tenant_conf: TenantConfOpt,
651 450 : tenant_id: TenantId,
652 450 : generation: Generation,
653 450 : resources: TenantSharedResources,
654 450 : ctx: &RequestContext,
655 450 : ) -> Result<Arc<Tenant>, TenantMapInsertError> {
656 450 : tenant_map_insert(tenant_id, || async {
657 450 :
658 450 : let location_conf = LocationConf::attached_single(tenant_conf, generation);
659 :
660 : // We're holding the tenants lock in write mode while doing local IO.
661 : // If this section ever becomes contentious, introduce a new `TenantState::Creating`
662 : // and do the work in that state.
663 450 : let tenant_directory = super::create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
664 : // TODO: tenant directory remains on disk if we bail out from here on.
665 : // See https://github.com/neondatabase/neon/issues/4233
666 :
667 449 : let created_tenant =
668 449 : schedule_local_tenant_processing(conf, tenant_id, &tenant_directory,
669 449 : AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
670 : // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
671 : // See https://github.com/neondatabase/neon/issues/4233
672 :
673 449 : let crated_tenant_id = created_tenant.tenant_id();
674 449 : anyhow::ensure!(
675 449 : tenant_id == crated_tenant_id,
676 UBC 0 : "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})",
677 : );
678 CBC 449 : Ok(created_tenant)
679 450 : }).await
680 450 : }
681 :
682 UBC 0 : #[derive(Debug, thiserror::Error)]
683 : pub(crate) enum SetNewTenantConfigError {
684 : #[error(transparent)]
685 : GetTenant(#[from] GetTenantError),
686 : #[error(transparent)]
687 : Persist(anyhow::Error),
688 : }
689 :
690 CBC 27 : pub(crate) async fn set_new_tenant_config(
691 27 : conf: &'static PageServerConf,
692 27 : new_tenant_conf: TenantConfOpt,
693 27 : tenant_id: TenantId,
694 27 : ) -> Result<(), SetNewTenantConfigError> {
695 27 : info!("configuring tenant {tenant_id}");
696 27 : let tenant = get_tenant(tenant_id, true).await?;
697 :
698 : // This is a legacy API that only operates on attached tenants: the preferred
699 : // API to use is the location_config/ endpoint, which lets the caller provide
700 : // the full LocationConf.
701 27 : let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation);
702 27 :
703 27 : Tenant::persist_tenant_config(conf, &tenant_id, &location_conf)
704 UBC 0 : .await
705 CBC 27 : .map_err(SetNewTenantConfigError::Persist)?;
706 27 : tenant.set_new_tenant_config(new_tenant_conf);
707 27 : Ok(())
708 27 : }
709 :
710 UBC 0 : #[instrument(skip_all, fields(%tenant_id))]
711 : pub(crate) async fn upsert_location(
712 : conf: &'static PageServerConf,
713 : tenant_id: TenantId,
714 : new_location_config: LocationConf,
715 : broker_client: storage_broker::BrokerClientChannel,
716 : remote_storage: Option<GenericRemoteStorage>,
717 : deletion_queue_client: DeletionQueueClient,
718 : ctx: &RequestContext,
719 : ) -> Result<(), anyhow::Error> {
720 0 : info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
721 :
722 : let mut existing_tenant = match get_tenant(tenant_id, false).await {
723 : Ok(t) => Some(t),
724 : Err(GetTenantError::NotFound(_)) => None,
725 : Err(e) => anyhow::bail!(e),
726 : };
727 :
728 : // If we need to shut down a Tenant, do that first
729 : let shutdown_tenant = match (&new_location_config.mode, &existing_tenant) {
730 : (LocationMode::Secondary(_), Some(t)) => Some(t),
731 : (LocationMode::Attached(attach_conf), Some(t)) => {
732 : if attach_conf.generation != t.generation {
733 : Some(t)
734 : } else {
735 : None
736 : }
737 : }
738 : _ => None,
739 : };
740 :
741 : // TODO: currently we risk concurrent operations interfering with the tenant
742 : // while we await shutdown, but we also should not hold the TenantsMap lock
743 : // across the whole operation. Before we start using this function in production,
744 : // a follow-on change will revise how concurrency is handled in TenantsMap.
745 : // (https://github.com/neondatabase/neon/issues/5378)
746 :
747 : if let Some(tenant) = shutdown_tenant {
748 : let (_guard, progress) = utils::completion::channel();
749 :
750 : match tenant.get_attach_mode() {
751 : AttachmentMode::Single | AttachmentMode::Multi => {
752 : // Before we leave our state as the presumed holder of the latest generation,
753 : // flush any outstanding deletions to reduce the risk of leaking objects.
754 : deletion_queue_client.flush_advisory()
755 : }
756 : AttachmentMode::Stale => {
757 : // If we're stale there's no point trying to flush deletions
758 : }
759 : };
760 :
761 0 : info!("Shutting down attached tenant");
762 : match tenant.shutdown(progress, false).await {
763 : Ok(()) => {}
764 : Err(barrier) => {
765 0 : info!("Shutdown already in progress, waiting for it to complete");
766 : barrier.wait().await;
767 : }
768 : }
769 : existing_tenant = None;
770 : }
771 :
772 : if let Some(tenant) = existing_tenant {
773 : // Update the existing tenant
774 : Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
775 : .await
776 : .map_err(SetNewTenantConfigError::Persist)?;
777 : tenant.set_new_location_config(AttachedTenantConf::try_from(new_location_config)?);
778 : } else {
779 : // Upsert a fresh TenantSlot into TenantsMap. Do it within the map write lock,
780 : // and re-check that the state of anything we are replacing is as expected.
781 : tenant_map_upsert_slot(tenant_id, |old_value| async move {
782 0 : if let Some(TenantSlot::Attached(t)) = old_value {
783 0 : if !matches!(t.current_state(), TenantState::Stopping { .. }) {
784 0 : anyhow::bail!("Tenant state changed during location configuration update");
785 0 : }
786 0 : }
787 :
788 0 : let new_slot = match &new_location_config.mode {
789 : LocationMode::Secondary(_) => {
790 0 : let tenant_path = conf.tenant_path(&tenant_id);
791 0 : // Directory doesn't need to be fsync'd because if we crash it can
792 0 : // safely be recreated next time this tenant location is configured.
793 0 : unsafe_create_dir_all(&tenant_path)
794 0 : .await
795 0 : .with_context(|| format!("Creating {tenant_path}"))?;
796 :
797 0 : Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
798 0 : .await
799 0 : .map_err(SetNewTenantConfigError::Persist)?;
800 :
801 0 : TenantSlot::Secondary
802 : }
803 0 : LocationMode::Attached(_attach_config) => {
804 0 : // FIXME: should avoid doing this disk I/O inside the TenantsMap lock,
805 0 : // we have the same problem in load_tenant/attach_tenant. Probably
806 0 : // need a lock in TenantSlot to fix this.
807 0 : let timelines_path = conf.timelines_path(&tenant_id);
808 0 :
809 0 : // Directory doesn't need to be fsync'd because we do not depend on
810 0 : // it to exist after crashes: it may be recreated when tenant is
811 0 : // re-attached, see https://github.com/neondatabase/neon/issues/5550
812 0 : unsafe_create_dir_all(&timelines_path)
813 0 : .await
814 0 : .with_context(|| format!("Creating {timelines_path}"))?;
815 :
816 0 : Tenant::persist_tenant_config(conf, &tenant_id, &new_location_config)
817 0 : .await
818 0 : .map_err(SetNewTenantConfigError::Persist)?;
819 :
820 0 : let tenant = match Tenant::spawn_attach(
821 0 : conf,
822 0 : tenant_id,
823 0 : TenantSharedResources {
824 0 : broker_client,
825 0 : remote_storage,
826 0 : deletion_queue_client,
827 0 : },
828 0 : AttachedTenantConf::try_from(new_location_config)?,
829 0 : &TENANTS,
830 0 : // The LocationConf API does not use marker files, because we have Secondary
831 0 : // locations where the directory's existence is not a signal that it contains
832 0 : // all timelines. See https://github.com/neondatabase/neon/issues/5550
833 0 : AttachMarkerMode::Ignore,
834 0 : ctx,
835 : ) {
836 0 : Ok(tenant) => tenant,
837 0 : Err(e) => {
838 0 : error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
839 0 : Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
840 : }
841 : };
842 :
843 0 : TenantSlot::Attached(tenant)
844 : }
845 : };
846 :
847 0 : Ok(new_slot)
848 0 : })
849 : .await?;
850 : }
851 : Ok(())
852 : }
853 :
854 CBC 10 : #[derive(Debug, thiserror::Error)]
855 : pub(crate) enum GetTenantError {
856 : #[error("Tenant {0} not found")]
857 : NotFound(TenantId),
858 : #[error("Tenant {0} is not active")]
859 : NotActive(TenantId),
860 : /// Broken is logically a subset of NotActive, but a distinct error is useful as
861 : /// NotActive is usually a retryable state for API purposes, whereas Broken
862 : /// is a stuck error state
863 : #[error("Tenant is broken: {0}")]
864 : Broken(String),
865 : }
866 :
867 : /// Gets the tenant from the in-memory data, erroring if it's absent or does not fit the query.
868 : /// `active_only = true` restricts the query to tenants that are ready for operations, erroring on all other kinds of tenants.
869 : ///
870 : /// This method is cancel-safe.
871 9906 : pub(crate) async fn get_tenant(
872 9906 : tenant_id: TenantId,
873 9906 : active_only: bool,
874 9906 : ) -> Result<Arc<Tenant>, GetTenantError> {
875 9906 : let m = TENANTS.read().await;
876 9906 : let tenant = m
877 9906 : .get(&tenant_id)
878 9906 : .ok_or(GetTenantError::NotFound(tenant_id))?;
879 :
880 9838 : match tenant.current_state() {
881 : TenantState::Broken {
882 1 : reason,
883 1 : backtrace: _,
884 1 : } if active_only => Err(GetTenantError::Broken(reason)),
885 9413 : TenantState::Active => Ok(Arc::clone(tenant)),
886 : _ => {
887 424 : if active_only {
888 14 : Err(GetTenantError::NotActive(tenant_id))
889 : } else {
890 410 : Ok(Arc::clone(tenant))
891 : }
892 : }
893 : }
894 9906 : }
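// Illustrative sketch (not part of the original source, hence no coverage data): a
// hypothetical caller mapping `get_tenant` failures onto user-facing outcomes.
// `NotActive` is usually retryable, whereas `Broken` is a stuck error state.
#[allow(dead_code)]
async fn get_active_tenant_or_explain(tenant_id: TenantId) -> Result<Arc<Tenant>, String> {
    match get_tenant(tenant_id, true).await {
        Ok(tenant) => Ok(tenant),
        Err(GetTenantError::NotFound(id)) => Err(format!("tenant {id} is not known to this pageserver")),
        Err(GetTenantError::NotActive(id)) => Err(format!("tenant {id} is not active yet, retry later")),
        Err(GetTenantError::Broken(reason)) => Err(format!("tenant is broken: {reason}")),
    }
}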
895 :
896 91 : pub(crate) async fn delete_tenant(
897 91 : conf: &'static PageServerConf,
898 91 : remote_storage: Option<GenericRemoteStorage>,
899 91 : tenant_id: TenantId,
900 91 : ) -> Result<(), DeleteTenantError> {
901 547 : DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await
902 91 : }
903 :
904 UBC 0 : #[derive(Debug, thiserror::Error)]
905 : pub(crate) enum DeleteTimelineError {
906 : #[error("Tenant {0}")]
907 : Tenant(#[from] GetTenantError),
908 :
909 : #[error("Timeline {0}")]
910 : Timeline(#[from] crate::tenant::DeleteTimelineError),
911 : }
912 :
913 CBC 99 : pub(crate) async fn delete_timeline(
914 99 : tenant_id: TenantId,
915 99 : timeline_id: TimelineId,
916 99 : _ctx: &RequestContext,
917 99 : ) -> Result<(), DeleteTimelineError> {
918 99 : let tenant = get_tenant(tenant_id, true).await?;
919 605 : DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
920 85 : Ok(())
921 99 : }
922 :
923 UBC 0 : #[derive(Debug, thiserror::Error)]
924 : pub(crate) enum TenantStateError {
925 : #[error("Tenant {0} not found")]
926 : NotFound(TenantId),
927 : #[error("Tenant {0} is stopping")]
928 : IsStopping(TenantId),
929 : #[error(transparent)]
930 : Other(#[from] anyhow::Error),
931 : }
932 :
933 CBC 38 : pub(crate) async fn detach_tenant(
934 38 : conf: &'static PageServerConf,
935 38 : tenant_id: TenantId,
936 38 : detach_ignored: bool,
937 38 : deletion_queue_client: &DeletionQueueClient,
938 38 : ) -> Result<(), TenantStateError> {
939 38 : let tmp_path = detach_tenant0(
940 38 : conf,
941 38 : &TENANTS,
942 38 : tenant_id,
943 38 : detach_ignored,
944 38 : deletion_queue_client,
945 38 : )
946 217 : .await?;
947 : // Although we are cleaning up the tenant, this task is not meant to be bound by the lifetime of the tenant in memory.
948 : // After a tenant is detached, there are no more task_mgr tasks for that tenant_id.
949 35 : let task_tenant_id = None;
950 35 : task_mgr::spawn(
951 35 : task_mgr::BACKGROUND_RUNTIME.handle(),
952 35 : TaskKind::MgmtRequest,
953 35 : task_tenant_id,
954 35 : None,
955 35 : "tenant_files_delete",
956 35 : false,
957 35 : async move {
958 35 : fs::remove_dir_all(tmp_path.as_path())
959 35 : .await
960 35 : .with_context(|| format!("tenant directory {:?} deletion", tmp_path))
961 35 : },
962 35 : );
963 35 : Ok(())
964 38 : }
965 :
966 38 : async fn detach_tenant0(
967 38 : conf: &'static PageServerConf,
968 38 : tenants: &tokio::sync::RwLock<TenantsMap>,
969 38 : tenant_id: TenantId,
970 38 : detach_ignored: bool,
971 38 : deletion_queue_client: &DeletionQueueClient,
972 38 : ) -> Result<Utf8PathBuf, TenantStateError> {
973 38 : let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
974 35 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
975 35 : safe_rename_tenant_dir(&local_tenant_directory)
976 105 : .await
977 35 : .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
978 38 : };
979 :
980 38 : let removal_result =
981 214 : remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
982 :
983 : // Flush pending deletions, so that they have a good chance of passing validation
984 : // before this tenant is potentially re-attached elsewhere.
985 38 : deletion_queue_client.flush_advisory();
986 38 :
987 38 : // Ignored tenants are not present in memory, so the removal-from-memory operation above bails for them.
988 38 : // Before returning the error, check for the ignored-tenant case: then we only need to clean up its local files.
989 38 : if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
990 1 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
991 1 : if tenant_ignore_mark.exists() {
992 1 : info!("Detaching an ignored tenant");
993 1 : let tmp_path = tenant_dir_rename_operation(tenant_id)
994 3 : .await
995 1 : .with_context(|| format!("Ignored tenant {tenant_id} local directory rename"))?;
996 1 : return Ok(tmp_path);
997 UBC 0 : }
998 CBC 37 : }
999 :
1000 37 : removal_result
1001 38 : }
1002 :
1003 6 : pub(crate) async fn load_tenant(
1004 6 : conf: &'static PageServerConf,
1005 6 : tenant_id: TenantId,
1006 6 : generation: Generation,
1007 6 : broker_client: storage_broker::BrokerClientChannel,
1008 6 : remote_storage: Option<GenericRemoteStorage>,
1009 6 : deletion_queue_client: DeletionQueueClient,
1010 6 : ctx: &RequestContext,
1011 6 : ) -> Result<(), TenantMapInsertError> {
1012 6 : tenant_map_insert(tenant_id, || async {
1013 5 : let tenant_path = conf.tenant_path(&tenant_id);
1014 5 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
1015 5 : if tenant_ignore_mark.exists() {
1016 5 : std::fs::remove_file(&tenant_ignore_mark)
1017 5 : .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
1018 UBC 0 : }
1019 :
1020 CBC 5 : let resources = TenantSharedResources {
1021 5 : broker_client,
1022 5 : remote_storage,
1023 5 : deletion_queue_client
1024 5 : };
1025 :
1026 5 : let mut location_conf = Tenant::load_tenant_config(conf, &tenant_id).map_err( TenantMapInsertError::Other)?;
1027 5 : location_conf.attach_in_generation(generation);
1028 5 : Tenant::persist_tenant_config(conf, &tenant_id, &location_conf).await?;
1029 :
1030 5 : let new_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_path, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)
1031 5 : .with_context(|| {
1032 UBC 0 : format!("Failed to schedule tenant processing in path {tenant_path:?}")
1033 CBC 5 : })?;
1034 :
1035 5 : Ok(new_tenant)
1036 6 : }).await?;
1037 5 : Ok(())
1038 6 : }
1039 :
1040 7 : pub(crate) async fn ignore_tenant(
1041 7 : conf: &'static PageServerConf,
1042 7 : tenant_id: TenantId,
1043 7 : ) -> Result<(), TenantStateError> {
1044 27 : ignore_tenant0(conf, &TENANTS, tenant_id).await
1045 7 : }
1046 :
1047 7 : async fn ignore_tenant0(
1048 7 : conf: &'static PageServerConf,
1049 7 : tenants: &tokio::sync::RwLock<TenantsMap>,
1050 7 : tenant_id: TenantId,
1051 7 : ) -> Result<(), TenantStateError> {
1052 7 : remove_tenant_from_memory(tenants, tenant_id, async {
1053 7 : let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
1054 7 : fs::File::create(&ignore_mark_file)
1055 7 : .await
1056 7 : .context("Failed to create ignore mark file")
1057 7 : .and_then(|_| {
1058 7 : crashsafe::fsync_file_and_parent(&ignore_mark_file)
1059 7 : .context("Failed to fsync ignore mark file")
1060 7 : })
1061 7 : .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_id}"))?;
1062 7 : Ok(())
1063 7 : })
1064 27 : .await
1065 7 : }
1066 :
1067 UBC 0 : #[derive(Debug, thiserror::Error)]
1068 : pub(crate) enum TenantMapListError {
1069 : #[error("tenant map is still initializing")]
1070 : Initializing,
1071 : }
1072 :
1073 : ///
1074 : /// Get list of tenants, for the mgmt API
1075 : ///
1076 CBC 113 : pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
1077 113 : let tenants = TENANTS.read().await;
1078 113 : let m = match &*tenants {
1079 UBC 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
1080 CBC 113 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
1081 113 : };
1082 113 : Ok(m.iter()
1083 160 : .filter_map(|(id, tenant)| match tenant {
1084 160 : TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
1085 UBC 0 : TenantSlot::Secondary => None,
1086 CBC 160 : })
1087 113 : .collect())
1088 113 : }
1089 :
1090 : /// Execute Attach mgmt API command.
1091 : ///
1092 : /// Downloading all the tenant data is performed in the background; this merely
1093 : /// spawns the background task and returns quickly.
1094 57 : pub(crate) async fn attach_tenant(
1095 57 : conf: &'static PageServerConf,
1096 57 : tenant_id: TenantId,
1097 57 : generation: Generation,
1098 57 : tenant_conf: TenantConfOpt,
1099 57 : resources: TenantSharedResources,
1100 57 : ctx: &RequestContext,
1101 57 : ) -> Result<(), TenantMapInsertError> {
1102 57 : tenant_map_insert(tenant_id, || async {
1103 44 : let location_conf = LocationConf::attached_single(tenant_conf, generation);
1104 44 : let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
1105 : // TODO: tenant directory remains on disk if we bail out from here on.
1106 : // See https://github.com/neondatabase/neon/issues/4233
1107 :
1108 : // Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached
1109 42 : let marker_file_exists = conf
1110 42 : .tenant_attaching_mark_file_path(&tenant_id)
1111 42 : .try_exists()
1112 42 : .context("check for attach marker file existence")?;
1113 42 : anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
1114 :
1115 42 : let attached_tenant = schedule_local_tenant_processing(conf, tenant_id, &tenant_dir, AttachedTenantConf::try_from(location_conf)?, resources, None, &TENANTS, ctx)?;
1116 : // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
1117 : // See https://github.com/neondatabase/neon/issues/4233
1118 :
1119 42 : let attached_tenant_id = attached_tenant.tenant_id();
1120 42 : anyhow::ensure!(
1121 42 : tenant_id == attached_tenant_id,
1122 UBC 0 : "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
1123 : );
1124 CBC 42 : Ok(attached_tenant)
1125 57 : })
1126 15 : .await?;
1127 42 : Ok(())
1128 57 : }
1129 :
1130 UBC 0 : #[derive(Debug, thiserror::Error)]
1131 : pub(crate) enum TenantMapInsertError {
1132 : #[error("tenant map is still initializing")]
1133 : StillInitializing,
1134 : #[error("tenant map is shutting down")]
1135 : ShuttingDown,
1136 : #[error("tenant {0} already exists, state: {1:?}")]
1137 : TenantAlreadyExists(TenantId, TenantState),
1138 : #[error("tenant {0} already exists in secondary state")]
1139 : TenantExistsSecondary(TenantId),
1140 : #[error(transparent)]
1141 : Other(#[from] anyhow::Error),
1142 : }
1143 :
1144 : /// Run the given closure to create a tenant, iff the tenants map entry for the given
1145 : /// `tenant_id` is vacant. On success, the created tenant object is inserted into that
1146 : /// vacant entry by this function.
1147 : ///
1148 : /// NB: the closure should return quickly because the current implementation of the tenants map
1149 : /// serializes access through an `RwLock`.
1150 CBC 513 : async fn tenant_map_insert<F, R>(
1151 513 : tenant_id: TenantId,
1152 513 : insert_fn: F,
1153 513 : ) -> Result<Arc<Tenant>, TenantMapInsertError>
1154 513 : where
1155 513 : F: FnOnce() -> R,
1156 513 : R: std::future::Future<Output = anyhow::Result<Arc<Tenant>>>,
1157 513 : {
1158 513 : let mut guard = TENANTS.write().await;
1159 513 : let m = match &mut *guard {
1160 UBC 0 : TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
1161 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
1162 CBC 513 : TenantsMap::Open(m) => m,
1163 513 : };
1164 513 : match m.entry(tenant_id) {
1165 14 : hash_map::Entry::Occupied(e) => match e.get() {
1166 14 : TenantSlot::Attached(t) => Err(TenantMapInsertError::TenantAlreadyExists(
1167 14 : tenant_id,
1168 14 : t.current_state(),
1169 14 : )),
1170 UBC 0 : TenantSlot::Secondary => Err(TenantMapInsertError::TenantExistsSecondary(tenant_id)),
1171 : },
1172 CBC 499 : hash_map::Entry::Vacant(v) => match insert_fn().await {
1173 496 : Ok(tenant) => {
1174 496 : v.insert(TenantSlot::Attached(tenant.clone()));
1175 496 : Ok(tenant)
1176 : }
1177 3 : Err(e) => Err(TenantMapInsertError::Other(e)),
1178 : },
1179 : }
1180 513 : }
1181 :
1182 UBC 0 : async fn tenant_map_upsert_slot<'a, F, R>(
1183 0 : tenant_id: TenantId,
1184 0 : upsert_fn: F,
1185 0 : ) -> Result<(), TenantMapInsertError>
1186 0 : where
1187 0 : F: FnOnce(Option<TenantSlot>) -> R,
1188 0 : R: std::future::Future<Output = anyhow::Result<TenantSlot>>,
1189 0 : {
1190 0 : let mut guard = TENANTS.write().await;
1191 0 : let m = match &mut *guard {
1192 0 : TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
1193 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
1194 0 : TenantsMap::Open(m) => m,
1195 0 : };
1196 0 :
1197 0 : match upsert_fn(m.remove(&tenant_id)).await {
1198 0 : Ok(upsert_val) => {
1199 0 : m.insert(tenant_id, upsert_val);
1200 0 : Ok(())
1201 : }
1202 0 : Err(e) => Err(TenantMapInsertError::Other(e)),
1203 : }
1204 0 : }
1205 :
1206 : /// Stops and removes the tenant from memory, if it's not already [`TenantState::Stopping`]; bails otherwise.
1207 : /// Allows removing other tenant resources manually, via `tenant_cleanup`.
1208 : /// If the cleanup fails, the tenant will stay in memory in [`TenantState::Broken`] state, and another removal
1209 : /// operation would be needed to remove it.
1210 CBC 46 : async fn remove_tenant_from_memory<V, F>(
1211 46 : tenants: &tokio::sync::RwLock<TenantsMap>,
1212 46 : tenant_id: TenantId,
1213 46 : tenant_cleanup: F,
1214 46 : ) -> Result<V, TenantStateError>
1215 46 : where
1216 46 : F: std::future::Future<Output = anyhow::Result<V>>,
1217 46 : {
1218 : use utils::completion;
1219 :
1220 : // It's important to keep the tenant in memory after the final cleanup, to avoid cleanup races.
1221 : // The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
1222 : // tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to
1223 : // avoid holding the lock for the entire process.
1224 42 : let tenant = {
1225 46 : match tenants
1226 46 : .write()
1227 UBC 0 : .await
1228 CBC 46 : .get_slot(&tenant_id)
1229 46 : .ok_or(TenantStateError::NotFound(tenant_id))?
1230 : {
1231 42 : TenantSlot::Attached(t) => Some(t.clone()),
1232 UBC 0 : TenantSlot::Secondary => None,
1233 : }
1234 : };
1235 :
1236 : // allow pageserver shutdown to await for our completion
1237 CBC 42 : let (_guard, progress) = completion::channel();
1238 42 :
1239 42 : // If the tenant was attached, shut it down gracefully. For secondary
1240 42 : // locations this part is not necessary
1241 42 : match tenant {
1242 42 : Some(attached_tenant) => {
1243 42 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
1244 42 : let freeze_and_flush = false;
1245 42 :
1246 42 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
1247 42 : // that we can continue safely to cleanup.
1248 132 : match attached_tenant.shutdown(progress, freeze_and_flush).await {
1249 42 : Ok(()) => {}
1250 UBC 0 : Err(_other) => {
1251 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
1252 0 : // wait for it but return an error right away because these are distinct requests.
1253 0 : return Err(TenantStateError::IsStopping(tenant_id));
1254 : }
1255 : }
1256 : }
1257 0 : None => {
1258 0 : // Nothing to wait on when not attached, proceed.
1259 0 : }
1260 : }
1261 :
1262 CBC 42 : match tenant_cleanup
1263 110 : .await
1264 42 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
1265 : {
1266 42 : Ok(hook_value) => {
1267 42 : let mut tenants_accessor = tenants.write().await;
1268 42 : if tenants_accessor.remove(&tenant_id).is_none() {
1269 UBC 0 : warn!("Tenant {tenant_id} got removed from memory before operation finished");
1270 CBC 42 : }
1271 42 : Ok(hook_value)
1272 : }
1273 UBC 0 : Err(e) => {
1274 0 : let tenants_accessor = tenants.read().await;
1275 0 : match tenants_accessor.get(&tenant_id) {
1276 0 : Some(tenant) => {
1277 0 : tenant.set_broken(e.to_string()).await;
1278 : }
1279 : None => {
1280 0 : warn!("Tenant {tenant_id} got removed from memory");
1281 0 : return Err(TenantStateError::NotFound(tenant_id));
1282 : }
1283 : }
1284 0 : Err(TenantStateError::Other(e))
1285 : }
1286 : }
1287 CBC 46 : }
1288 :
1289 : use {
1290 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
1291 : utils::http::error::ApiError,
1292 : };
1293 :
1294 371 : pub(crate) async fn immediate_gc(
1295 371 : tenant_id: TenantId,
1296 371 : timeline_id: TimelineId,
1297 371 : gc_req: TimelineGcRequest,
1298 371 : ctx: &RequestContext,
1299 371 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
1300 371 : let guard = TENANTS.read().await;
1301 371 : let tenant = guard
1302 371 : .get(&tenant_id)
1303 371 : .map(Arc::clone)
1304 371 : .with_context(|| format!("tenant {tenant_id}"))
1305 371 : .map_err(|e| ApiError::NotFound(e.into()))?;
1306 :
1307 370 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
1308 370 : // Use tenant's pitr setting
1309 370 : let pitr = tenant.get_pitr_interval();
1310 370 :
1311 370 : // Run in task_mgr to avoid race with tenant_detach operation
1312 370 : let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
1313 370 : let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
1314 370 : task_mgr::spawn(
1315 370 : &tokio::runtime::Handle::current(),
1316 370 : TaskKind::GarbageCollector,
1317 370 : Some(tenant_id),
1318 370 : Some(timeline_id),
1319 370 : &format!("timeline_gc_handler garbage collection run for tenant {tenant_id} timeline {timeline_id}"),
1320 370 : false,
1321 370 : async move {
1322 UBC 0 : fail::fail_point!("immediate_gc_task_pre");
1323 CBC 370 : let result = tenant
1324 370 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &ctx)
1325 370 : .instrument(info_span!("manual_gc", %tenant_id, %timeline_id))
1326 182 : .await;
1327 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
1328 : // better once the types support it.
1329 370 : match task_done.send(result) {
1330 370 : Ok(_) => (),
1331 UBC 0 : Err(result) => error!("failed to send gc result: {result:?}"),
1332 : }
1333 CBC 370 : Ok(())
1334 370 : }
1335 370 : );
1336 370 :
1337 370 : // only drop the guard after we've spawned the task, so that timeline shutdown will wait for the task
1338 370 : drop(guard);
1339 370 :
1340 370 : Ok(wait_task_done)
1341 371 : }
1342 :
1343 : #[cfg(test)]
1344 : mod tests {
1345 : use std::collections::HashMap;
1346 : use std::sync::Arc;
1347 : use tracing::{info_span, Instrument};
1348 :
1349 : use crate::tenant::mgr::TenantSlot;
1350 :
1351 : use super::{super::harness::TenantHarness, TenantsMap};
1352 :
1353 1 : #[tokio::test(start_paused = true)]
1354 1 : async fn shutdown_joins_remove_tenant_from_memory() {
1355 : // The test is a bit ugly because of the lockstep coordination with spawned tasks. The aim is to make
1356 : // sure that `shutdown_all_tenants0`'s per-tenant processing joins any active
1357 : // remove_tenant_from_memory calls, which is enforced by making the operation last until
1358 : // we've run `shutdown_all_tenants0` for a long time.
1359 :
1360 1 : let (t, _ctx) = TenantHarness::create("shutdown_joins_detach")
1361 1 : .unwrap()
1362 1 : .load()
1363 1 : .await;
1364 :
1365 : // harness loads it to active, which is forced and nothing is running on the tenant
1366 :
1367 1 : let id = t.tenant_id();
1368 :
1369 : // tenant harness configures the logging and we cannot escape it
1370 1 : let _e = info_span!("testing", tenant_id = %id).entered();
1371 1 :
1372 1 : let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]);
1373 1 : let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants)));
1374 1 :
1375 1 : let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
1376 1 : let (until_cleanup_started, cleanup_started) = utils::completion::channel();
1377 :
1378 : // start a "detaching operation", which will take a while, until can_complete_cleanup
1379 1 : let cleanup_task = {
1380 1 : let jh = tokio::spawn({
1381 1 : let tenants = tenants.clone();
1382 1 : async move {
1383 1 : let cleanup = async move {
1384 1 : drop(until_cleanup_started);
1385 1 : can_complete_cleanup.wait().await;
1386 1 : anyhow::Ok(())
1387 1 : };
1388 1 : super::remove_tenant_from_memory(&tenants, id, cleanup).await
1389 1 : }
1390 1 : .instrument(info_span!("foobar", tenant_id = %id))
1391 : });
1392 :
1393 : // now the long cleanup should be in place, with the stopping state
1394 1 : cleanup_started.wait().await;
1395 1 : jh
1396 : };
1397 :
1398 1 : let mut cleanup_progress = std::pin::pin!(t
1399 1 : .shutdown(utils::completion::Barrier::default(), false)
1400 UBC 0 : .await
1401 CBC 1 : .unwrap_err()
1402 1 : .wait());
1403 :
1404 1 : let mut shutdown_task = {
1405 1 : let (until_shutdown_started, shutdown_started) = utils::completion::channel();
1406 1 :
1407 1 : let shutdown_task = tokio::spawn(async move {
1408 1 : drop(until_shutdown_started);
1409 2 : super::shutdown_all_tenants0(&tenants).await;
1410 1 : });
1411 1 :
1412 1 : shutdown_started.wait().await;
1413 1 : shutdown_task
1414 1 : };
1415 1 :
1416 1 : // if the join were removed from shutdown_all_tenants0, the shutdown_task would always
1417 1 : // get to complete within the timeout and fail the test. It is expected to continue awaiting
1418 1 : // until completion or SIGKILL during normal shutdown.
1419 1 : //
1420 1 : // the timeout is long to cover anything that shutdown_task could be doing, but it is
1421 1 : // handled instantly because we use tokio's time pausing in this test. 100s is much more than
1422 1 : // what we get from systemd on shutdown (10s).
1423 1 : let long_time = std::time::Duration::from_secs(100);
1424 2 : tokio::select! {
1425 2 : _ = &mut shutdown_task => unreachable!("shutdown must continue, until_cleanup_completed is not dropped"),
1426 2 : _ = &mut cleanup_progress => unreachable!("cleanup progress must continue, until_cleanup_completed is not dropped"),
1427 2 : _ = tokio::time::sleep(long_time) => {},
1428 2 : }
1429 :
1430 : // allow the remove_tenant_from_memory and thus eventually the shutdown to continue
1431 1 : drop(until_cleanup_completed);
1432 :
1433 1 : let (je, ()) = tokio::join!(shutdown_task, cleanup_progress);
1434 1 : je.expect("Tenant::shutdown should not have panicked");
1435 1 : cleanup_task
1436 UBC 0 : .await
1437 CBC 1 : .expect("no panicking")
1438 1 : .expect("remove_tenant_from_memory failed");
1439 1 :
1440 1 : futures::future::poll_immediate(
1441 1 : t.shutdown(utils::completion::Barrier::default(), false)
1442 UBC 0 : .await
1443 CBC 1 : .unwrap_err()
1444 1 : .wait(),
1445 : )
1446 UBC 0 : .await
1447 CBC 1 : .expect("the stopping progress must still be complete");
1448 : }
1449 : }
|