Line data Source code
1 : //! This module acts as a switchboard to access different repositories managed by this
2 : //! page server.
3 :
4 : use std::collections::{hash_map, HashMap};
5 : use std::ffi::OsStr;
6 : use std::path::Path;
7 : use std::sync::Arc;
8 : use tokio::fs;
9 :
10 : use anyhow::Context;
11 : use once_cell::sync::Lazy;
12 : use tokio::sync::RwLock;
13 : use tokio::task::JoinSet;
14 : use tracing::*;
15 :
16 : use remote_storage::GenericRemoteStorage;
17 : use utils::crashsafe;
18 :
19 : use crate::config::PageServerConf;
20 : use crate::context::{DownloadBehavior, RequestContext};
21 : use crate::task_mgr::{self, TaskKind};
22 : use crate::tenant::config::TenantConfOpt;
23 : use crate::tenant::delete::DeleteTenantFlow;
24 : use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
25 : use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
26 :
27 : use utils::crashsafe::path_with_suffix_extension;
28 : use utils::fs_ext::PathExt;
29 : use utils::generation::Generation;
30 : use utils::id::{TenantId, TimelineId};
31 :
32 : use super::delete::DeleteTenantError;
33 : use super::timeline::delete::DeleteTimelineFlow;
34 : use super::TenantSharedResources;
35 :
36 : /// The tenants known to the pageserver.
37 : /// The enum variants are used to distinguish the different states that the pageserver can be in.
38 : pub(crate) enum TenantsMap {
39 : /// [`init_tenant_mgr`] is not done yet.
40 : Initializing,
41 : /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
42 : /// New tenants can be added using [`tenant_map_insert`].
43 : Open(HashMap<TenantId, Arc<Tenant>>),
44 : /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
45 : /// Existing tenants are still accessible, but no new tenants can be created.
46 : ShuttingDown(HashMap<TenantId, Arc<Tenant>>),
47 : }
48 :
49 : impl TenantsMap {
50 10222 : pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc<Tenant>> {
51 10222 : match self {
52 0 : TenantsMap::Initializing => None,
53 10222 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.get(tenant_id),
54 : }
55 10222 : }
56 125 : pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<Arc<Tenant>> {
57 125 : match self {
58 0 : TenantsMap::Initializing => None,
59 125 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
60 : }
61 125 : }
62 : }
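// Illustrative sketch (not part of the original file): how the three map states above
// are meant to be interpreted by callers. `TENANTS` is the static defined further down
// in this module.
#[allow(dead_code)]
async fn tenants_map_accepts_inserts() -> bool {
    // Only `Open` accepts new tenants; `Initializing` means startup has not finished
    // yet, and `ShuttingDown` keeps existing tenants readable but rejects inserts.
    matches!(&*TENANTS.read().await, TenantsMap::Open(_))
}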
63 :
64 : /// This is "safe" in that that it won't leave behind a partially deleted directory
65 : /// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting
66 : /// the contents.
67 : ///
68 : /// This is pageserver-specific, as it relies on future processes after a crash to check
69 : /// for TEMP_FILE_SUFFIX when loading things.
70 37 : async fn safe_remove_tenant_dir_all(path: impl AsRef<Path>) -> std::io::Result<()> {
71 37 : let parent = path
72 37 : .as_ref()
73 37 : .parent()
74 37 : // It is invalid to call this function with a relative path. Tenant directories
75 37 : // should always have a parent.
76 37 : .ok_or(std::io::Error::new(
77 37 : std::io::ErrorKind::InvalidInput,
78 37 : "Path must be absolute",
79 37 : ))?;
80 :
81 37 : let tmp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
82 37 : fs::rename(&path, &tmp_path).await?;
83 37 : fs::File::open(parent).await?.sync_all().await?;
84 37 : fs::remove_dir_all(tmp_path).await
85 37 : }
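// Sketch of the crash-recovery counterpart (hypothetical helper, not in this file):
// because `safe_remove_tenant_dir_all` renames before deleting, a later startup only
// has to look for the temporary suffix and remove such directories outright, which is
// what the temp-dir branch in `init_tenant_mgr` below does.
#[allow(dead_code)]
async fn cleanup_leftover_temp_dir(path: &Path) -> std::io::Result<()> {
    if crate::is_temporary(path) {
        // The directory was already renamed away from its original path before the
        // crash, so removing it unconditionally is safe.
        fs::remove_dir_all(path).await?;
    }
    Ok(())
}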
86 :
87 575 : static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::Initializing));
88 :
89 : /// Initialize repositories with locally available timelines.
90 : /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
91 : /// are scheduled for download and added to the tenant once download is completed.
92 2875 : #[instrument(skip_all)]
93 : pub async fn init_tenant_mgr(
94 : conf: &'static PageServerConf,
95 : resources: TenantSharedResources,
96 : init_order: InitializationOrder,
97 : ) -> anyhow::Result<()> {
98 : // Scan local filesystem for attached tenants
99 : let tenants_dir = conf.tenants_path();
100 :
101 : let mut tenants = HashMap::new();
102 :
103 : let mut dir_entries = fs::read_dir(&tenants_dir)
104 : .await
105 0 : .with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
106 :
107 : let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
108 :
109 : loop {
110 : match dir_entries.next_entry().await {
111 : Ok(None) => break,
112 : Ok(Some(dir_entry)) => {
113 : let tenant_dir_path = dir_entry.path();
114 : if crate::is_temporary(&tenant_dir_path) {
115 0 : info!(
116 0 : "Found temporary tenant directory, removing: {}",
117 0 : tenant_dir_path.display()
118 0 : );
119 : // No need to use safe_remove_tenant_dir_all because this is already
120 : // a temporary path
121 : if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
122 0 : error!(
123 0 : "Failed to remove temporary directory '{}': {:?}",
124 0 : tenant_dir_path.display(),
125 0 : e
126 0 : );
127 : }
128 : } else {
129 : // This case happens if we:
130 : // * crash during attach before creating the attach marker file
131 : // * crash during tenant delete before removing tenant directory
132 0 : let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
133 0 : format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
134 0 : })?;
135 : if is_empty {
136 3 : info!("removing empty tenant directory {tenant_dir_path:?}");
137 : if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
138 0 : error!(
139 0 : "Failed to remove empty tenant directory '{}': {e:#}",
140 0 : tenant_dir_path.display()
141 0 : )
142 : }
143 : continue;
144 : }
145 :
146 : let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
147 : if tenant_ignore_mark_file.exists() {
148 2 : info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
149 : continue;
150 : }
151 :
152 : match schedule_local_tenant_processing(
153 : conf,
154 : &tenant_dir_path,
155 : resources.clone(),
156 : Some(init_order.clone()),
157 : &TENANTS,
158 : &ctx,
159 : ) {
160 : Ok(tenant) => {
161 : tenants.insert(tenant.tenant_id(), tenant);
162 : }
163 : Err(e) => {
164 0 : error!("Failed to collect tenant files from dir {tenants_dir:?} for entry {dir_entry:?}, reason: {e:#}");
165 : }
166 : }
167 : }
168 : }
169 : Err(e) => {
170 : // On error, print it, but continue with the other tenants. If we error out
171 : // here, the pageserver startup fails altogether, causing outage for *all*
172 : // tenants. That seems worse.
173 0 : error!(
174 0 : "Failed to list tenants dir entry in directory {tenants_dir:?}, reason: {e:?}"
175 0 : );
176 : }
177 : }
178 : }
179 :
180 575 : info!("Processed {} local tenants at startup", tenants.len());
181 :
182 : let mut tenants_map = TENANTS.write().await;
183 : assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
184 : *tenants_map = TenantsMap::Open(tenants);
185 : Ok(())
186 : }
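// Illustrative predicate (hypothetical helper) summarizing which directory entries the
// startup scan above skips before scheduling a tenant: temporary directories, empty
// directories, and directories carrying an ignore mark.
#[allow(dead_code)]
fn skipped_at_startup(tenant_dir_path: &Path) -> anyhow::Result<bool> {
    if crate::is_temporary(tenant_dir_path) {
        return Ok(true);
    }
    let is_empty = tenant_dir_path.is_empty_dir().with_context(|| {
        format!("Failed to check whether {tenant_dir_path:?} is an empty dir")
    })?;
    if is_empty {
        return Ok(true);
    }
    Ok(tenant_dir_path.join(IGNORED_TENANT_FILE_NAME).exists())
}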
187 :
188 739 : pub(crate) fn schedule_local_tenant_processing(
189 739 : conf: &'static PageServerConf,
190 739 : tenant_path: &Path,
191 739 : resources: TenantSharedResources,
192 739 : init_order: Option<InitializationOrder>,
193 739 : tenants: &'static tokio::sync::RwLock<TenantsMap>,
194 739 : ctx: &RequestContext,
195 739 : ) -> anyhow::Result<Arc<Tenant>> {
196 739 : anyhow::ensure!(
197 739 : tenant_path.is_dir(),
198 0 : "Cannot load tenant from path {tenant_path:?}, it either does not exist or not a directory"
199 : );
200 739 : anyhow::ensure!(
201 739 : !crate::is_temporary(tenant_path),
202 0 : "Cannot load tenant from temporary path {tenant_path:?}"
203 : );
204 : anyhow::ensure!(
205 739 : !tenant_path.is_empty_dir().with_context(|| {
206 0 : format!("Failed to check whether {tenant_path:?} is an empty dir")
207 739 : })?,
208 0 : "Cannot load tenant from empty directory {tenant_path:?}"
209 : );
210 :
211 739 : let tenant_id = tenant_path
212 739 : .file_name()
213 739 : .and_then(OsStr::to_str)
214 739 : .unwrap_or_default()
215 739 : .parse::<TenantId>()
216 739 : .with_context(|| {
217 0 : format!("Could not parse tenant id out of the tenant dir name in path {tenant_path:?}")
218 739 : })?;
219 :
220 739 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
221 739 : anyhow::ensure!(
222 739 : !conf.tenant_ignore_mark_file_path(&tenant_id).exists(),
223 0 : "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
224 : );
225 :
226 739 : let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
227 42 : info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
228 42 : if let Some(remote_storage) = resources.remote_storage {
229 42 : match Tenant::spawn_attach(
230 42 : conf,
231 42 : tenant_id,
232 42 : Generation::none(),
233 42 : resources.broker_client,
234 42 : tenants,
235 42 : remote_storage,
236 42 : ctx,
237 42 : ) {
238 42 : Ok(tenant) => tenant,
239 0 : Err(e) => {
240 0 : error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
241 0 : Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"))
242 : }
243 : }
244 : } else {
245 0 : warn!("tenant {tenant_id} has attaching mark file, but pageserver has no remote storage configured");
246 0 : Tenant::create_broken_tenant(
247 0 : conf,
248 0 : tenant_id,
249 0 : "attaching mark file present but no remote storage configured".to_string(),
250 0 : )
251 : }
252 : } else {
253 697 : info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
254 : // Start loading the tenant into memory. It will initially be in Loading state.
255 697 : Tenant::spawn_load(
256 697 : conf,
257 697 : tenant_id,
258 697 : Generation::none(),
259 697 : resources,
260 697 : init_order,
261 697 : tenants,
262 697 : ctx,
263 697 : )
264 : };
265 739 : Ok(tenant)
266 739 : }
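// Decision sketch (hypothetical helper, names invented): the branch above boils down to
// one on-disk marker. An attaching mark means the attach must be resumed from remote
// storage (or the tenant is marked broken if no remote storage is configured);
// otherwise the tenant is assumed to be fully present locally and is loaded.
#[allow(dead_code)]
enum LocalStartMode {
    ResumeAttach,
    Load,
}

#[allow(dead_code)]
fn choose_start_mode(conf: &'static PageServerConf, tenant_id: &TenantId) -> LocalStartMode {
    if conf.tenant_attaching_mark_file_path(tenant_id).exists() {
        LocalStartMode::ResumeAttach
    } else {
        LocalStartMode::Load
    }
}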
267 :
268 : ///
269 : /// Shut down all tenants. This runs as part of pageserver shutdown.
270 : ///
271 : /// NB: We leave the tenants in the map, so that they remain accessible through
272 : /// the management API until we shut it down. If we removed the shut-down tenants
273 : /// from the tenants map, the management API would return 404 for these tenants,
274 : /// because TenantsMap::get() would then return `None`.
275 : /// That could easily be misinterpreted by the control plane, the consumer of the
276 : /// management API. For example, it could attach the tenant on a different pageserver.
277 : /// We would then be in split-brain once this pageserver restarts.
278 444 : #[instrument(skip_all)]
279 : pub async fn shutdown_all_tenants() {
280 : shutdown_all_tenants0(&TENANTS).await
281 : }
282 :
283 149 : async fn shutdown_all_tenants0(tenants: &tokio::sync::RwLock<TenantsMap>) {
284 : use utils::completion;
285 :
286 : // Prevent new tenants from being created.
287 149 : let tenants_to_shut_down = {
288 149 : let mut m = tenants.write().await;
289 149 : match &mut *m {
290 : TenantsMap::Initializing => {
291 0 : *m = TenantsMap::ShuttingDown(HashMap::default());
292 0 : info!("tenants map is empty");
293 0 : return;
294 : }
295 149 : TenantsMap::Open(tenants) => {
296 149 : let tenants_clone = tenants.clone();
297 149 : *m = TenantsMap::ShuttingDown(std::mem::take(tenants));
298 149 : tenants_clone
299 : }
300 : TenantsMap::ShuttingDown(_) => {
301 : // TODO: it is possible that detach and shutdown happen at the same time. as a
302 : // result, during shutdown we do not wait for detach.
303 0 : error!("already shutting down, this function isn't supposed to be called more than once");
304 0 : return;
305 : }
306 : }
307 : };
308 :
309 149 : let started_at = std::time::Instant::now();
310 149 : let mut join_set = JoinSet::new();
311 313 : for (tenant_id, tenant) in tenants_to_shut_down {
312 164 : join_set.spawn(
313 164 : async move {
314 164 : let freeze_and_flush = true;
315 :
316 164 : let res = {
317 164 : let (_guard, shutdown_progress) = completion::channel();
318 471 : tenant.shutdown(shutdown_progress, freeze_and_flush).await
319 : };
320 :
321 164 : if let Err(other_progress) = res {
322 : // join the other shutdown in progress
323 1 : other_progress.wait().await;
324 163 : }
325 :
326 : // we cannot afford per tenant logging here, because if s3 is degraded, we are
327 : // going to log too many lines
328 :
329 164 : debug!("tenant successfully stopped");
330 164 : }
331 164 : .instrument(info_span!("shutdown", %tenant_id)),
332 : );
333 : }
334 :
335 149 : let total = join_set.len();
336 149 : let mut panicked = 0;
337 149 : let mut buffering = true;
338 149 : const BUFFER_FOR: std::time::Duration = std::time::Duration::from_millis(500);
339 149 : let mut buffered = std::pin::pin!(tokio::time::sleep(BUFFER_FOR));
340 :
341 318 : while !join_set.is_empty() {
342 169 : tokio::select! {
343 164 : Some(joined) = join_set.join_next() => {
344 : match joined {
345 : Ok(()) => {}
346 : Err(join_error) if join_error.is_cancelled() => {
347 : unreachable!("we are not cancelling any of the futures");
348 : }
349 : Err(join_error) if join_error.is_panic() => {
350 : // cannot really do anything, as this panic is likely a bug
351 : panicked += 1;
352 : }
353 : Err(join_error) => {
354 0 : warn!("unknown kind of JoinError: {join_error}");
355 : }
356 : }
357 : if !buffering {
358 : // buffer so that every 500ms since the first update (or starting) we'll log
359 : // how far away we are; this is because we will get SIGKILL'd at 10s, and we
360 : // are not able to log *then*.
361 : buffering = true;
362 : buffered.as_mut().reset(tokio::time::Instant::now() + BUFFER_FOR);
363 : }
364 : },
365 : _ = &mut buffered, if buffering => {
366 : buffering = false;
367 5 : info!(remaining = join_set.len(), total, elapsed_ms = started_at.elapsed().as_millis(), "waiting for tenants to shutdown");
368 : }
369 : }
370 : }
371 :
372 149 : if panicked > 0 {
373 0 : warn!(
374 0 : panicked,
375 0 : total, "observed panicks while shutting down tenants"
376 0 : );
377 149 : }
378 :
379 : // caller will log how long we took
380 149 : }
381 :
382 480 : pub async fn create_tenant(
383 480 : conf: &'static PageServerConf,
384 480 : tenant_conf: TenantConfOpt,
385 480 : tenant_id: TenantId,
386 480 : broker_client: storage_broker::BrokerClientChannel,
387 480 : remote_storage: Option<GenericRemoteStorage>,
388 480 : ctx: &RequestContext,
389 480 : ) -> Result<Arc<Tenant>, TenantMapInsertError> {
390 480 : tenant_map_insert(tenant_id, || async {
391 : // We're holding the tenants lock in write mode while doing local IO.
392 : // If this section ever becomes contentious, introduce a new `TenantState::Creating`
393 : // and do the work in that state.
394 480 : let tenant_directory = super::create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Create).await?;
395 : // TODO: tenant directory remains on disk if we bail out from here on.
396 : // See https://github.com/neondatabase/neon/issues/4233
397 :
398 479 : let tenant_resources = TenantSharedResources {
399 479 : broker_client,
400 479 : remote_storage,
401 479 : };
402 479 : let created_tenant =
403 479 : schedule_local_tenant_processing(conf, &tenant_directory, tenant_resources, None, &TENANTS, ctx)?;
404 : // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
405 : // See https://github.com/neondatabase/neon/issues/4233
406 :
407 479 : let created_tenant_id = created_tenant.tenant_id();
408 479 : anyhow::ensure!(
409 479 : tenant_id == created_tenant_id,
410 0 : "loaded created tenant has unexpected tenant id (expected {tenant_id} != actual {created_tenant_id})",
411 : );
412 479 : Ok(created_tenant)
413 480 : }).await
414 480 : }
415 :
416 0 : #[derive(Debug, thiserror::Error)]
417 : pub enum SetNewTenantConfigError {
418 : #[error(transparent)]
419 : GetTenant(#[from] GetTenantError),
420 : #[error(transparent)]
421 : Persist(anyhow::Error),
422 : }
423 :
424 27 : pub async fn set_new_tenant_config(
425 27 : conf: &'static PageServerConf,
426 27 : new_tenant_conf: TenantConfOpt,
427 27 : tenant_id: TenantId,
428 27 : ) -> Result<(), SetNewTenantConfigError> {
429 27 : info!("configuring tenant {tenant_id}");
430 27 : let tenant = get_tenant(tenant_id, true).await?;
431 :
432 27 : let tenant_config_path = conf.tenant_config_path(&tenant_id);
433 27 : Tenant::persist_tenant_config(&tenant_id, &tenant_config_path, new_tenant_conf)
434 0 : .await
435 27 : .map_err(SetNewTenantConfigError::Persist)?;
436 27 : tenant.set_new_tenant_config(new_tenant_conf);
437 27 : Ok(())
438 27 : }
439 :
440 16 : #[derive(Debug, thiserror::Error)]
441 : pub enum GetTenantError {
442 : #[error("Tenant {0} not found")]
443 : NotFound(TenantId),
444 : #[error("Tenant {0} is not active")]
445 : NotActive(TenantId),
446 : }
447 :
448 : /// Gets the tenant from the in-memory data, erroring if it's absent or does not fit the query.
449 : /// `active_only = true` allows querying only tenants that are ready for operations, erroring on other kinds of tenants.
450 9536 : pub async fn get_tenant(
451 9536 : tenant_id: TenantId,
452 9536 : active_only: bool,
453 9536 : ) -> Result<Arc<Tenant>, GetTenantError> {
454 9536 : let m = TENANTS.read().await;
455 9536 : let tenant = m
456 9536 : .get(&tenant_id)
457 9536 : .ok_or(GetTenantError::NotFound(tenant_id))?;
458 9443 : if active_only && !tenant.is_active() {
459 4 : Err(GetTenantError::NotActive(tenant_id))
460 : } else {
461 9439 : Ok(Arc::clone(tenant))
462 : }
463 9536 : }
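// Minimal usage sketch (hypothetical caller): request handlers typically ask for an
// active tenant only, surfacing `NotFound`/`NotActive` to the client.
#[allow(dead_code)]
async fn resolve_active_tenant(tenant_id: TenantId) -> Result<Arc<Tenant>, GetTenantError> {
    // `active_only = true` rejects tenants that are not in the Active state.
    get_tenant(tenant_id, true).await
}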
464 :
465 132 : pub async fn delete_tenant(
466 132 : conf: &'static PageServerConf,
467 132 : remote_storage: Option<GenericRemoteStorage>,
468 132 : tenant_id: TenantId,
469 132 : ) -> Result<(), DeleteTenantError> {
470 675 : DeleteTenantFlow::run(conf, remote_storage, &TENANTS, tenant_id).await
471 132 : }
472 :
473 0 : #[derive(Debug, thiserror::Error)]
474 : pub enum DeleteTimelineError {
475 : #[error("Tenant {0}")]
476 : Tenant(#[from] GetTenantError),
477 :
478 : #[error("Timeline {0}")]
479 : Timeline(#[from] crate::tenant::DeleteTimelineError),
480 : }
481 :
482 121 : pub async fn delete_timeline(
483 121 : tenant_id: TenantId,
484 121 : timeline_id: TimelineId,
485 121 : _ctx: &RequestContext,
486 121 : ) -> Result<(), DeleteTimelineError> {
487 121 : let tenant = get_tenant(tenant_id, true).await?;
488 543 : DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
489 103 : Ok(())
490 121 : }
491 :
492 0 : #[derive(Debug, thiserror::Error)]
493 : pub enum TenantStateError {
494 : #[error("Tenant {0} not found")]
495 : NotFound(TenantId),
496 : #[error("Tenant {0} is stopping")]
497 : IsStopping(TenantId),
498 : #[error("Tenant {0} is not active")]
499 : NotActive(TenantId),
500 : #[error(transparent)]
501 : Other(#[from] anyhow::Error),
502 : }
503 :
504 40 : pub async fn detach_tenant(
505 40 : conf: &'static PageServerConf,
506 40 : tenant_id: TenantId,
507 40 : detach_ignored: bool,
508 40 : ) -> Result<(), TenantStateError> {
509 270 : detach_tenant0(conf, &TENANTS, tenant_id, detach_ignored).await
510 40 : }
511 :
512 40 : async fn detach_tenant0(
513 40 : conf: &'static PageServerConf,
514 40 : tenants: &tokio::sync::RwLock<TenantsMap>,
515 40 : tenant_id: TenantId,
516 40 : detach_ignored: bool,
517 40 : ) -> Result<(), TenantStateError> {
518 40 : let local_files_cleanup_operation = |tenant_id_to_clean| async move {
519 37 : let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
520 37 : safe_remove_tenant_dir_all(&local_tenant_directory)
521 148 : .await
522 37 : .with_context(|| {
523 0 : format!("local tenant directory {local_tenant_directory:?} removal")
524 37 : })?;
525 37 : Ok(())
526 40 : };
527 :
528 40 : let removal_result =
529 40 : remove_tenant_from_memory(tenants, tenant_id, local_files_cleanup_operation(tenant_id))
530 266 : .await;
531 :
532 : // Ignored tenants are not present in memory and will make the removal-from-memory operation bail.
533 : // Before returning the error, check for the ignored tenant case: then we only need to clean up its local files.
534 40 : if detach_ignored && matches!(removal_result, Err(TenantStateError::NotFound(_))) {
535 1 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
536 1 : if tenant_ignore_mark.exists() {
537 1 : info!("Detaching an ignored tenant");
538 1 : local_files_cleanup_operation(tenant_id)
539 4 : .await
540 1 : .with_context(|| format!("Ignored tenant {tenant_id} local files cleanup"))?;
541 1 : return Ok(());
542 0 : }
543 39 : }
544 :
545 39 : removal_result
546 40 : }
547 :
548 7 : pub async fn load_tenant(
549 7 : conf: &'static PageServerConf,
550 7 : tenant_id: TenantId,
551 7 : broker_client: storage_broker::BrokerClientChannel,
552 7 : remote_storage: Option<GenericRemoteStorage>,
553 7 : ctx: &RequestContext,
554 7 : ) -> Result<(), TenantMapInsertError> {
555 7 : tenant_map_insert(tenant_id, || async {
556 6 : let tenant_path = conf.tenant_path(&tenant_id);
557 6 : let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
558 6 : if tenant_ignore_mark.exists() {
559 6 : std::fs::remove_file(&tenant_ignore_mark)
560 6 : .with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
561 0 : }
562 :
563 6 : let resources = TenantSharedResources {
564 6 : broker_client,
565 6 : remote_storage,
566 6 : };
567 6 : let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, resources, None, &TENANTS, ctx)
568 6 : .with_context(|| {
569 0 : format!("Failed to schedule tenant processing in path {tenant_path:?}")
570 6 : })?;
571 :
572 6 : Ok(new_tenant)
573 7 : }).await?;
574 6 : Ok(())
575 7 : }
576 :
577 8 : pub async fn ignore_tenant(
578 8 : conf: &'static PageServerConf,
579 8 : tenant_id: TenantId,
580 8 : ) -> Result<(), TenantStateError> {
581 34 : ignore_tenant0(conf, &TENANTS, tenant_id).await
582 8 : }
583 :
584 8 : async fn ignore_tenant0(
585 8 : conf: &'static PageServerConf,
586 8 : tenants: &tokio::sync::RwLock<TenantsMap>,
587 8 : tenant_id: TenantId,
588 8 : ) -> Result<(), TenantStateError> {
589 8 : remove_tenant_from_memory(tenants, tenant_id, async {
590 8 : let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
591 8 : fs::File::create(&ignore_mark_file)
592 8 : .await
593 8 : .context("Failed to create ignore mark file")
594 8 : .and_then(|_| {
595 8 : crashsafe::fsync_file_and_parent(&ignore_mark_file)
596 8 : .context("Failed to fsync ignore mark file")
597 8 : })
598 8 : .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_id}"))?;
599 8 : Ok(())
600 8 : })
601 34 : .await
602 8 : }
603 :
604 0 : #[derive(Debug, thiserror::Error)]
605 : pub enum TenantMapListError {
606 : #[error("tenant map is still initiailizing")]
607 : Initializing,
608 : }
609 :
610 : ///
611 : /// Get list of tenants, for the mgmt API
612 : ///
613 144 : pub async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
614 144 : let tenants = TENANTS.read().await;
615 144 : let m = match &*tenants {
616 0 : TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
617 144 : TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
618 144 : };
619 144 : Ok(m.iter()
620 200 : .map(|(id, tenant)| (*id, tenant.current_state()))
621 144 : .collect())
622 144 : }
623 :
624 : /// Execute Attach mgmt API command.
625 : ///
626 : /// Downloading all the tenant data is performed in the background, this merely
627 : /// spawns the background task and returns quickly.
628 48 : pub async fn attach_tenant(
629 48 : conf: &'static PageServerConf,
630 48 : tenant_id: TenantId,
631 48 : tenant_conf: TenantConfOpt,
632 48 : broker_client: storage_broker::BrokerClientChannel,
633 48 : remote_storage: GenericRemoteStorage,
634 48 : ctx: &RequestContext,
635 48 : ) -> Result<(), TenantMapInsertError> {
636 48 : tenant_map_insert(tenant_id, || async {
637 41 : let tenant_dir = create_tenant_files(conf, tenant_conf, &tenant_id, CreateTenantFilesMode::Attach).await?;
638 : // TODO: tenant directory remains on disk if we bail out from here on.
639 : // See https://github.com/neondatabase/neon/issues/4233
640 :
641 : // Without the attach marker, schedule_local_tenant_processing will treat the attached tenant as fully attached
642 39 : let marker_file_exists = conf
643 39 : .tenant_attaching_mark_file_path(&tenant_id)
644 39 : .try_exists()
645 39 : .context("check for attach marker file existence")?;
646 39 : anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
647 :
648 39 : let resources = TenantSharedResources {
649 39 : broker_client,
650 39 : remote_storage: Some(remote_storage),
651 39 : };
652 39 : let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, resources, None, &TENANTS, ctx)?;
653 : // TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
654 : // See https://github.com/neondatabase/neon/issues/4233
655 :
656 39 : let attached_tenant_id = attached_tenant.tenant_id();
657 39 : anyhow::ensure!(
658 39 : tenant_id == attached_tenant_id,
659 0 : "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})",
660 : );
661 39 : Ok(attached_tenant)
662 48 : })
663 9 : .await?;
664 39 : Ok(())
665 48 : }
666 :
667 0 : #[derive(Debug, thiserror::Error)]
668 : pub enum TenantMapInsertError {
669 : #[error("tenant map is still initializing")]
670 : StillInitializing,
671 : #[error("tenant map is shutting down")]
672 : ShuttingDown,
673 : #[error("tenant {0} already exists, state: {1:?}")]
674 : TenantAlreadyExists(TenantId, TenantState),
675 : #[error(transparent)]
676 : Closure(#[from] anyhow::Error),
677 : }
678 :
679 : /// Give the given closure access to the tenants map entry for the given `tenant_id`, iff that
680 : /// entry is vacant. The closure is responsible for creating the tenant object and inserting
681 : /// it into the tenants map through the vacnt entry that it receives as argument.
682 : ///
683 : /// NB: the closure should return quickly because the current implementation of tenants map
684 : /// serializes access through an `RwLock`.
685 535 : async fn tenant_map_insert<F, R>(
686 535 : tenant_id: TenantId,
687 535 : insert_fn: F,
688 535 : ) -> Result<Arc<Tenant>, TenantMapInsertError>
689 535 : where
690 535 : F: FnOnce() -> R,
691 535 : R: std::future::Future<Output = anyhow::Result<Arc<Tenant>>>,
692 535 : {
693 535 : let mut guard = TENANTS.write().await;
694 535 : let m = match &mut *guard {
695 0 : TenantsMap::Initializing => return Err(TenantMapInsertError::StillInitializing),
696 0 : TenantsMap::ShuttingDown(_) => return Err(TenantMapInsertError::ShuttingDown),
697 535 : TenantsMap::Open(m) => m,
698 535 : };
699 535 : match m.entry(tenant_id) {
700 8 : hash_map::Entry::Occupied(e) => Err(TenantMapInsertError::TenantAlreadyExists(
701 8 : tenant_id,
702 8 : e.get().current_state(),
703 8 : )),
704 527 : hash_map::Entry::Vacant(v) => match insert_fn().await {
705 524 : Ok(tenant) => {
706 524 : v.insert(tenant.clone());
707 524 : Ok(tenant)
708 : }
709 3 : Err(e) => Err(TenantMapInsertError::Closure(e)),
710 : },
711 : }
712 535 : }
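// Contract sketch (hypothetical caller): the closure runs while the tenants map write
// lock is held, so it should only do quick setup and return the `Arc` that gets
// inserted into the vacant entry; `prebuilt` stands in for whatever the real callers
// (create/load/attach above) construct.
#[allow(dead_code)]
async fn tenant_map_insert_example(
    tenant_id: TenantId,
    prebuilt: Arc<Tenant>,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
    tenant_map_insert(tenant_id, || async move { anyhow::Ok(prebuilt) }).await
}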
713 :
714 : /// Stops and removes the tenant from memory, bailing if it is already [`TenantState::Stopping`].
715 : /// Allows removing other tenant resources manually, via `tenant_cleanup`.
716 : /// If the cleanup fails, the tenant will stay in memory in [`TenantState::Broken`] state, and another removal
717 : /// operation would be needed to remove it.
718 49 : async fn remove_tenant_from_memory<V, F>(
719 49 : tenants: &tokio::sync::RwLock<TenantsMap>,
720 49 : tenant_id: TenantId,
721 49 : tenant_cleanup: F,
722 49 : ) -> Result<V, TenantStateError>
723 49 : where
724 49 : F: std::future::Future<Output = anyhow::Result<V>>,
725 49 : {
726 : use utils::completion;
727 :
728 : // It's important to keep the tenant in memory after the final cleanup, to avoid cleanup races.
729 : // The exclusive lock here ensures we don't miss the tenant state updates before trying another removal.
730 : // tenant-wde cleanup operations may take some time (removing the entire tenant directory), we want to
731 : // avoid holding the lock for the entire process.
732 45 : let tenant = {
733 49 : tenants
734 49 : .write()
735 0 : .await
736 49 : .get(&tenant_id)
737 49 : .cloned()
738 49 : .ok_or(TenantStateError::NotFound(tenant_id))?
739 : };
740 :
741 : // allow pageserver shutdown to await for our completion
742 45 : let (_guard, progress) = completion::channel();
743 45 :
744 45 : // whenever we remove a tenant from memory, we don't want to flush and wait for upload
745 45 : let freeze_and_flush = false;
746 45 :
747 45 : // shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
748 45 : // that we can continue safely to cleanup.
749 148 : match tenant.shutdown(progress, freeze_and_flush).await {
750 45 : Ok(()) => {}
751 0 : Err(_other) => {
752 0 : // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
753 0 : // wait for it but return an error right away because these are distinct requests.
754 0 : return Err(TenantStateError::IsStopping(tenant_id));
755 : }
756 : }
757 :
758 45 : match tenant_cleanup
759 153 : .await
760 45 : .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
761 : {
762 45 : Ok(hook_value) => {
763 45 : let mut tenants_accessor = tenants.write().await;
764 45 : if tenants_accessor.remove(&tenant_id).is_none() {
765 0 : warn!("Tenant {tenant_id} got removed from memory before operation finished");
766 45 : }
767 45 : Ok(hook_value)
768 : }
769 0 : Err(e) => {
770 0 : let tenants_accessor = tenants.read().await;
771 0 : match tenants_accessor.get(&tenant_id) {
772 0 : Some(tenant) => {
773 0 : tenant.set_broken(e.to_string()).await;
774 : }
775 : None => {
776 0 : warn!("Tenant {tenant_id} got removed from memory");
777 0 : return Err(TenantStateError::NotFound(tenant_id));
778 : }
779 : }
780 0 : Err(TenantStateError::Other(e))
781 : }
782 : }
783 49 : }
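// Usage sketch (hypothetical): removing a tenant from memory while deleting some marker
// file as the cleanup step, mirroring how `detach_tenant0` and `ignore_tenant0` above
// drive this helper. The marker path is illustrative only.
#[allow(dead_code)]
async fn remove_tenant_example(
    tenants: &tokio::sync::RwLock<TenantsMap>,
    tenant_id: TenantId,
    marker: std::path::PathBuf,
) -> Result<(), TenantStateError> {
    remove_tenant_from_memory(tenants, tenant_id, async move {
        // The cleanup future runs only after the tenant has been shut down
        // (transitioned to Stopping) but while it is still present in the map.
        fs::remove_file(&marker)
            .await
            .with_context(|| format!("remove marker {marker:?}"))?;
        anyhow::Ok(())
    })
    .await
}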
784 :
785 : use {
786 : crate::repository::GcResult, pageserver_api::models::TimelineGcRequest,
787 : utils::http::error::ApiError,
788 : };
789 :
790 505 : pub async fn immediate_gc(
791 505 : tenant_id: TenantId,
792 505 : timeline_id: TimelineId,
793 505 : gc_req: TimelineGcRequest,
794 505 : ctx: &RequestContext,
795 505 : ) -> Result<tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>, ApiError> {
796 505 : let guard = TENANTS.read().await;
797 505 : let tenant = guard
798 505 : .get(&tenant_id)
799 505 : .map(Arc::clone)
800 505 : .with_context(|| format!("tenant {tenant_id}"))
801 505 : .map_err(|e| ApiError::NotFound(e.into()))?;
802 :
803 504 : let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon());
804 504 : // Use tenant's pitr setting
805 504 : let pitr = tenant.get_pitr_interval();
806 504 :
807 504 : // Run in task_mgr to avoid race with tenant_detach operation
808 504 : let ctx = ctx.detached_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
809 504 : let (task_done, wait_task_done) = tokio::sync::oneshot::channel();
810 504 : task_mgr::spawn(
811 504 : &tokio::runtime::Handle::current(),
812 504 : TaskKind::GarbageCollector,
813 504 : Some(tenant_id),
814 504 : Some(timeline_id),
815 504 : &format!("timeline_gc_handler garbage collection run for tenant {tenant_id} timeline {timeline_id}"),
816 504 : false,
817 504 : async move {
818 0 : fail::fail_point!("immediate_gc_task_pre");
819 504 : let result = tenant
820 504 : .gc_iteration(Some(timeline_id), gc_horizon, pitr, &ctx)
821 504 : .instrument(info_span!("manual_gc", %tenant_id, %timeline_id))
822 165 : .await;
823 : // FIXME: `gc_iteration` can return an error for multiple reasons; we should handle it
824 : // better once the types support it.
825 504 : match task_done.send(result) {
826 504 : Ok(_) => (),
827 0 : Err(result) => error!("failed to send gc result: {result:?}"),
828 : }
829 504 : Ok(())
830 504 : }
831 504 : );
832 504 :
833 504 : // drop the guard until after we've spawned the task so that timeline shutdown will wait for the task
834 504 : drop(guard);
835 504 :
836 504 : Ok(wait_task_done)
837 505 : }
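// Usage sketch (hypothetical caller): the receiver returned by `immediate_gc` resolves
// once the spawned GC task has sent its result, so a caller that needs the outcome
// synchronously can simply await it.
#[allow(dead_code)]
async fn await_gc_result(
    wait_task_done: tokio::sync::oneshot::Receiver<Result<GcResult, anyhow::Error>>,
) -> anyhow::Result<GcResult> {
    // The sender side is dropped only after `task_done.send(result)` above, so a
    // receive error means the GC task ended without reporting back.
    wait_task_done
        .await
        .context("gc task dropped its result channel")?
}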
838 :
839 : #[cfg(test)]
840 : mod tests {
841 : use std::collections::HashMap;
842 : use std::sync::Arc;
843 : use tracing::{info_span, Instrument};
844 :
845 : use super::{super::harness::TenantHarness, TenantsMap};
846 :
847 1 : #[tokio::test(start_paused = true)]
848 1 : async fn shutdown_joins_remove_tenant_from_memory() {
849 : // the test is a bit ugly, mixing lockstep synchronization with spawned tasks. the aim is to make
850 : // sure `shutdown_all_tenants0` per-tenant processing joins in any active
851 : // remove_tenant_from_memory calls, which is enforced by making the operation last until
852 : // we've run `shutdown_all_tenants0` for a long time.
853 :
854 1 : let (t, _ctx) = TenantHarness::create("shutdown_joins_detach")
855 1 : .unwrap()
856 1 : .load()
857 1 : .await;
858 :
859 : // harness loads it to active, which is forced and nothing is running on the tenant
860 :
861 1 : let id = t.tenant_id();
862 :
863 : // tenant harness configures the logging and we cannot escape it
864 1 : let _e = info_span!("testing", tenant_id = %id).entered();
865 1 :
866 1 : let tenants = HashMap::from([(id, t.clone())]);
867 1 : let tenants = Arc::new(tokio::sync::RwLock::new(TenantsMap::Open(tenants)));
868 1 :
869 1 : let (until_cleanup_completed, can_complete_cleanup) = utils::completion::channel();
870 1 : let (until_cleanup_started, cleanup_started) = utils::completion::channel();
871 :
872 : // start a "detaching operation", which will take a while, until can_complete_cleanup
873 1 : let cleanup_task = {
874 1 : let jh = tokio::spawn({
875 1 : let tenants = tenants.clone();
876 1 : async move {
877 1 : let cleanup = async move {
878 1 : drop(until_cleanup_started);
879 1 : can_complete_cleanup.wait().await;
880 1 : anyhow::Ok(())
881 1 : };
882 1 : super::remove_tenant_from_memory(&tenants, id, cleanup).await
883 1 : }
884 1 : .instrument(info_span!("foobar", tenant_id = %id))
885 : });
886 :
887 : // now the long cleanup should be in place, with the stopping state
888 1 : cleanup_started.wait().await;
889 1 : jh
890 : };
891 :
892 1 : let mut cleanup_progress = std::pin::pin!(t
893 1 : .shutdown(utils::completion::Barrier::default(), false)
894 0 : .await
895 1 : .unwrap_err()
896 1 : .wait());
897 :
898 1 : let mut shutdown_task = {
899 1 : let (until_shutdown_started, shutdown_started) = utils::completion::channel();
900 1 :
901 1 : let shutdown_task = tokio::spawn(async move {
902 1 : drop(until_shutdown_started);
903 2 : super::shutdown_all_tenants0(&tenants).await;
904 1 : });
905 1 :
906 1 : shutdown_started.wait().await;
907 1 : shutdown_task
908 1 : };
909 1 :
910 1 : // if the joining in is removed from shutdown_all_tenants0, the shutdown_task should always
911 1 : // get to complete within timeout and fail the test. it is expected to continue awaiting
912 1 : // until completion or SIGKILL during normal shutdown.
913 1 : //
914 1 : // the timeout is long to cover anything that shutdown_task could be doing, but it is
915 1 : // handled instantly because we use tokio's time pausing in this test. 100s is much more than
916 1 : // what we get from systemd on shutdown (10s).
917 1 : let long_time = std::time::Duration::from_secs(100);
918 2 : tokio::select! {
919 2 : _ = &mut shutdown_task => unreachable!("shutdown must continue, until_cleanup_completed is not dropped"),
920 2 : _ = &mut cleanup_progress => unreachable!("cleanup progress must continue, until_cleanup_completed is not dropped"),
921 2 : _ = tokio::time::sleep(long_time) => {},
922 2 : }
923 :
924 : // allow the remove_tenant_from_memory and thus eventually the shutdown to continue
925 1 : drop(until_cleanup_completed);
926 :
927 1 : let (je, ()) = tokio::join!(shutdown_task, cleanup_progress);
928 1 : je.expect("Tenant::shutdown shutdown not have panicked");
929 1 : cleanup_task
930 0 : .await
931 1 : .expect("no panicking")
932 1 : .expect("remove_tenant_from_memory failed");
933 1 :
934 1 : futures::future::poll_immediate(
935 1 : t.shutdown(utils::completion::Barrier::default(), false)
936 0 : .await
937 1 : .unwrap_err()
938 1 : .wait(),
939 : )
940 0 : .await
941 1 : .expect("the stopping progress must still be complete");
942 : }
943 : }