Line data Source code
1 : //!
2 : //! Timeline repository implementation that keeps old data in files on disk, and
3 : //! the recent changes in memory. See tenant/*_layer.rs files.
4 : //! The functions here are responsible for locating the correct layer for the
5 : //! get/put call, walking back the timeline branching history as needed.
6 : //!
7 : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
8 : //! directory. See docs/pageserver-storage.md for how the files are managed.
9 : //! In addition to the layer files, there is a metadata file in the same
10 : //! directory that contains information about the timeline, in particular its
11 : //! parent timeline, and the last LSN that has been written to disk.
12 : //!
13 :
14 : use anyhow::{bail, Context};
15 : use futures::FutureExt;
16 : use pageserver_api::models::TimelineState;
17 : use remote_storage::DownloadError;
18 : use remote_storage::GenericRemoteStorage;
19 : use storage_broker::BrokerClientChannel;
20 : use tokio::sync::watch;
21 : use tokio::task::JoinSet;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::*;
24 : use utils::completion;
25 : use utils::crashsafe::path_with_suffix_extension;
26 :
27 : use std::cmp::min;
28 : use std::collections::hash_map::Entry;
29 : use std::collections::BTreeSet;
30 : use std::collections::HashMap;
31 : use std::fmt::Debug;
32 : use std::fmt::Display;
33 : use std::fs;
34 : use std::fs::File;
35 : use std::io;
36 : use std::ops::Bound::Included;
37 : use std::path::Path;
38 : use std::path::PathBuf;
39 : use std::process::Command;
40 : use std::process::Stdio;
41 : use std::sync::atomic::AtomicU64;
42 : use std::sync::atomic::Ordering;
43 : use std::sync::Arc;
44 : use std::sync::MutexGuard;
45 : use std::sync::{Mutex, RwLock};
46 : use std::time::{Duration, Instant};
47 :
48 : use self::config::TenantConf;
49 : use self::delete::DeleteTenantFlow;
50 : use self::metadata::LoadMetadataError;
51 : use self::metadata::TimelineMetadata;
52 : use self::mgr::TenantsMap;
53 : use self::remote_timeline_client::RemoteTimelineClient;
54 : use self::timeline::uninit::TimelineUninitMark;
55 : use self::timeline::uninit::UninitializedTimeline;
56 : use self::timeline::EvictionTaskTenantState;
57 : use self::timeline::TimelineResources;
58 : use crate::config::PageServerConf;
59 : use crate::context::{DownloadBehavior, RequestContext};
60 : use crate::import_datadir;
61 : use crate::is_uninit_mark;
62 : use crate::metrics::TENANT_ACTIVATION;
63 : use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
64 : use crate::repository::GcResult;
65 : use crate::task_mgr;
66 : use crate::task_mgr::TaskKind;
67 : use crate::tenant::config::TenantConfOpt;
68 : use crate::tenant::metadata::load_metadata;
69 : pub use crate::tenant::remote_timeline_client::index::IndexPart;
70 : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
71 : use crate::tenant::storage_layer::DeltaLayer;
72 : use crate::tenant::storage_layer::ImageLayer;
73 : use crate::InitializationOrder;
74 :
75 : use crate::tenant::timeline::delete::DeleteTimelineFlow;
76 : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
77 : use crate::virtual_file::VirtualFile;
78 : use crate::walredo::PostgresRedoManager;
79 : use crate::walredo::WalRedoManager;
80 : use crate::TEMP_FILE_SUFFIX;
81 : pub use pageserver_api::models::TenantState;
82 :
83 : use toml_edit;
84 : use utils::{
85 : crashsafe,
86 : generation::Generation,
87 : id::{TenantId, TimelineId},
88 : lsn::{Lsn, RecordLsn},
89 : };
90 :
91 : /// Declare a failpoint that can use the `pause` failpoint action.
92 : /// We don't want to block the executor thread, hence, spawn_blocking + await.
93 : macro_rules! pausable_failpoint {
94 : ($name:literal) => {
95 : if cfg!(feature = "testing") {
96 : tokio::task::spawn_blocking({
97 : let current = tracing::Span::current();
98 : move || {
99 : let _entered = current.entered();
100 : tracing::info!("at failpoint {}", $name);
101 : fail::fail_point!($name);
102 : }
103 : })
104 : .await
105 : .expect("spawn_blocking");
106 : }
107 : };
108 : }
109 :
110 : pub mod blob_io;
111 : pub mod block_io;
112 :
113 : pub mod disk_btree;
114 : pub(crate) mod ephemeral_file;
115 : pub mod layer_map;
116 : mod span;
117 :
118 : pub mod metadata;
119 : mod par_fsync;
120 : mod remote_timeline_client;
121 : pub mod storage_layer;
122 :
123 : pub mod config;
124 : pub mod delete;
125 : pub mod mgr;
126 : pub mod tasks;
127 : pub mod upload_queue;
128 :
129 : pub(crate) mod timeline;
130 :
131 : pub mod size;
132 :
133 : pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
134 : pub use timeline::{
135 : LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
136 : };
137 :
138 : // re-export for use in remote_timeline_client.rs
139 : pub use crate::tenant::metadata::save_metadata;
140 :
141 : // re-export for use in walreceiver
142 : pub use crate::tenant::timeline::WalReceiverInfo;
143 :
144 : /// The "tenants" part of `tenants/<tenant>/timelines...`
145 : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
146 :
147 : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
148 : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
149 :
150 : pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
151 :
152 : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
153 :
154 : /// References to shared objects that are passed into each tenant, such
155 : /// as the shared remote storage client and process initialization state.
156 215 : #[derive(Clone)]
157 : pub struct TenantSharedResources {
158 : pub broker_client: storage_broker::BrokerClientChannel,
159 : pub remote_storage: Option<GenericRemoteStorage>,
160 : }
161 :
162 : ///
163 : /// Tenant consists of multiple timelines. Keep them in a hash table.
164 : ///
165 : pub struct Tenant {
166 : // Global pageserver config parameters
167 : pub conf: &'static PageServerConf,
168 :
169 : /// The value creation timestamp, used to measure activation delay, see:
170 : /// <https://github.com/neondatabase/neon/issues/4025>
171 : loading_started_at: Instant,
172 :
173 : state: watch::Sender<TenantState>,
174 :
175 : // Overridden tenant-specific config parameters.
176 : // We keep TenantConfOpt sturct here to preserve the information
177 : // about parameters that are not set.
178 : // This is necessary to allow global config updates.
179 : tenant_conf: Arc<RwLock<TenantConfOpt>>,
180 :
181 : tenant_id: TenantId,
182 :
183 : /// The remote storage generation, used to protect S3 objects from split-brain.
184 : /// Does not change over the lifetime of the [`Tenant`] object.
185 : generation: Generation,
186 :
187 : timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
188 : // This mutex prevents creation of new timelines during GC.
189 : // Adding yet another mutex (in addition to `timelines`) is needed because holding
190 : // `timelines` mutex during all GC iteration
191 : // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
192 : // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
193 : // timeout...
194 : gc_cs: tokio::sync::Mutex<()>,
195 : walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
196 :
197 : // provides access to timeline data sitting in the remote storage
198 : pub(crate) remote_storage: Option<GenericRemoteStorage>,
199 :
200 : /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
201 : cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
202 : cached_synthetic_tenant_size: Arc<AtomicU64>,
203 :
204 : eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
205 :
206 : pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
207 : }
208 :
209 : // We should not blindly overwrite local metadata with remote one.
210 : // For example, consider the following case:
211 : // Image layer is flushed to disk as a new delta layer, we update local metadata and start upload task but after that
212 : // pageserver crashes. During startup we'll load new metadata, and then reset it
213 : // to the state of remote one. But current layermap will have layers from the old
214 : // metadata which is inconsistent.
215 : // And with current logic it wont disgard them during load because during layermap
216 : // load it sees local disk consistent lsn which is ahead of layer lsns.
217 : // If we treat remote as source of truth we need to completely sync with it,
218 : // i e delete local files which are missing on the remote. This will add extra work,
219 : // wal for these layers needs to be reingested for example
220 : //
221 : // So the solution is to take remote metadata only when we're attaching.
222 : pub fn merge_local_remote_metadata<'a>(
223 : local: Option<&'a TimelineMetadata>,
224 : remote: Option<&'a TimelineMetadata>,
225 : ) -> anyhow::Result<(&'a TimelineMetadata, bool)> {
226 326 : match (local, remote) {
227 0 : (None, None) => anyhow::bail!("we should have either local metadata or remote"),
228 148 : (Some(local), None) => Ok((local, true)),
229 : // happens if we crash during attach, before writing out the metadata file
230 49 : (None, Some(remote)) => Ok((remote, false)),
231 : // This is the regular case where we crash/exit before finishing queued uploads.
232 : // Also, it happens if we crash during attach after writing the metadata file
233 : // but before removing the attaching marker file.
234 129 : (Some(local), Some(remote)) => {
235 129 : let consistent_lsn_cmp = local
236 129 : .disk_consistent_lsn()
237 129 : .cmp(&remote.disk_consistent_lsn());
238 129 : let gc_cutoff_lsn_cmp = local
239 129 : .latest_gc_cutoff_lsn()
240 129 : .cmp(&remote.latest_gc_cutoff_lsn());
241 129 : use std::cmp::Ordering::*;
242 129 : match (consistent_lsn_cmp, gc_cutoff_lsn_cmp) {
243 : // It wouldn't matter, but pick the local one so that we don't rewrite the metadata file.
244 127 : (Equal, Equal) => Ok((local, true)),
245 : // Local state is clearly ahead of the remote.
246 0 : (Greater, Greater) => Ok((local, true)),
247 : // We have local layer files that aren't on the remote, but GC horizon is on par.
248 2 : (Greater, Equal) => Ok((local, true)),
249 : // Local GC started running but we couldn't sync it to the remote.
250 0 : (Equal, Greater) => Ok((local, true)),
251 :
252 : // We always update the local value first, so something else must have
253 : // updated the remote value, probably a different pageserver.
254 : // The control plane is supposed to prevent this from happening.
255 : // Bail out.
256 : (Less, Less)
257 : | (Less, Equal)
258 : | (Equal, Less)
259 : | (Less, Greater)
260 : | (Greater, Less) => {
261 0 : anyhow::bail!(
262 0 : r#"remote metadata appears to be ahead of local metadata:
263 0 : local:
264 0 : {local:#?}
265 0 : remote:
266 0 : {remote:#?}
267 0 : "#
268 0 : );
269 : }
270 : }
271 : }
272 : }
273 326 : }
274 :
275 176 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
276 : pub enum GetTimelineError {
277 : #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
278 : NotActive {
279 : tenant_id: TenantId,
280 : timeline_id: TimelineId,
281 : state: TimelineState,
282 : },
283 : #[error("Timeline {tenant_id}/{timeline_id} was not found")]
284 : NotFound {
285 : tenant_id: TenantId,
286 : timeline_id: TimelineId,
287 : },
288 : }
289 :
290 0 : #[derive(Debug, thiserror::Error)]
291 : pub enum LoadLocalTimelineError {
292 : #[error("FailedToLoad")]
293 : Load(#[source] anyhow::Error),
294 : #[error("FailedToResumeDeletion")]
295 : ResumeDeletion(#[source] anyhow::Error),
296 : }
297 :
298 239 : #[derive(thiserror::Error)]
299 : pub enum DeleteTimelineError {
300 : #[error("NotFound")]
301 : NotFound,
302 :
303 : #[error("HasChildren")]
304 : HasChildren(Vec<TimelineId>),
305 :
306 : #[error("Timeline deletion is already in progress")]
307 : AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
308 :
309 : #[error(transparent)]
310 : Other(#[from] anyhow::Error),
311 : }
312 :
313 : impl Debug for DeleteTimelineError {
314 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
315 0 : match self {
316 0 : Self::NotFound => write!(f, "NotFound"),
317 0 : Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
318 0 : Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
319 0 : Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
320 : }
321 0 : }
322 : }
323 :
324 : pub enum SetStoppingError {
325 : AlreadyStopping(completion::Barrier),
326 : Broken,
327 : }
328 :
329 : impl Debug for SetStoppingError {
330 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 0 : match self {
332 0 : Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
333 0 : Self::Broken => write!(f, "Broken"),
334 : }
335 0 : }
336 : }
337 :
338 : struct RemoteStartupData {
339 : index_part: IndexPart,
340 : remote_metadata: TimelineMetadata,
341 : }
342 :
343 0 : #[derive(Debug, thiserror::Error)]
344 : pub(crate) enum WaitToBecomeActiveError {
345 : WillNotBecomeActive {
346 : tenant_id: TenantId,
347 : state: TenantState,
348 : },
349 : TenantDropped {
350 : tenant_id: TenantId,
351 : },
352 : }
353 :
354 : impl std::fmt::Display for WaitToBecomeActiveError {
355 132 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356 132 : match self {
357 132 : WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => {
358 132 : write!(
359 132 : f,
360 132 : "Tenant {} will not become active. Current state: {:?}",
361 132 : tenant_id, state
362 132 : )
363 : }
364 0 : WaitToBecomeActiveError::TenantDropped { tenant_id } => {
365 0 : write!(f, "Tenant {tenant_id} will not become active (dropped)")
366 : }
367 : }
368 132 : }
369 : }
370 :
371 0 : #[derive(thiserror::Error, Debug)]
372 : pub enum CreateTimelineError {
373 : #[error("a timeline with the given ID already exists")]
374 : AlreadyExists,
375 : #[error(transparent)]
376 : AncestorLsn(anyhow::Error),
377 : #[error(transparent)]
378 : Other(#[from] anyhow::Error),
379 : }
380 :
381 : struct TenantDirectoryScan {
382 : sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
383 : timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
384 : }
385 :
386 : enum CreateTimelineCause {
387 : Load,
388 : Delete,
389 : }
390 :
391 : impl Tenant {
392 : /// Yet another helper for timeline initialization.
393 : /// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
394 : ///
395 : /// - Initializes the Timeline struct and inserts it into the tenant's hash map
396 : /// - Scans the local timeline directory for layer files and builds the layer map
397 : /// - Downloads remote index file and adds remote files to the layer map
398 : /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
399 : ///
400 : /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
401 : /// it is marked as Active.
402 : #[allow(clippy::too_many_arguments)]
403 326 : async fn timeline_init_and_sync(
404 326 : &self,
405 326 : timeline_id: TimelineId,
406 326 : resources: TimelineResources,
407 326 : remote_startup_data: Option<RemoteStartupData>,
408 326 : local_metadata: Option<TimelineMetadata>,
409 326 : ancestor: Option<Arc<Timeline>>,
410 326 : init_order: Option<&InitializationOrder>,
411 326 : _ctx: &RequestContext,
412 326 : ) -> anyhow::Result<()> {
413 326 : let tenant_id = self.tenant_id;
414 :
415 326 : let (up_to_date_metadata, picked_local) = merge_local_remote_metadata(
416 326 : local_metadata.as_ref(),
417 326 : remote_startup_data.as_ref().map(|r| &r.remote_metadata),
418 326 : )
419 326 : .context("merge_local_remote_metadata")?
420 326 : .to_owned();
421 :
422 326 : let timeline = self.create_timeline_struct(
423 326 : timeline_id,
424 326 : up_to_date_metadata,
425 326 : ancestor.clone(),
426 326 : resources,
427 326 : init_order,
428 326 : CreateTimelineCause::Load,
429 326 : )?;
430 326 : let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
431 326 : anyhow::ensure!(
432 326 : disk_consistent_lsn.is_valid(),
433 0 : "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
434 : );
435 326 : assert_eq!(
436 326 : disk_consistent_lsn,
437 326 : up_to_date_metadata.disk_consistent_lsn(),
438 0 : "these are used interchangeably"
439 : );
440 :
441 : // Save the metadata file to local disk.
442 326 : if !picked_local {
443 49 : save_metadata(self.conf, &tenant_id, &timeline_id, up_to_date_metadata)
444 0 : .await
445 49 : .context("save_metadata")?;
446 277 : }
447 :
448 326 : let index_part = remote_startup_data.as_ref().map(|x| &x.index_part);
449 :
450 326 : if let Some(index_part) = index_part {
451 178 : timeline
452 178 : .remote_client
453 178 : .as_ref()
454 178 : .unwrap()
455 178 : .init_upload_queue(index_part)?;
456 148 : } else if self.remote_storage.is_some() {
457 : // No data on the remote storage, but we have local metadata file. We can end up
458 : // here with timeline_create being interrupted before finishing index part upload.
459 : // By doing what we do here, the index part upload is retried.
460 : // If control plane retries timeline creation in the meantime, the mgmt API handler
461 : // for timeline creation will coalesce on the upload we queue here.
462 1 : let rtc = timeline.remote_client.as_ref().unwrap();
463 1 : rtc.init_upload_queue_for_empty_remote(up_to_date_metadata)?;
464 1 : rtc.schedule_index_upload_for_metadata_update(up_to_date_metadata)?;
465 147 : }
466 :
467 326 : timeline
468 326 : .load_layer_map(
469 326 : disk_consistent_lsn,
470 326 : remote_startup_data.map(|x| x.index_part),
471 326 : )
472 324 : .await
473 326 : .with_context(|| {
474 0 : format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
475 326 : })?;
476 :
477 : {
478 : // avoiding holding it across awaits
479 326 : let mut timelines_accessor = self.timelines.lock().unwrap();
480 326 : match timelines_accessor.entry(timeline_id) {
481 : Entry::Occupied(_) => {
482 : // The uninit mark file acts as a lock that prevents another task from
483 : // initializing the timeline at the same time.
484 0 : unreachable!(
485 0 : "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
486 0 : );
487 : }
488 326 : Entry::Vacant(v) => {
489 326 : v.insert(Arc::clone(&timeline));
490 326 : timeline.maybe_spawn_flush_loop();
491 326 : }
492 326 : }
493 326 : };
494 326 :
495 326 : // Sanity check: a timeline should have some content.
496 326 : anyhow::ensure!(
497 326 : ancestor.is_some()
498 274 : || timeline
499 274 : .layers
500 274 : .read()
501 0 : .await
502 274 : .layer_map()
503 274 : .iter_historic_layers()
504 274 : .next()
505 274 : .is_some(),
506 1 : "Timeline has no ancestor and no layer files"
507 : );
508 :
509 325 : Ok(())
510 326 : }
511 :
512 : ///
513 : /// Attach a tenant that's available in cloud storage.
514 : ///
515 : /// This returns quickly, after just creating the in-memory object
516 : /// Tenant struct and launching a background task to download
517 : /// the remote index files. On return, the tenant is most likely still in
518 : /// Attaching state, and it will become Active once the background task
519 : /// finishes. You can use wait_until_active() to wait for the task to
520 : /// complete.
521 : ///
522 42 : pub(crate) fn spawn_attach(
523 42 : conf: &'static PageServerConf,
524 42 : tenant_id: TenantId,
525 42 : generation: Generation,
526 42 : broker_client: storage_broker::BrokerClientChannel,
527 42 : tenants: &'static tokio::sync::RwLock<TenantsMap>,
528 42 : remote_storage: GenericRemoteStorage,
529 42 : ctx: &RequestContext,
530 42 : ) -> anyhow::Result<Arc<Tenant>> {
531 : // TODO dedup with spawn_load
532 42 : let tenant_conf =
533 42 : Self::load_tenant_config(conf, &tenant_id).context("load tenant config")?;
534 :
535 42 : let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
536 42 : let tenant = Arc::new(Tenant::new(
537 42 : TenantState::Attaching,
538 42 : conf,
539 42 : tenant_conf,
540 42 : wal_redo_manager,
541 42 : tenant_id,
542 42 : generation,
543 42 : Some(remote_storage.clone()),
544 42 : ));
545 42 :
546 42 : // Do all the hard work in the background
547 42 : let tenant_clone = Arc::clone(&tenant);
548 42 :
549 42 : let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
550 42 : task_mgr::spawn(
551 42 : &tokio::runtime::Handle::current(),
552 42 : TaskKind::Attach,
553 42 : Some(tenant_id),
554 42 : None,
555 42 : "attach tenant",
556 : false,
557 42 : async move {
558 42 : // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
559 42 : let make_broken = |t: &Tenant, err: anyhow::Error| {
560 3 : error!("attach failed, setting tenant state to Broken: {err:?}");
561 3 : t.state.send_modify(|state| {
562 3 : assert_eq!(
563 : *state,
564 : TenantState::Attaching,
565 0 : "the attach task owns the tenant state until activation is complete"
566 : );
567 3 : *state = TenantState::broken_from_reason(err.to_string());
568 3 : });
569 3 : };
570 :
571 42 : let pending_deletion = {
572 42 : match DeleteTenantFlow::should_resume_deletion(
573 42 : conf,
574 42 : Some(&remote_storage),
575 42 : &tenant_clone,
576 42 : )
577 121 : .await
578 : {
579 42 : Ok(should_resume_deletion) => should_resume_deletion,
580 0 : Err(err) => {
581 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
582 0 : return Ok(());
583 : }
584 : }
585 : };
586 :
587 42 : info!("pending_deletion {}", pending_deletion.is_some());
588 :
589 42 : if let Some(deletion) = pending_deletion {
590 3 : match DeleteTenantFlow::resume_from_attach(
591 3 : deletion,
592 3 : &tenant_clone,
593 3 : tenants,
594 3 : &ctx,
595 3 : )
596 199 : .await
597 : {
598 0 : Err(err) => {
599 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
600 0 : return Ok(());
601 : }
602 3 : Ok(()) => return Ok(()),
603 : }
604 39 : }
605 39 :
606 296 : match tenant_clone.attach(&ctx).await {
607 : Ok(()) => {
608 36 : info!("attach finished, activating");
609 36 : tenant_clone.activate(broker_client, None, &ctx);
610 : }
611 3 : Err(e) => {
612 3 : make_broken(&tenant_clone, anyhow::anyhow!(e));
613 3 : }
614 : }
615 39 : Ok(())
616 42 : }
617 42 : .instrument({
618 42 : let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_id);
619 42 : span.follows_from(Span::current());
620 42 : span
621 42 : }),
622 42 : );
623 42 : Ok(tenant)
624 42 : }
625 :
626 : ///
627 : /// Background task that downloads all data for a tenant and brings it to Active state.
628 : ///
629 : /// No background tasks are started as part of this routine.
630 : ///
631 42 : async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
632 42 : span::debug_assert_current_span_has_tenant_id();
633 42 :
634 42 : let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
635 42 : if !tokio::fs::try_exists(&marker_file)
636 42 : .await
637 42 : .context("check for existence of marker file")?
638 : {
639 0 : anyhow::bail!(
640 0 : "implementation error: marker file should exist at beginning of this function"
641 0 : );
642 42 : }
643 :
644 : // Get list of remote timelines
645 : // download index files for every tenant timeline
646 42 : info!("listing remote timelines");
647 :
648 42 : let remote_storage = self
649 42 : .remote_storage
650 42 : .as_ref()
651 42 : .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
652 :
653 39 : let remote_timeline_ids =
654 132 : remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
655 :
656 39 : info!("found {} timelines", remote_timeline_ids.len());
657 :
658 : // Download & parse index parts
659 39 : let mut part_downloads = JoinSet::new();
660 94 : for timeline_id in remote_timeline_ids {
661 55 : let client = RemoteTimelineClient::new(
662 55 : remote_storage.clone(),
663 55 : self.conf,
664 55 : self.tenant_id,
665 55 : timeline_id,
666 55 : self.generation,
667 55 : );
668 55 : part_downloads.spawn(
669 55 : async move {
670 55 : debug!("starting index part download");
671 :
672 55 : let index_part = client
673 55 : .download_index_file()
674 211 : .await
675 55 : .context("download index file")?;
676 :
677 55 : debug!("finished index part download");
678 :
679 55 : Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
680 55 : }
681 55 : .map(move |res| {
682 55 : res.with_context(|| format!("download index part for timeline {timeline_id}"))
683 55 : })
684 55 : .instrument(info_span!("download_index_part", %timeline_id)),
685 : );
686 : }
687 :
688 39 : let mut timelines_to_resume_deletions = vec![];
689 39 :
690 39 : // Wait for all the download tasks to complete & collect results.
691 39 : let mut remote_index_and_client = HashMap::new();
692 39 : let mut timeline_ancestors = HashMap::new();
693 94 : while let Some(result) = part_downloads.join_next().await {
694 : // NB: we already added timeline_id as context to the error
695 55 : let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
696 55 : let (timeline_id, client, index_part) = result?;
697 0 : debug!("successfully downloaded index part for timeline {timeline_id}");
698 55 : match index_part {
699 49 : MaybeDeletedIndexPart::IndexPart(index_part) => {
700 49 : timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
701 49 : remote_index_and_client.insert(timeline_id, (index_part, client));
702 49 : }
703 6 : MaybeDeletedIndexPart::Deleted(index_part) => {
704 6 : info!(
705 6 : "timeline {} is deleted, picking to resume deletion",
706 6 : timeline_id
707 6 : );
708 6 : timelines_to_resume_deletions.push((timeline_id, index_part, client));
709 : }
710 : }
711 : }
712 :
713 : // For every timeline, download the metadata file, scan the local directory,
714 : // and build a layer map that contains an entry for each remote and local
715 : // layer file.
716 49 : let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
717 88 : for (timeline_id, remote_metadata) in sorted_timelines {
718 49 : let (index_part, remote_client) = remote_index_and_client
719 49 : .remove(&timeline_id)
720 49 : .expect("just put it in above");
721 49 :
722 49 : // TODO again handle early failure
723 49 : self.load_remote_timeline(
724 49 : timeline_id,
725 49 : index_part,
726 49 : remote_metadata,
727 49 : TimelineResources {
728 49 : remote_client: Some(remote_client),
729 49 : },
730 49 : ctx,
731 49 : )
732 98 : .await
733 49 : .with_context(|| {
734 0 : format!(
735 0 : "failed to load remote timeline {} for tenant {}",
736 0 : timeline_id, self.tenant_id
737 0 : )
738 49 : })?;
739 : }
740 :
741 : // Walk through deleted timelines, resume deletion
742 45 : for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
743 6 : remote_timeline_client
744 6 : .init_upload_queue_stopped_to_continue_deletion(&index_part)
745 6 : .context("init queue stopped")
746 6 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
747 :
748 6 : DeleteTimelineFlow::resume_deletion(
749 6 : Arc::clone(self),
750 6 : timeline_id,
751 6 : &index_part.metadata,
752 6 : Some(remote_timeline_client),
753 6 : None,
754 6 : )
755 0 : .await
756 6 : .context("resume_deletion")
757 6 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
758 : }
759 :
760 39 : std::fs::remove_file(&marker_file)
761 39 : .with_context(|| format!("unlink attach marker file {}", marker_file.display()))?;
762 39 : crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
763 39 : .context("fsync tenant directory after unlinking attach marker file")?;
764 :
765 21 : crate::failpoint_support::sleep_millis_async!("attach-before-activate");
766 :
767 39 : info!("Done");
768 :
769 39 : Ok(())
770 42 : }
771 :
772 : /// Get sum of all remote timelines sizes
773 : ///
774 : /// This function relies on the index_part instead of listing the remote storage
775 40 : pub fn remote_size(&self) -> u64 {
776 40 : let mut size = 0;
777 :
778 46 : for timeline in self.list_timelines() {
779 46 : if let Some(remote_client) = &timeline.remote_client {
780 38 : size += remote_client.get_remote_physical_size();
781 38 : }
782 : }
783 :
784 40 : size
785 40 : }
786 :
787 147 : #[instrument(skip_all, fields(timeline_id=%timeline_id))]
788 : async fn load_remote_timeline(
789 : &self,
790 : timeline_id: TimelineId,
791 : index_part: IndexPart,
792 : remote_metadata: TimelineMetadata,
793 : resources: TimelineResources,
794 : ctx: &RequestContext,
795 : ) -> anyhow::Result<()> {
796 : span::debug_assert_current_span_has_tenant_id();
797 :
798 49 : info!("downloading index file for timeline {}", timeline_id);
799 : tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_id, &timeline_id))
800 : .await
801 : .context("Failed to create new timeline directory")?;
802 :
803 : let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
804 : let timelines = self.timelines.lock().unwrap();
805 : Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
806 0 : || {
807 0 : anyhow::anyhow!(
808 0 : "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
809 0 : )
810 0 : },
811 : )?))
812 : } else {
813 : None
814 : };
815 :
816 : // Even if there is local metadata it cannot be ahead of the remote one
817 : // since we're attaching. Even if we resume interrupted attach remote one
818 : // cannot be older than the local one
819 : let local_metadata = None;
820 :
821 : self.timeline_init_and_sync(
822 : timeline_id,
823 : resources,
824 : Some(RemoteStartupData {
825 : index_part,
826 : remote_metadata,
827 : }),
828 : local_metadata,
829 : ancestor,
830 : None,
831 : ctx,
832 : )
833 : .await
834 : }
835 :
836 : /// Create a placeholder Tenant object for a broken tenant
837 0 : pub fn create_broken_tenant(
838 0 : conf: &'static PageServerConf,
839 0 : tenant_id: TenantId,
840 0 : reason: String,
841 0 : ) -> Arc<Tenant> {
842 0 : let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
843 0 : Arc::new(Tenant::new(
844 0 : TenantState::Broken {
845 0 : reason,
846 0 : backtrace: String::new(),
847 0 : },
848 0 : conf,
849 0 : TenantConfOpt::default(),
850 0 : wal_redo_manager,
851 0 : tenant_id,
852 0 : Generation::broken(),
853 0 : None,
854 0 : ))
855 0 : }
856 :
857 : /// Load a tenant that's available on local disk
858 : ///
859 : /// This is used at pageserver startup, to rebuild the in-memory
860 : /// structures from on-disk state. This is similar to attaching a tenant,
861 : /// but the index files already exist on local disk, as well as some layer
862 : /// files.
863 : ///
864 : /// If the loading fails for some reason, the Tenant will go into Broken
865 : /// state.
866 697 : #[instrument(skip_all, fields(tenant_id=%tenant_id))]
867 : pub(crate) fn spawn_load(
868 : conf: &'static PageServerConf,
869 : tenant_id: TenantId,
870 : generation: Generation,
871 : resources: TenantSharedResources,
872 : init_order: Option<InitializationOrder>,
873 : tenants: &'static tokio::sync::RwLock<TenantsMap>,
874 : ctx: &RequestContext,
875 : ) -> Arc<Tenant> {
876 : span::debug_assert_current_span_has_tenant_id();
877 :
878 : let tenant_conf = match Self::load_tenant_config(conf, &tenant_id) {
879 : Ok(conf) => conf,
880 : Err(e) => {
881 0 : error!("load tenant config failed: {:?}", e);
882 : return Tenant::create_broken_tenant(conf, tenant_id, format!("{e:#}"));
883 : }
884 : };
885 :
886 : let broker_client = resources.broker_client;
887 : let remote_storage = resources.remote_storage;
888 :
889 : let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
890 : let tenant = Tenant::new(
891 : TenantState::Loading,
892 : conf,
893 : tenant_conf,
894 : wal_redo_manager,
895 : tenant_id,
896 : generation,
897 : remote_storage.clone(),
898 : );
899 : let tenant = Arc::new(tenant);
900 :
901 : // Do all the hard work in a background task
902 : let tenant_clone = Arc::clone(&tenant);
903 :
904 : let ctx = ctx.detached_child(TaskKind::InitialLoad, DownloadBehavior::Warn);
905 : let _ = task_mgr::spawn(
906 : &tokio::runtime::Handle::current(),
907 : TaskKind::InitialLoad,
908 : Some(tenant_id),
909 : None,
910 : "initial tenant load",
911 : false,
912 697 : async move {
913 697 : // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
914 697 : let make_broken = |t: &Tenant, err: anyhow::Error| {
915 7 : error!("load failed, setting tenant state to Broken: {err:?}");
916 7 : t.state.send_modify(|state| {
917 7 : assert!(
918 7 : matches!(*state, TenantState::Loading | TenantState::Stopping { .. }),
919 0 : "the loading task owns the tenant state until activation is complete"
920 : );
921 7 : *state = TenantState::broken_from_reason(err.to_string());
922 7 : });
923 7 : };
924 :
925 697 : let mut init_order = init_order;
926 697 :
927 697 : // take the completion because initial tenant loading will complete when all of
928 697 : // these tasks complete.
929 697 : let _completion = init_order
930 697 : .as_mut()
931 697 : .and_then(|x| x.initial_tenant_load.take());
932 :
933 : // Dont block pageserver startup on figuring out deletion status
934 697 : let pending_deletion = {
935 697 : match DeleteTenantFlow::should_resume_deletion(
936 697 : conf,
937 697 : remote_storage.as_ref(),
938 697 : &tenant_clone,
939 697 : )
940 1498 : .await
941 : {
942 697 : Ok(should_resume_deletion) => should_resume_deletion,
943 0 : Err(err) => {
944 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
945 0 : return Ok(());
946 : }
947 : }
948 : };
949 :
950 697 : info!("pending deletion {}", pending_deletion.is_some());
951 :
952 697 : if let Some(deletion) = pending_deletion {
953 : // as we are no longer loading, signal completion by dropping
954 : // the completion while we resume deletion
955 28 : drop(_completion);
956 28 : // do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
957 28 : let _ = init_order
958 28 : .as_mut()
959 28 : .and_then(|x| x.initial_logical_size_attempt.take());
960 28 :
961 28 : match DeleteTenantFlow::resume_from_load(
962 28 : deletion,
963 28 : &tenant_clone,
964 28 : init_order.as_ref(),
965 28 : tenants,
966 28 : &ctx,
967 28 : )
968 1467 : .await
969 : {
970 0 : Err(err) => {
971 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
972 0 : return Ok(());
973 : }
974 28 : Ok(()) => return Ok(()),
975 : }
976 669 : }
977 669 :
978 669 : let background_jobs_can_start =
979 669 : init_order.as_ref().map(|x| &x.background_jobs_can_start);
980 669 :
981 1304 : match tenant_clone.load(init_order.as_ref(), &ctx).await {
982 : Ok(()) => {
983 662 : debug!("load finished");
984 :
985 662 : tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
986 : }
987 7 : Err(err) => make_broken(&tenant_clone, err),
988 : }
989 :
990 669 : Ok(())
991 697 : }
992 : .instrument({
993 : let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
994 : span.follows_from(Span::current());
995 : span
996 : }),
997 : );
998 :
999 : tenant
1000 : }
1001 :
1002 734 : fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
1003 734 : let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
1004 734 : // Note timelines_to_resume_deletion needs to be separate because it can be not sortable
1005 734 : // from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
1006 734 : // completed in non topological order (for example because parent has smaller number of layer files in it)
1007 734 : let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
1008 734 :
1009 734 : let timelines_dir = self.conf.timelines_path(&self.tenant_id);
1010 :
1011 338 : for entry in
1012 734 : std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
1013 : {
1014 338 : let entry = entry.context("read timeline dir entry")?;
1015 338 : let timeline_dir = entry.path();
1016 338 :
1017 338 : if crate::is_temporary(&timeline_dir) {
1018 0 : info!(
1019 0 : "Found temporary timeline directory, removing: {}",
1020 0 : timeline_dir.display()
1021 0 : );
1022 0 : if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
1023 0 : error!(
1024 0 : "Failed to remove temporary directory '{}': {:?}",
1025 0 : timeline_dir.display(),
1026 0 : e
1027 0 : );
1028 0 : }
1029 338 : } else if is_uninit_mark(&timeline_dir) {
1030 1 : if !timeline_dir.exists() {
1031 0 : warn!(
1032 0 : "Timeline dir entry become invalid: {}",
1033 0 : timeline_dir.display()
1034 0 : );
1035 0 : continue;
1036 1 : }
1037 1 :
1038 1 : let timeline_uninit_mark_file = &timeline_dir;
1039 1 : info!(
1040 1 : "Found an uninit mark file {}, removing the timeline and its uninit mark",
1041 1 : timeline_uninit_mark_file.display()
1042 1 : );
1043 1 : let timeline_id = TimelineId::try_from(timeline_uninit_mark_file.file_stem())
1044 1 : .with_context(|| {
1045 0 : format!(
1046 0 : "Could not parse timeline id out of the timeline uninit mark name {}",
1047 0 : timeline_uninit_mark_file.display()
1048 0 : )
1049 1 : })?;
1050 1 : let timeline_dir = self.conf.timeline_path(&self.tenant_id, &timeline_id);
1051 0 : if let Err(e) =
1052 1 : remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
1053 : {
1054 0 : error!("Failed to clean up uninit marked timeline: {e:?}");
1055 1 : }
1056 337 : } else if crate::is_delete_mark(&timeline_dir) {
1057 : // If metadata exists, load as usual, continue deletion
1058 31 : let timeline_id =
1059 31 : TimelineId::try_from(timeline_dir.file_stem()).with_context(|| {
1060 0 : format!(
1061 0 : "Could not parse timeline id out of the timeline uninit mark name {}",
1062 0 : timeline_dir.display()
1063 0 : )
1064 31 : })?;
1065 :
1066 31 : info!("Found deletion mark for timeline {}", timeline_id);
1067 :
1068 31 : match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
1069 22 : Ok(metadata) => {
1070 22 : timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
1071 : }
1072 9 : Err(e) => match &e {
1073 9 : LoadMetadataError::Read(r) => {
1074 9 : if r.kind() != io::ErrorKind::NotFound {
1075 0 : return Err(anyhow::anyhow!(e)).with_context(|| {
1076 0 : format!("Failed to load metadata for timeline_id {timeline_id}")
1077 0 : });
1078 9 : }
1079 9 :
1080 9 : // If metadata doesnt exist it means that we've crashed without
1081 9 : // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
1082 9 : // So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
1083 9 : // We cant do it here because the method is async so we'd need block_on
1084 9 : // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
1085 9 : // so that basically results in a cycle:
1086 9 : // spawn_blocking
1087 9 : // - block_on
1088 9 : // - spawn_blocking
1089 9 : // which can lead to running out of threads in blocing pool.
1090 9 : timelines_to_resume_deletion.push((timeline_id, None));
1091 : }
1092 : _ => {
1093 0 : return Err(anyhow::anyhow!(e)).with_context(|| {
1094 0 : format!("Failed to load metadata for timeline_id {timeline_id}")
1095 0 : })
1096 : }
1097 : },
1098 : }
1099 : } else {
1100 306 : if !timeline_dir.exists() {
1101 1 : warn!(
1102 1 : "Timeline dir entry become invalid: {}",
1103 1 : timeline_dir.display()
1104 1 : );
1105 1 : continue;
1106 305 : }
1107 305 : let timeline_id =
1108 305 : TimelineId::try_from(timeline_dir.file_name()).with_context(|| {
1109 0 : format!(
1110 0 : "Could not parse timeline id out of the timeline dir name {}",
1111 0 : timeline_dir.display()
1112 0 : )
1113 305 : })?;
1114 305 : let timeline_uninit_mark_file = self
1115 305 : .conf
1116 305 : .timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
1117 305 : if timeline_uninit_mark_file.exists() {
1118 0 : info!(
1119 0 : %timeline_id,
1120 0 : "Found an uninit mark file, removing the timeline and its uninit mark",
1121 0 : );
1122 0 : if let Err(e) =
1123 0 : remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
1124 : {
1125 0 : error!("Failed to clean up uninit marked timeline: {e:?}");
1126 0 : }
1127 0 : continue;
1128 305 : }
1129 305 :
1130 305 : let timeline_delete_mark_file = self
1131 305 : .conf
1132 305 : .timeline_delete_mark_file_path(self.tenant_id, timeline_id);
1133 305 : if timeline_delete_mark_file.exists() {
1134 : // Cleanup should be done in `is_delete_mark` branch above
1135 25 : continue;
1136 280 : }
1137 280 :
1138 280 : let file_name = entry.file_name();
1139 280 : if let Ok(timeline_id) =
1140 280 : file_name.to_str().unwrap_or_default().parse::<TimelineId>()
1141 277 : {
1142 280 : let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
1143 280 : .context("failed to load metadata")?;
1144 277 : timelines_to_load.insert(timeline_id, metadata);
1145 : } else {
1146 : // A file or directory that doesn't look like a timeline ID
1147 0 : warn!(
1148 0 : "unexpected file or directory in timelines directory: {}",
1149 0 : file_name.to_string_lossy()
1150 0 : );
1151 : }
1152 : }
1153 : }
1154 :
1155 : // Sort the array of timeline IDs into tree-order, so that parent comes before
1156 : // all its children.
1157 727 : tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
1158 727 : TenantDirectoryScan {
1159 727 : sorted_timelines_to_load: sorted_timelines,
1160 727 : timelines_to_resume_deletion,
1161 727 : }
1162 727 : })
1163 734 : }
1164 :
1165 : ///
1166 : /// Background task to load in-memory data structures for this tenant, from
1167 : /// files on disk. Used at pageserver startup.
1168 : ///
1169 : /// No background tasks are started as part of this routine.
1170 734 : async fn load(
1171 734 : self: &Arc<Tenant>,
1172 734 : init_order: Option<&InitializationOrder>,
1173 734 : ctx: &RequestContext,
1174 734 : ) -> anyhow::Result<()> {
1175 734 : span::debug_assert_current_span_has_tenant_id();
1176 :
1177 0 : debug!("loading tenant task");
1178 :
1179 3 : crate::failpoint_support::sleep_millis_async!("before-loading-tenant");
1180 :
1181 : // Load in-memory state to reflect the local files on disk
1182 : //
1183 : // Scan the directory, peek into the metadata file of each timeline, and
1184 : // collect a list of timelines and their ancestors.
1185 734 : let span = info_span!("blocking");
1186 734 : let cloned = Arc::clone(self);
1187 :
1188 734 : let scan = tokio::task::spawn_blocking(move || {
1189 734 : let _g = span.entered();
1190 734 : cloned.scan_and_sort_timelines_dir()
1191 734 : })
1192 734 : .await
1193 734 : .context("load spawn_blocking")
1194 734 : .and_then(|res| res)?;
1195 :
1196 : // FIXME original collect_timeline_files contained one more check:
1197 : // 1. "Timeline has no ancestor and no layer files"
1198 :
1199 : // Process loadable timelines first
1200 1003 : for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
1201 277 : if let Err(e) = self
1202 277 : .load_local_timeline(timeline_id, local_metadata, init_order, ctx, false)
1203 676 : .await
1204 : {
1205 1 : match e {
1206 1 : LoadLocalTimelineError::Load(source) => {
1207 1 : return Err(anyhow::anyhow!(source)).with_context(|| {
1208 1 : format!("Failed to load local timeline: {timeline_id}")
1209 1 : })
1210 : }
1211 0 : LoadLocalTimelineError::ResumeDeletion(source) => {
1212 : // Make sure resumed deletion wont fail loading for entire tenant.
1213 0 : error!("Failed to resume timeline deletion: {source:#}")
1214 : }
1215 : }
1216 276 : }
1217 : }
1218 :
1219 : // Resume deletion ones with deleted_mark
1220 757 : for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
1221 31 : match maybe_local_metadata {
1222 : None => {
1223 : // See comment in `scan_and_sort_timelines_dir`.
1224 0 : if let Err(e) =
1225 9 : DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
1226 45 : .await
1227 : {
1228 0 : warn!(
1229 0 : "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
1230 0 : timeline_id, e
1231 0 : );
1232 9 : }
1233 : }
1234 22 : Some(local_metadata) => {
1235 22 : if let Err(e) = self
1236 22 : .load_local_timeline(timeline_id, local_metadata, init_order, ctx, true)
1237 59 : .await
1238 : {
1239 0 : match e {
1240 0 : LoadLocalTimelineError::Load(source) => {
1241 0 : // We tried to load deleted timeline, this is a bug.
1242 0 : return Err(anyhow::anyhow!(source).context(
1243 0 : "This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}"
1244 0 : ));
1245 : }
1246 0 : LoadLocalTimelineError::ResumeDeletion(source) => {
1247 : // Make sure resumed deletion wont fail loading for entire tenant.
1248 0 : error!("Failed to resume timeline deletion: {source:#}")
1249 : }
1250 : }
1251 22 : }
1252 : }
1253 : }
1254 : }
1255 :
1256 0 : trace!("Done");
1257 :
1258 726 : Ok(())
1259 734 : }
1260 :
1261 : /// Subroutine of `load_tenant`, to load an individual timeline
1262 : ///
1263 : /// NB: The parent is assumed to be already loaded!
1264 598 : #[instrument(skip(self, local_metadata, init_order, ctx))]
1265 : async fn load_local_timeline(
1266 : self: &Arc<Self>,
1267 : timeline_id: TimelineId,
1268 : local_metadata: TimelineMetadata,
1269 : init_order: Option<&InitializationOrder>,
1270 : ctx: &RequestContext,
1271 : found_delete_mark: bool,
1272 : ) -> Result<(), LoadLocalTimelineError> {
1273 : span::debug_assert_current_span_has_tenant_id();
1274 :
1275 : let mut resources = self.build_timeline_resources(timeline_id);
1276 :
1277 : let (remote_startup_data, remote_client) = match resources.remote_client {
1278 : Some(remote_client) => match remote_client.download_index_file().await {
1279 : Ok(index_part) => {
1280 : let index_part = match index_part {
1281 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1282 : MaybeDeletedIndexPart::Deleted(index_part) => {
1283 : // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
1284 : // Example:
1285 : // start deletion operation
1286 : // finishes upload of index part
1287 : // pageserver crashes
1288 : // remote storage gets de-configured
1289 : // pageserver starts
1290 : //
1291 : // We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
1292 : // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
1293 15 : info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");
1294 :
1295 : remote_client
1296 : .init_upload_queue_stopped_to_continue_deletion(&index_part)
1297 : .context("init queue stopped")
1298 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1299 :
1300 : DeleteTimelineFlow::resume_deletion(
1301 : Arc::clone(self),
1302 : timeline_id,
1303 : &local_metadata,
1304 : Some(remote_client),
1305 : init_order,
1306 : )
1307 : .await
1308 : .context("resume deletion")
1309 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1310 :
1311 : return Ok(());
1312 : }
1313 : };
1314 :
1315 : let remote_metadata = index_part.metadata.clone();
1316 : (
1317 : Some(RemoteStartupData {
1318 : index_part,
1319 : remote_metadata,
1320 : }),
1321 : Some(remote_client),
1322 : )
1323 : }
1324 : Err(DownloadError::NotFound) => {
1325 3 : info!("no index file was found on the remote, found_delete_mark: {found_delete_mark}");
1326 :
1327 : if found_delete_mark {
1328 : // We could've resumed at a point where remote index was deleted, but metadata file wasnt.
1329 : // Cleanup:
1330 : return DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(
1331 : self,
1332 : timeline_id,
1333 : )
1334 : .await
1335 : .context("cleanup_remaining_timeline_fs_traces")
1336 : .map_err(LoadLocalTimelineError::ResumeDeletion);
1337 : }
1338 :
1339 : // We're loading fresh timeline that didnt yet make it into remote.
1340 : (None, Some(remote_client))
1341 : }
1342 : Err(e) => return Err(LoadLocalTimelineError::Load(anyhow::Error::new(e))),
1343 : },
1344 : None => {
1345 : // No remote client
1346 : if found_delete_mark {
1347 : // There is no remote client, we found local metadata.
1348 : // Continue cleaning up local disk.
1349 : DeleteTimelineFlow::resume_deletion(
1350 : Arc::clone(self),
1351 : timeline_id,
1352 : &local_metadata,
1353 : None,
1354 : init_order,
1355 : )
1356 : .await
1357 : .context("resume deletion")
1358 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1359 : return Ok(());
1360 : }
1361 :
1362 : (None, resources.remote_client)
1363 : }
1364 : };
1365 : resources.remote_client = remote_client;
1366 :
1367 : let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
1368 : let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
1369 0 : .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
1370 : .map_err(LoadLocalTimelineError::Load)?;
1371 : Some(ancestor_timeline)
1372 : } else {
1373 : None
1374 : };
1375 :
1376 : self.timeline_init_and_sync(
1377 : timeline_id,
1378 : resources,
1379 : remote_startup_data,
1380 : Some(local_metadata),
1381 : ancestor,
1382 : init_order,
1383 : ctx,
1384 : )
1385 : .await
1386 : .map_err(LoadLocalTimelineError::Load)
1387 : }
1388 :
1389 1213 : pub fn tenant_id(&self) -> TenantId {
1390 1213 : self.tenant_id
1391 1213 : }
1392 :
1393 : /// Get Timeline handle for given Neon timeline ID.
1394 : /// This function is idempotent. It doesn't change internal state in any way.
1395 10280 : pub fn get_timeline(
1396 10280 : &self,
1397 10280 : timeline_id: TimelineId,
1398 10280 : active_only: bool,
1399 10280 : ) -> Result<Arc<Timeline>, GetTimelineError> {
1400 10280 : let timelines_accessor = self.timelines.lock().unwrap();
1401 10280 : let timeline = timelines_accessor
1402 10280 : .get(&timeline_id)
1403 10280 : .ok_or(GetTimelineError::NotFound {
1404 10280 : tenant_id: self.tenant_id,
1405 10280 : timeline_id,
1406 10280 : })?;
1407 :
1408 9286 : if active_only && !timeline.is_active() {
1409 0 : Err(GetTimelineError::NotActive {
1410 0 : tenant_id: self.tenant_id,
1411 0 : timeline_id,
1412 0 : state: timeline.current_state(),
1413 0 : })
1414 : } else {
1415 9286 : Ok(Arc::clone(timeline))
1416 : }
1417 10280 : }
1418 :
1419 : /// Lists timelines the tenant contains.
1420 : /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
1421 661 : pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
1422 661 : self.timelines
1423 661 : .lock()
1424 661 : .unwrap()
1425 661 : .values()
1426 661 : .map(Arc::clone)
1427 661 : .collect()
1428 661 : }
1429 :
1430 : /// This is used to create the initial 'main' timeline during bootstrapping,
1431 : /// or when importing a new base backup. The caller is expected to load an
1432 : /// initial image of the datadir to the new timeline after this.
1433 : ///
1434 : /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
1435 : /// and the timeline will fail to load at a restart.
1436 : ///
1437 : /// That's why we add an uninit mark file, and wrap it together witht the Timeline
1438 : /// in-memory object into UninitializedTimeline.
1439 : /// Once the caller is done setting up the timeline, they should call
1440 : /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
1441 : ///
1442 : /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
1443 : /// minimum amount of keys required to get a writable timeline.
1444 : /// (Without it, `put` might fail due to `repartition` failing.)
1445 41 : pub async fn create_empty_timeline(
1446 41 : &self,
1447 41 : new_timeline_id: TimelineId,
1448 41 : initdb_lsn: Lsn,
1449 41 : pg_version: u32,
1450 41 : _ctx: &RequestContext,
1451 41 : ) -> anyhow::Result<UninitializedTimeline> {
1452 41 : anyhow::ensure!(
1453 41 : self.is_active(),
1454 0 : "Cannot create empty timelines on inactive tenant"
1455 : );
1456 :
1457 40 : let timeline_uninit_mark = {
1458 41 : let timelines = self.timelines.lock().unwrap();
1459 41 : self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
1460 : };
1461 40 : let new_metadata = TimelineMetadata::new(
1462 40 : // Initialize disk_consistent LSN to 0, The caller must import some data to
1463 40 : // make it valid, before calling finish_creation()
1464 40 : Lsn(0),
1465 40 : None,
1466 40 : None,
1467 40 : Lsn(0),
1468 40 : initdb_lsn,
1469 40 : initdb_lsn,
1470 40 : pg_version,
1471 40 : );
1472 40 : self.prepare_new_timeline(
1473 40 : new_timeline_id,
1474 40 : &new_metadata,
1475 40 : timeline_uninit_mark,
1476 40 : initdb_lsn,
1477 40 : None,
1478 40 : )
1479 0 : .await
1480 41 : }
1481 :
1482 : /// Helper for unit tests to create an empty timeline.
1483 : ///
1484 : /// The timeline is has state value `Active` but its background loops are not running.
1485 : // This makes the various functions which anyhow::ensure! for Active state work in tests.
1486 : // Our current tests don't need the background loops.
1487 : #[cfg(test)]
1488 33 : pub async fn create_test_timeline(
1489 33 : &self,
1490 33 : new_timeline_id: TimelineId,
1491 33 : initdb_lsn: Lsn,
1492 33 : pg_version: u32,
1493 33 : ctx: &RequestContext,
1494 33 : ) -> anyhow::Result<Arc<Timeline>> {
1495 33 : let uninit_tl = self
1496 33 : .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
1497 0 : .await?;
1498 33 : let tline = uninit_tl.raw_timeline().expect("we just created it");
1499 33 : assert_eq!(tline.get_last_record_lsn(), Lsn(0));
1500 :
1501 : // Setup minimum keys required for the timeline to be usable.
1502 33 : let mut modification = tline.begin_modification(initdb_lsn);
1503 33 : modification
1504 33 : .init_empty_test_timeline()
1505 33 : .context("init_empty_test_timeline")?;
1506 33 : modification
1507 33 : .commit()
1508 0 : .await
1509 33 : .context("commit init_empty_test_timeline modification")?;
1510 :
1511 : // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
1512 33 : tline.maybe_spawn_flush_loop();
1513 33 : tline.freeze_and_flush().await.context("freeze_and_flush")?;
1514 :
1515 : // Make sure the freeze_and_flush reaches remote storage.
1516 33 : tline
1517 33 : .remote_client
1518 33 : .as_ref()
1519 33 : .unwrap()
1520 33 : .wait_completion()
1521 33 : .await
1522 33 : .unwrap();
1523 :
1524 33 : let tl = uninit_tl.finish_creation()?;
1525 : // The non-test code would call tl.activate() here.
1526 33 : tl.set_state(TimelineState::Active);
1527 33 : Ok(tl)
1528 33 : }
1529 :
1530 : /// Create a new timeline.
1531 : ///
1532 : /// Returns the new timeline ID and reference to its Timeline object.
1533 : ///
1534 : /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
1535 : /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
1536 906 : pub async fn create_timeline(
1537 906 : &self,
1538 906 : new_timeline_id: TimelineId,
1539 906 : ancestor_timeline_id: Option<TimelineId>,
1540 906 : mut ancestor_start_lsn: Option<Lsn>,
1541 906 : pg_version: u32,
1542 906 : broker_client: storage_broker::BrokerClientChannel,
1543 906 : ctx: &RequestContext,
1544 906 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
1545 906 : if !self.is_active() {
1546 0 : return Err(CreateTimelineError::Other(anyhow::anyhow!(
1547 0 : "Cannot create timelines on inactive tenant"
1548 0 : )));
1549 906 : }
1550 :
1551 906 : if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
1552 0 : debug!("timeline {new_timeline_id} already exists");
1553 :
1554 1 : if let Some(remote_client) = existing.remote_client.as_ref() {
1555 : // Wait for uploads to complete, so that when we return Ok, the timeline
1556 : // is known to be durable on remote storage. Just like we do at the end of
1557 : // this function, after we have created the timeline ourselves.
1558 : //
1559 : // We only really care that the initial version of `index_part.json` has
1560 : // been uploaded. That's enough to remember that the timeline
1561 : // exists. However, there is no function to wait specifically for that so
1562 : // we just wait for all in-progress uploads to finish.
1563 1 : remote_client
1564 1 : .wait_completion()
1565 1 : .await
1566 1 : .context("wait for timeline uploads to complete")?;
1567 0 : }
1568 :
1569 1 : return Err(CreateTimelineError::AlreadyExists);
1570 905 : }
1571 :
1572 905 : let loaded_timeline = match ancestor_timeline_id {
1573 261 : Some(ancestor_timeline_id) => {
1574 261 : let ancestor_timeline = self
1575 261 : .get_timeline(ancestor_timeline_id, false)
1576 261 : .context("Cannot branch off the timeline that's not present in pageserver")?;
1577 :
1578 261 : if let Some(lsn) = ancestor_start_lsn.as_mut() {
1579 35 : *lsn = lsn.align();
1580 35 :
1581 35 : let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
1582 35 : if ancestor_ancestor_lsn > *lsn {
1583 : // can we safely just branch from the ancestor instead?
1584 2 : return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
1585 2 : "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
1586 2 : lsn,
1587 2 : ancestor_timeline_id,
1588 2 : ancestor_ancestor_lsn,
1589 2 : )));
1590 33 : }
1591 33 :
1592 33 : // Wait for the WAL to arrive and be processed on the parent branch up
1593 33 : // to the requested branch point. The repository code itself doesn't
1594 33 : // require it, but if we start to receive WAL on the new timeline,
1595 33 : // decoding the new WAL might need to look up previous pages, relation
1596 33 : // sizes etc. and that would get confused if the previous page versions
1597 33 : // are not in the repository yet.
1598 33 : ancestor_timeline.wait_lsn(*lsn, ctx).await?;
1599 226 : }
1600 :
1601 259 : self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
1602 6 : .await?
1603 : }
1604 : None => {
1605 644 : self.bootstrap_timeline(new_timeline_id, pg_version, ctx)
1606 3210901 : .await?
1607 : }
1608 : };
1609 :
1610 895 : loaded_timeline.activate(broker_client, None, ctx);
1611 :
1612 895 : if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
1613 : // Wait for the upload of the 'index_part.json` file to finish, so that when we return
1614 : // Ok, the timeline is durable in remote storage.
1615 418 : let kind = ancestor_timeline_id
1616 418 : .map(|_| "branched")
1617 418 : .unwrap_or("bootstrapped");
1618 418 : remote_client.wait_completion().await.with_context(|| {
1619 0 : format!("wait for {} timeline initial uploads to complete", kind)
1620 417 : })?;
1621 477 : }
1622 :
1623 894 : Ok(loaded_timeline)
1624 905 : }
1625 :
1626 : /// perform one garbage collection iteration, removing old data files from disk.
1627 : /// this function is periodically called by gc task.
1628 : /// also it can be explicitly requested through page server api 'do_gc' command.
1629 : ///
1630 : /// `target_timeline_id` specifies the timeline to GC, or None for all.
1631 : ///
1632 : /// The `horizon` an `pitr` parameters determine how much WAL history needs to be retained.
1633 : /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
1634 : /// the amount of history, as LSN difference from current latest LSN on each timeline.
1635 : /// `pitr` specifies the same as a time difference from the current time. The effective
1636 : /// GC cutoff point is determined conservatively by either `horizon` and `pitr`, whichever
1637 : /// requires more history to be retained.
1638 : //
1639 544 : pub async fn gc_iteration(
1640 544 : &self,
1641 544 : target_timeline_id: Option<TimelineId>,
1642 544 : horizon: u64,
1643 544 : pitr: Duration,
1644 544 : ctx: &RequestContext,
1645 544 : ) -> anyhow::Result<GcResult> {
1646 544 : // there is a global allowed_error for this
1647 544 : anyhow::ensure!(
1648 544 : self.is_active(),
1649 0 : "Cannot run GC iteration on inactive tenant"
1650 : );
1651 :
1652 544 : self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
1653 213123 : .await
1654 539 : }
1655 :
1656 : /// Perform one compaction iteration.
1657 : /// This function is periodically called by compactor task.
1658 : /// Also it can be explicitly requested per timeline through page server
1659 : /// api's 'compact' command.
1660 263 : pub async fn compaction_iteration(
1661 263 : &self,
1662 263 : cancel: &CancellationToken,
1663 263 : ctx: &RequestContext,
1664 263 : ) -> anyhow::Result<()> {
1665 263 : anyhow::ensure!(
1666 263 : self.is_active(),
1667 0 : "Cannot run compaction iteration on inactive tenant"
1668 : );
1669 :
1670 : // Scan through the hashmap and collect a list of all the timelines,
1671 : // while holding the lock. Then drop the lock and actually perform the
1672 : // compactions. We don't want to block everything else while the
1673 : // compaction runs.
1674 263 : let timelines_to_compact = {
1675 263 : let timelines = self.timelines.lock().unwrap();
1676 263 : let timelines_to_compact = timelines
1677 263 : .iter()
1678 435 : .filter_map(|(timeline_id, timeline)| {
1679 435 : if timeline.is_active() {
1680 435 : Some((*timeline_id, timeline.clone()))
1681 : } else {
1682 0 : None
1683 : }
1684 435 : })
1685 263 : .collect::<Vec<_>>();
1686 263 : drop(timelines);
1687 263 : timelines_to_compact
1688 : };
1689 :
1690 683 : for (timeline_id, timeline) in &timelines_to_compact {
1691 430 : timeline
1692 430 : .compact(cancel, ctx)
1693 430 : .instrument(info_span!("compact_timeline", %timeline_id))
1694 820174 : .await?;
1695 : }
1696 :
1697 253 : Ok(())
1698 253 : }
1699 :
1700 7784 : pub fn current_state(&self) -> TenantState {
1701 7784 : self.state.borrow().clone()
1702 7784 : }
1703 :
1704 5433 : pub fn is_active(&self) -> bool {
1705 5433 : self.current_state() == TenantState::Active
1706 5433 : }
1707 :
1708 : /// Changes tenant status to active, unless shutdown was already requested.
1709 : ///
1710 : /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
1711 : /// to delay background jobs. Background jobs can be started right away when None is given.
1712 698 : fn activate(
1713 698 : self: &Arc<Self>,
1714 698 : broker_client: BrokerClientChannel,
1715 698 : background_jobs_can_start: Option<&completion::Barrier>,
1716 698 : ctx: &RequestContext,
1717 698 : ) {
1718 698 : span::debug_assert_current_span_has_tenant_id();
1719 698 :
1720 698 : let mut activating = false;
1721 698 : self.state.send_modify(|current_state| {
1722 698 : use pageserver_api::models::ActivatingFrom;
1723 698 : match &*current_state {
1724 : TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1725 0 : panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
1726 : }
1727 662 : TenantState::Loading => {
1728 662 : *current_state = TenantState::Activating(ActivatingFrom::Loading);
1729 662 : }
1730 36 : TenantState::Attaching => {
1731 36 : *current_state = TenantState::Activating(ActivatingFrom::Attaching);
1732 36 : }
1733 : }
1734 698 : debug!(tenant_id = %self.tenant_id, "Activating tenant");
1735 698 : activating = true;
1736 698 : // Continue outside the closure. We need to grab timelines.lock()
1737 698 : // and we plan to turn it into a tokio::sync::Mutex in a future patch.
1738 698 : });
1739 698 :
1740 698 : if activating {
1741 698 : let timelines_accessor = self.timelines.lock().unwrap();
1742 698 : let timelines_to_activate = timelines_accessor
1743 698 : .values()
1744 698 : .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
1745 698 :
1746 698 : // Spawn gc and compaction loops. The loops will shut themselves
1747 698 : // down when they notice that the tenant is inactive.
1748 698 : tasks::start_background_loops(self, background_jobs_can_start);
1749 698 :
1750 698 : let mut activated_timelines = 0;
1751 :
1752 990 : for timeline in timelines_to_activate {
1753 292 : timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
1754 292 : activated_timelines += 1;
1755 292 : }
1756 :
1757 698 : self.state.send_modify(move |current_state| {
1758 698 : assert!(
1759 698 : matches!(current_state, TenantState::Activating(_)),
1760 0 : "set_stopping and set_broken wait for us to leave Activating state",
1761 : );
1762 698 : *current_state = TenantState::Active;
1763 698 :
1764 698 : let elapsed = self.loading_started_at.elapsed();
1765 698 : let total_timelines = timelines_accessor.len();
1766 698 :
1767 698 : // log a lot of stuff, because some tenants sometimes suffer from user-visible
1768 698 : // times to activate. see https://github.com/neondatabase/neon/issues/4025
1769 698 : info!(
1770 698 : since_creation_millis = elapsed.as_millis(),
1771 698 : tenant_id = %self.tenant_id,
1772 698 : activated_timelines,
1773 698 : total_timelines,
1774 698 : post_state = <&'static str>::from(&*current_state),
1775 698 : "activation attempt finished"
1776 698 : );
1777 :
1778 698 : TENANT_ACTIVATION.observe(elapsed.as_secs_f64());
1779 698 : });
1780 0 : }
1781 698 : }
1782 :
1783 : /// Shutdown the tenant and join all of the spawned tasks.
1784 : ///
1785 : /// The method caters for all use-cases:
1786 : /// - pageserver shutdown (freeze_and_flush == true)
1787 : /// - detach + ignore (freeze_and_flush == false)
1788 : ///
1789 : /// This will attempt to shutdown even if tenant is broken.
1790 : ///
1791 : /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
1792 : /// If the tenant is already shutting down, we return a clone of the first shutdown call's
1793 : /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
1794 : /// the ongoing shutdown.
1795 336 : async fn shutdown(
1796 336 : &self,
1797 336 : shutdown_progress: completion::Barrier,
1798 336 : freeze_and_flush: bool,
1799 336 : ) -> Result<(), completion::Barrier> {
1800 336 : span::debug_assert_current_span_has_tenant_id();
1801 336 : // Set tenant (and its timlines) to Stoppping state.
1802 336 : //
1803 336 : // Since we can only transition into Stopping state after activation is complete,
1804 336 : // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
1805 336 : //
1806 336 : // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
1807 336 : // 1. Lock out any new requests to the tenants.
1808 336 : // 2. Signal cancellation to WAL receivers (we wait on it below).
1809 336 : // 3. Signal cancellation for other tenant background loops.
1810 336 : // 4. ???
1811 336 : //
1812 336 : // The waiting for the cancellation is not done uniformly.
1813 336 : // We certainly wait for WAL receivers to shut down.
1814 336 : // That is necessary so that no new data comes in before the freeze_and_flush.
1815 336 : // But the tenant background loops are joined-on in our caller.
1816 336 : // It's mesed up.
1817 336 : // we just ignore the failure to stop
1818 336 :
1819 336 : match self.set_stopping(shutdown_progress, false, false).await {
1820 256 : Ok(()) => {}
1821 77 : Err(SetStoppingError::Broken) => {
1822 77 : // assume that this is acceptable
1823 77 : }
1824 3 : Err(SetStoppingError::AlreadyStopping(other)) => {
1825 3 : // give caller the option to wait for this this shutdown
1826 3 : return Err(other);
1827 : }
1828 : };
1829 :
1830 333 : let mut js = tokio::task::JoinSet::new();
1831 333 : {
1832 333 : let timelines = self.timelines.lock().unwrap();
1833 525 : timelines.values().for_each(|timeline| {
1834 525 : let timeline = Arc::clone(timeline);
1835 525 : let span = Span::current();
1836 525 : js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
1837 525 : })
1838 : };
1839 858 : while let Some(res) = js.join_next().await {
1840 0 : match res {
1841 525 : Ok(()) => {}
1842 0 : Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
1843 0 : Err(je) if je.is_panic() => { /* logged already */ }
1844 0 : Err(je) => warn!("unexpected JoinError: {je:?}"),
1845 : }
1846 : }
1847 :
1848 : // shutdown all tenant and timeline tasks: gc, compaction, page service
1849 : // No new tasks will be started for this tenant because it's in `Stopping` state.
1850 : //
1851 : // this will additionally shutdown and await all timeline tasks.
1852 616 : task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
1853 :
1854 333 : Ok(())
1855 336 : }
1856 :
1857 : /// Change tenant status to Stopping, to mark that it is being shut down.
1858 : ///
1859 : /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
1860 : ///
1861 : /// This function is not cancel-safe!
1862 : ///
1863 : /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
1864 : /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
1865 367 : async fn set_stopping(
1866 367 : &self,
1867 367 : progress: completion::Barrier,
1868 367 : allow_transition_from_loading: bool,
1869 367 : allow_transition_from_attaching: bool,
1870 367 : ) -> Result<(), SetStoppingError> {
1871 367 : let mut rx = self.state.subscribe();
1872 367 :
1873 367 : // cannot stop before we're done activating, so wait out until we're done activating
1874 367 : rx.wait_for(|state| match state {
1875 3 : TenantState::Attaching if allow_transition_from_attaching => true,
1876 : TenantState::Activating(_) | TenantState::Attaching => {
1877 7 : info!(
1878 7 : "waiting for {} to turn Active|Broken|Stopping",
1879 7 : <&'static str>::from(state)
1880 7 : );
1881 7 : false
1882 : }
1883 28 : TenantState::Loading => allow_transition_from_loading,
1884 336 : TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
1885 374 : })
1886 7 : .await
1887 367 : .expect("cannot drop self.state while on a &self method");
1888 367 :
1889 367 : // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
1890 367 : let mut err = None;
1891 367 : let stopping = self.state.send_if_modified(|current_state| match current_state {
1892 : TenantState::Activating(_) => {
1893 0 : unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
1894 : }
1895 : TenantState::Attaching => {
1896 3 : if !allow_transition_from_attaching {
1897 0 : unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
1898 3 : };
1899 3 : *current_state = TenantState::Stopping { progress };
1900 3 : true
1901 : }
1902 : TenantState::Loading => {
1903 28 : if !allow_transition_from_loading {
1904 0 : unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
1905 28 : };
1906 28 : *current_state = TenantState::Stopping { progress };
1907 28 : true
1908 : }
1909 : TenantState::Active => {
1910 : // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
1911 : // are created after the transition to Stopping. That's harmless, as the Timelines
1912 : // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
1913 256 : *current_state = TenantState::Stopping { progress };
1914 256 : // Continue stopping outside the closure. We need to grab timelines.lock()
1915 256 : // and we plan to turn it into a tokio::sync::Mutex in a future patch.
1916 256 : true
1917 : }
1918 77 : TenantState::Broken { reason, .. } => {
1919 77 : info!(
1920 77 : "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
1921 77 : );
1922 77 : err = Some(SetStoppingError::Broken);
1923 77 : false
1924 : }
1925 3 : TenantState::Stopping { progress } => {
1926 3 : info!("Tenant is already in Stopping state");
1927 3 : err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
1928 3 : false
1929 : }
1930 367 : });
1931 367 : match (stopping, err) {
1932 287 : (true, None) => {} // continue
1933 80 : (false, Some(err)) => return Err(err),
1934 0 : (true, Some(_)) => unreachable!(
1935 0 : "send_if_modified closure must error out if not transitioning to Stopping"
1936 0 : ),
1937 0 : (false, None) => unreachable!(
1938 0 : "send_if_modified closure must return true if transitioning to Stopping"
1939 0 : ),
1940 : }
1941 :
1942 287 : let timelines_accessor = self.timelines.lock().unwrap();
1943 287 : let not_broken_timelines = timelines_accessor
1944 287 : .values()
1945 435 : .filter(|timeline| !timeline.is_broken());
1946 699 : for timeline in not_broken_timelines {
1947 412 : timeline.set_state(TimelineState::Stopping);
1948 412 : }
1949 287 : Ok(())
1950 367 : }
1951 :
1952 : /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
1953 : /// `remove_tenant_from_memory`
1954 : ///
1955 : /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
1956 : ///
1957 : /// In tests, we also use this to set tenants to Broken state on purpose.
1958 75 : pub(crate) async fn set_broken(&self, reason: String) {
1959 75 : let mut rx = self.state.subscribe();
1960 75 :
1961 75 : // The load & attach routines own the tenant state until it has reached `Active`.
1962 75 : // So, wait until it's done.
1963 75 : rx.wait_for(|state| match state {
1964 : TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
1965 0 : info!(
1966 0 : "waiting for {} to turn Active|Broken|Stopping",
1967 0 : <&'static str>::from(state)
1968 0 : );
1969 0 : false
1970 : }
1971 75 : TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
1972 75 : })
1973 0 : .await
1974 75 : .expect("cannot drop self.state while on a &self method");
1975 75 :
1976 75 : // we now know we're done activating, let's see whether this task is the winner to transition into Broken
1977 75 : self.set_broken_no_wait(reason)
1978 75 : }
1979 :
1980 75 : pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
1981 75 : let reason = reason.to_string();
1982 75 : self.state.send_modify(|current_state| {
1983 75 : match *current_state {
1984 : TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
1985 0 : unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
1986 : }
1987 : TenantState::Active => {
1988 2 : if cfg!(feature = "testing") {
1989 2 : warn!("Changing Active tenant to Broken state, reason: {}", reason);
1990 2 : *current_state = TenantState::broken_from_reason(reason);
1991 : } else {
1992 0 : unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
1993 : }
1994 : }
1995 : TenantState::Broken { .. } => {
1996 0 : warn!("Tenant is already in Broken state");
1997 : }
1998 : // This is the only "expected" path, any other path is a bug.
1999 : TenantState::Stopping { .. } => {
2000 73 : warn!(
2001 73 : "Marking Stopping tenant as Broken state, reason: {}",
2002 73 : reason
2003 73 : );
2004 73 : *current_state = TenantState::broken_from_reason(reason);
2005 : }
2006 : }
2007 75 : });
2008 75 : }
2009 :
2010 39 : pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
2011 39 : self.state.subscribe()
2012 39 : }
2013 :
2014 5786 : pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
2015 5786 : let mut receiver = self.state.subscribe();
2016 6505 : loop {
2017 6505 : let current_state = receiver.borrow_and_update().clone();
2018 6505 : match current_state {
2019 : TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
2020 : // in these states, there's a chance that we can reach ::Active
2021 737 : receiver.changed().await.map_err(
2022 719 : |_e: tokio::sync::watch::error::RecvError| {
2023 0 : WaitToBecomeActiveError::TenantDropped {
2024 0 : tenant_id: self.tenant_id,
2025 0 : }
2026 719 : },
2027 719 : )?;
2028 : }
2029 : TenantState::Active { .. } => {
2030 5720 : return Ok(());
2031 : }
2032 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
2033 : // There's no chance the tenant can transition back into ::Active
2034 66 : return Err(WaitToBecomeActiveError::WillNotBecomeActive {
2035 66 : tenant_id: self.tenant_id,
2036 66 : state: current_state,
2037 66 : });
2038 : }
2039 : }
2040 : }
2041 5786 : }
2042 : }
2043 :
2044 : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
2045 : /// perform a topological sort, so that the parent of each timeline comes
2046 : /// before the children.
2047 : /// E extracts the ancestor from T
2048 : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
2049 901 : fn tree_sort_timelines<T, E>(
2050 901 : timelines: HashMap<TimelineId, T>,
2051 901 : extractor: E,
2052 901 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
2053 901 : where
2054 901 : E: Fn(&T) -> Option<TimelineId>,
2055 901 : {
2056 901 : let mut result = Vec::with_capacity(timelines.len());
2057 901 :
2058 901 : let mut now = Vec::with_capacity(timelines.len());
2059 901 : // (ancestor, children)
2060 901 : let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
2061 901 : HashMap::with_capacity(timelines.len());
2062 :
2063 1450 : for (timeline_id, value) in timelines {
2064 549 : if let Some(ancestor_id) = extractor(&value) {
2065 60 : let children = later.entry(ancestor_id).or_default();
2066 60 : children.push((timeline_id, value));
2067 489 : } else {
2068 489 : now.push((timeline_id, value));
2069 489 : }
2070 : }
2071 :
2072 1450 : while let Some((timeline_id, metadata)) = now.pop() {
2073 549 : result.push((timeline_id, metadata));
2074 : // All children of this can be loaded now
2075 549 : if let Some(mut children) = later.remove(&timeline_id) {
2076 49 : now.append(&mut children);
2077 500 : }
2078 : }
2079 :
2080 : // All timelines should be visited now. Unless there were timelines with missing ancestors.
2081 901 : if !later.is_empty() {
2082 0 : for (missing_id, orphan_ids) in later {
2083 0 : for (orphan_id, _) in orphan_ids {
2084 0 : error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
2085 : }
2086 : }
2087 0 : bail!("could not load tenant because some timelines are missing ancestors");
2088 901 : }
2089 901 :
2090 901 : Ok(result)
2091 901 : }
2092 :
2093 : impl Tenant {
2094 80 : pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
2095 80 : *self.tenant_conf.read().unwrap()
2096 80 : }
2097 :
2098 40 : pub fn effective_config(&self) -> TenantConf {
2099 40 : self.tenant_specific_overrides()
2100 40 : .merge(self.conf.default_tenant_conf)
2101 40 : }
2102 :
2103 5 : pub fn get_checkpoint_distance(&self) -> u64 {
2104 5 : let tenant_conf = self.tenant_conf.read().unwrap();
2105 5 : tenant_conf
2106 5 : .checkpoint_distance
2107 5 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
2108 5 : }
2109 :
2110 5 : pub fn get_checkpoint_timeout(&self) -> Duration {
2111 5 : let tenant_conf = self.tenant_conf.read().unwrap();
2112 5 : tenant_conf
2113 5 : .checkpoint_timeout
2114 5 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
2115 5 : }
2116 :
2117 5 : pub fn get_compaction_target_size(&self) -> u64 {
2118 5 : let tenant_conf = self.tenant_conf.read().unwrap();
2119 5 : tenant_conf
2120 5 : .compaction_target_size
2121 5 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
2122 5 : }
2123 :
2124 845 : pub fn get_compaction_period(&self) -> Duration {
2125 845 : let tenant_conf = self.tenant_conf.read().unwrap();
2126 845 : tenant_conf
2127 845 : .compaction_period
2128 845 : .unwrap_or(self.conf.default_tenant_conf.compaction_period)
2129 845 : }
2130 :
2131 5 : pub fn get_compaction_threshold(&self) -> usize {
2132 5 : let tenant_conf = self.tenant_conf.read().unwrap();
2133 5 : tenant_conf
2134 5 : .compaction_threshold
2135 5 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
2136 5 : }
2137 :
2138 396 : pub fn get_gc_horizon(&self) -> u64 {
2139 396 : let tenant_conf = self.tenant_conf.read().unwrap();
2140 396 : tenant_conf
2141 396 : .gc_horizon
2142 396 : .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
2143 396 : }
2144 :
2145 748 : pub fn get_gc_period(&self) -> Duration {
2146 748 : let tenant_conf = self.tenant_conf.read().unwrap();
2147 748 : tenant_conf
2148 748 : .gc_period
2149 748 : .unwrap_or(self.conf.default_tenant_conf.gc_period)
2150 748 : }
2151 :
2152 5 : pub fn get_image_creation_threshold(&self) -> usize {
2153 5 : let tenant_conf = self.tenant_conf.read().unwrap();
2154 5 : tenant_conf
2155 5 : .image_creation_threshold
2156 5 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
2157 5 : }
2158 :
2159 618 : pub fn get_pitr_interval(&self) -> Duration {
2160 618 : let tenant_conf = self.tenant_conf.read().unwrap();
2161 618 : tenant_conf
2162 618 : .pitr_interval
2163 618 : .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
2164 618 : }
2165 :
2166 4553 : pub fn get_trace_read_requests(&self) -> bool {
2167 4553 : let tenant_conf = self.tenant_conf.read().unwrap();
2168 4553 : tenant_conf
2169 4553 : .trace_read_requests
2170 4553 : .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
2171 4553 : }
2172 :
2173 13 : pub fn get_min_resident_size_override(&self) -> Option<u64> {
2174 13 : let tenant_conf = self.tenant_conf.read().unwrap();
2175 13 : tenant_conf
2176 13 : .min_resident_size_override
2177 13 : .or(self.conf.default_tenant_conf.min_resident_size_override)
2178 13 : }
2179 :
2180 27 : pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
2181 27 : *self.tenant_conf.write().unwrap() = new_tenant_conf;
2182 27 : // Don't hold self.timelines.lock() during the notifies.
2183 27 : // There's no risk of deadlock right now, but there could be if we consolidate
2184 27 : // mutexes in struct Timeline in the future.
2185 27 : let timelines = self.list_timelines();
2186 56 : for timeline in timelines {
2187 29 : timeline.tenant_conf_updated();
2188 29 : }
2189 27 : }
2190 :
2191 : /// Helper function to create a new Timeline struct.
2192 : ///
2193 : /// The returned Timeline is in Loading state. The caller is responsible for
2194 : /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
2195 : /// map.
2196 : ///
2197 : /// `validate_ancestor == false` is used when a timeline is created for deletion
2198 : /// and we might not have the ancestor present anymore which is fine for to be
2199 : /// deleted timelines.
2200 1394 : fn create_timeline_struct(
2201 1394 : &self,
2202 1394 : new_timeline_id: TimelineId,
2203 1394 : new_metadata: &TimelineMetadata,
2204 1394 : ancestor: Option<Arc<Timeline>>,
2205 1394 : resources: TimelineResources,
2206 1394 : init_order: Option<&InitializationOrder>,
2207 1394 : cause: CreateTimelineCause,
2208 1394 : ) -> anyhow::Result<Arc<Timeline>> {
2209 1394 : let state = match cause {
2210 : CreateTimelineCause::Load => {
2211 1368 : let ancestor_id = new_metadata.ancestor_timeline();
2212 1368 : anyhow::ensure!(
2213 1368 : ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
2214 0 : "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
2215 : );
2216 1368 : TimelineState::Loading
2217 : }
2218 26 : CreateTimelineCause::Delete => TimelineState::Stopping,
2219 : };
2220 :
2221 1394 : let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
2222 1394 : let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
2223 1394 :
2224 1394 : let pg_version = new_metadata.pg_version();
2225 1394 :
2226 1394 : let timeline = Timeline::new(
2227 1394 : self.conf,
2228 1394 : Arc::clone(&self.tenant_conf),
2229 1394 : new_metadata,
2230 1394 : ancestor,
2231 1394 : new_timeline_id,
2232 1394 : self.tenant_id,
2233 1394 : self.generation,
2234 1394 : Arc::clone(&self.walredo_mgr),
2235 1394 : resources,
2236 1394 : pg_version,
2237 1394 : initial_logical_size_can_start.cloned(),
2238 1394 : initial_logical_size_attempt.cloned().flatten(),
2239 1394 : state,
2240 1394 : );
2241 1394 :
2242 1394 : Ok(timeline)
2243 1394 : }
2244 :
2245 779 : fn new(
2246 779 : state: TenantState,
2247 779 : conf: &'static PageServerConf,
2248 779 : tenant_conf: TenantConfOpt,
2249 779 : walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
2250 779 : tenant_id: TenantId,
2251 779 : generation: Generation,
2252 779 : remote_storage: Option<GenericRemoteStorage>,
2253 779 : ) -> Tenant {
2254 779 : let (state, mut rx) = watch::channel(state);
2255 779 :
2256 779 : tokio::spawn(async move {
2257 779 : let tid = tenant_id.to_string();
2258 779 :
2259 2238 : fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
2260 2238 : ([state.into()], matches!(state, TenantState::Broken { .. }))
2261 2238 : }
2262 779 :
2263 779 : let mut tuple = inspect_state(&rx.borrow_and_update());
2264 779 :
2265 779 : let is_broken = tuple.1;
2266 779 : let mut counted_broken = if !is_broken {
2267 : // the tenant might be ignored and reloaded, so first remove any previous set
2268 : // element. it most likely has already been scraped, as these are manual operations
2269 : // right now. most likely we will add it back very soon.
2270 779 : drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
2271 779 : false
2272 : } else {
2273 : // add the id to the set right away, there should not be any updates on the channel
2274 : // after
2275 0 : crate::metrics::BROKEN_TENANTS_SET
2276 0 : .with_label_values(&[&tid])
2277 0 : .set(1);
2278 0 : true
2279 : };
2280 :
2281 2238 : loop {
2282 2238 : let labels = &tuple.0;
2283 2238 : let current = TENANT_STATE_METRIC.with_label_values(labels);
2284 2238 : current.inc();
2285 2238 :
2286 2238 : if rx.changed().await.is_err() {
2287 : // tenant has been dropped; decrement the counter because a tenant with that
2288 : // state is no longer in tenant map, but allow any broken set item to exist
2289 : // still.
2290 128 : current.dec();
2291 128 : break;
2292 1459 : }
2293 1459 :
2294 1459 : current.dec();
2295 1459 : tuple = inspect_state(&rx.borrow_and_update());
2296 1459 :
2297 1459 : let is_broken = tuple.1;
2298 1459 : if is_broken && !counted_broken {
2299 85 : counted_broken = true;
2300 85 : // insert the tenant_id (back) into the set
2301 85 : crate::metrics::BROKEN_TENANTS_SET
2302 85 : .with_label_values(&[&tid])
2303 85 : .inc();
2304 1374 : }
2305 : }
2306 779 : });
2307 779 :
2308 779 : Tenant {
2309 779 : tenant_id,
2310 779 : generation,
2311 779 : conf,
2312 779 : // using now here is good enough approximation to catch tenants with really long
2313 779 : // activation times.
2314 779 : loading_started_at: Instant::now(),
2315 779 : tenant_conf: Arc::new(RwLock::new(tenant_conf)),
2316 779 : timelines: Mutex::new(HashMap::new()),
2317 779 : gc_cs: tokio::sync::Mutex::new(()),
2318 779 : walredo_mgr,
2319 779 : remote_storage,
2320 779 : state,
2321 779 : cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
2322 779 : cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
2323 779 : eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
2324 779 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
2325 779 : }
2326 779 : }
2327 :
2328 : /// Locate and load config
2329 739 : pub(super) fn load_tenant_config(
2330 739 : conf: &'static PageServerConf,
2331 739 : tenant_id: &TenantId,
2332 739 : ) -> anyhow::Result<TenantConfOpt> {
2333 739 : let target_config_path = conf.tenant_config_path(tenant_id);
2334 739 : let target_config_display = target_config_path.display();
2335 739 :
2336 739 : info!("loading tenantconf from {target_config_display}");
2337 :
2338 : // FIXME If the config file is not found, assume that we're attaching
2339 : // a detached tenant and config is passed via attach command.
2340 : // https://github.com/neondatabase/neon/issues/1555
2341 : // OR: we're loading after incomplete deletion that managed to remove config.
2342 739 : if !target_config_path.exists() {
2343 6 : info!("tenant config not found in {target_config_display}");
2344 6 : return Ok(TenantConfOpt::default());
2345 733 : }
2346 :
2347 : // load and parse file
2348 733 : let config = fs::read_to_string(&target_config_path).with_context(|| {
2349 0 : format!("Failed to load config from path '{target_config_display}'")
2350 733 : })?;
2351 :
2352 733 : let toml = config.parse::<toml_edit::Document>().with_context(|| {
2353 0 : format!("Failed to parse config from file '{target_config_display}' as toml file")
2354 733 : })?;
2355 :
2356 733 : let mut tenant_conf = TenantConfOpt::default();
2357 733 : for (key, item) in toml.iter() {
2358 733 : match key {
2359 733 : "tenant_config" => {
2360 733 : tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
2361 0 : format!("Failed to parse config from file '{target_config_display}' as pageserver config")
2362 733 : })?;
2363 : }
2364 0 : _ => bail!("config file {target_config_display} has unrecognized pageserver option '{key}'"),
2365 :
2366 : }
2367 : }
2368 :
2369 733 : Ok(tenant_conf)
2370 739 : }
2371 :
2372 1638 : #[tracing::instrument(skip_all, fields(%tenant_id))]
2373 : pub(super) async fn persist_tenant_config(
2374 : tenant_id: &TenantId,
2375 : target_config_path: &Path,
2376 : tenant_conf: TenantConfOpt,
2377 : ) -> anyhow::Result<()> {
2378 : // imitate a try-block with a closure
2379 546 : info!("persisting tenantconf to {}", target_config_path.display());
2380 :
2381 : let mut conf_content = r#"# This file contains a specific per-tenant's config.
2382 : # It is read in case of pageserver restart.
2383 :
2384 : [tenant_config]
2385 : "#
2386 : .to_string();
2387 :
2388 : // Convert the config to a toml file.
2389 : conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
2390 :
2391 : let conf_content = conf_content.as_bytes();
2392 :
2393 : let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
2394 : VirtualFile::crashsafe_overwrite(target_config_path, &temp_path, conf_content)
2395 : .await
2396 0 : .with_context(|| {
2397 0 : format!(
2398 0 : "write tenant {tenant_id} config to {}",
2399 0 : target_config_path.display()
2400 0 : )
2401 0 : })?;
2402 : Ok(())
2403 : }
2404 :
2405 : //
2406 : // How garbage collection works:
2407 : //
2408 : // +--bar------------->
2409 : // /
2410 : // +----+-----foo---------------->
2411 : // /
2412 : // ----main--+-------------------------->
2413 : // \
2414 : // +-----baz-------->
2415 : //
2416 : //
2417 : // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
2418 : // `gc_infos` are being refreshed
2419 : // 2. Scan collected timelines, and on each timeline, make note of the
2420 : // all the points where other timelines have been branched off.
2421 : // We will refrain from removing page versions at those LSNs.
2422 : // 3. For each timeline, scan all layer files on the timeline.
2423 : // Remove all files for which a newer file exists and which
2424 : // don't cover any branch point LSNs.
2425 : //
2426 : // TODO:
2427 : // - if a relation has a non-incremental persistent layer on a child branch, then we
2428 : // don't need to keep that in the parent anymore. But currently
2429 : // we do.
2430 544 : async fn gc_iteration_internal(
2431 544 : &self,
2432 544 : target_timeline_id: Option<TimelineId>,
2433 544 : horizon: u64,
2434 544 : pitr: Duration,
2435 544 : ctx: &RequestContext,
2436 544 : ) -> anyhow::Result<GcResult> {
2437 544 : let mut totals: GcResult = Default::default();
2438 544 : let now = Instant::now();
2439 :
2440 544 : let gc_timelines = self
2441 544 : .refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
2442 212999 : .await?;
2443 :
2444 3 : crate::failpoint_support::sleep_millis_async!(
2445 3 : "gc_iteration_internal_after_getting_gc_timelines"
2446 3 : );
2447 :
2448 : // If there is nothing to GC, we don't want any messages in the INFO log.
2449 543 : if !gc_timelines.is_empty() {
2450 537 : info!("{} timelines need GC", gc_timelines.len());
2451 : } else {
2452 0 : debug!("{} timelines need GC", gc_timelines.len());
2453 : }
2454 :
2455 : // Perform GC for each timeline.
2456 : //
2457 : // Note that we don't hold the GC lock here because we don't want
2458 : // to delay the branch creation task, which requires the GC lock.
2459 : // A timeline GC iteration can be slow because it may need to wait for
2460 : // compaction (both require `layer_removal_cs` lock),
2461 : // but the GC iteration can run concurrently with branch creation.
2462 : //
2463 : // See comments in [`Tenant::branch_timeline`] for more information
2464 : // about why branch creation task can run concurrently with timeline's GC iteration.
2465 1224 : for timeline in gc_timelines {
2466 687 : if task_mgr::is_shutdown_requested() {
2467 : // We were requested to shut down. Stop and return with the progress we
2468 : // made.
2469 1 : break;
2470 686 : }
2471 686 : let result = timeline.gc().await?;
2472 681 : totals += result;
2473 : }
2474 :
2475 538 : totals.elapsed = now.elapsed();
2476 538 : Ok(totals)
2477 539 : }
2478 :
2479 : /// Refreshes the Timeline::gc_info for all timelines, returning the
2480 : /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
2481 : /// [`Tenant::get_gc_horizon`].
2482 : ///
2483 : /// This is usually executed as part of periodic gc, but can now be triggered more often.
2484 73 : pub async fn refresh_gc_info(
2485 73 : &self,
2486 73 : ctx: &RequestContext,
2487 73 : ) -> anyhow::Result<Vec<Arc<Timeline>>> {
2488 73 : // since this method can now be called at different rates than the configured gc loop, it
2489 73 : // might be that these configuration values get applied faster than what it was previously,
2490 73 : // since these were only read from the gc task.
2491 73 : let horizon = self.get_gc_horizon();
2492 73 : let pitr = self.get_pitr_interval();
2493 73 :
2494 73 : // refresh all timelines
2495 73 : let target_timeline_id = None;
2496 73 :
2497 73 : self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
2498 12 : .await
2499 73 : }
2500 :
2501 617 : async fn refresh_gc_info_internal(
2502 617 : &self,
2503 617 : target_timeline_id: Option<TimelineId>,
2504 617 : horizon: u64,
2505 617 : pitr: Duration,
2506 617 : ctx: &RequestContext,
2507 617 : ) -> anyhow::Result<Vec<Arc<Timeline>>> {
2508 : // grab mutex to prevent new timelines from being created here.
2509 617 : let gc_cs = self.gc_cs.lock().await;
2510 :
2511 : // Scan all timelines. For each timeline, remember the timeline ID and
2512 : // the branch point where it was created.
2513 616 : let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
2514 617 : let timelines = self.timelines.lock().unwrap();
2515 617 : let mut all_branchpoints = BTreeSet::new();
2516 616 : let timeline_ids = {
2517 617 : if let Some(target_timeline_id) = target_timeline_id.as_ref() {
2518 508 : if timelines.get(target_timeline_id).is_none() {
2519 1 : bail!("gc target timeline does not exist")
2520 507 : }
2521 109 : };
2522 :
2523 616 : timelines
2524 616 : .iter()
2525 616 : .map(|(timeline_id, timeline_entry)| {
2526 691 : if let Some(ancestor_timeline_id) =
2527 1306 : &timeline_entry.get_ancestor_timeline_id()
2528 : {
2529 : // If target_timeline is specified, we only need to know branchpoints of its children
2530 691 : if let Some(timeline_id) = target_timeline_id {
2531 500 : if ancestor_timeline_id == &timeline_id {
2532 6 : all_branchpoints.insert((
2533 6 : *ancestor_timeline_id,
2534 6 : timeline_entry.get_ancestor_lsn(),
2535 6 : ));
2536 494 : }
2537 : }
2538 : // Collect branchpoints for all timelines
2539 191 : else {
2540 191 : all_branchpoints.insert((
2541 191 : *ancestor_timeline_id,
2542 191 : timeline_entry.get_ancestor_lsn(),
2543 191 : ));
2544 191 : }
2545 615 : }
2546 :
2547 1306 : *timeline_id
2548 1306 : })
2549 616 : .collect::<Vec<_>>()
2550 616 : };
2551 616 : (all_branchpoints, timeline_ids)
2552 616 : };
2553 616 :
2554 616 : // Ok, we now know all the branch points.
2555 616 : // Update the GC information for each timeline.
2556 616 : let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
2557 1922 : for timeline_id in timeline_ids {
2558 : // Timeline is known to be local and loaded.
2559 1306 : let timeline = self
2560 1306 : .get_timeline(timeline_id, false)
2561 1306 : .with_context(|| format!("Timeline {timeline_id} was not found"))?;
2562 :
2563 : // If target_timeline is specified, ignore all other timelines
2564 1306 : if let Some(target_timeline_id) = target_timeline_id {
2565 1008 : if timeline_id != target_timeline_id {
2566 501 : continue;
2567 507 : }
2568 298 : }
2569 :
2570 805 : if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
2571 743 : let branchpoints: Vec<Lsn> = all_branchpoints
2572 743 : .range((
2573 743 : Included((timeline_id, Lsn(0))),
2574 743 : Included((timeline_id, Lsn(u64::MAX))),
2575 743 : ))
2576 743 : .map(|&x| x.1)
2577 743 : .collect();
2578 743 : timeline
2579 743 : .update_gc_info(branchpoints, cutoff, pitr, ctx)
2580 213011 : .await?;
2581 :
2582 743 : gc_timelines.push(timeline);
2583 62 : }
2584 : }
2585 616 : drop(gc_cs);
2586 616 : Ok(gc_timelines)
2587 617 : }
2588 :
2589 : /// A substitute for `branch_timeline` for use in unit tests.
2590 : /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
2591 : /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
2592 : /// timeline background tasks are launched, except the flush loop.
2593 : #[cfg(test)]
2594 107 : async fn branch_timeline_test(
2595 107 : &self,
2596 107 : src_timeline: &Arc<Timeline>,
2597 107 : dst_id: TimelineId,
2598 107 : start_lsn: Option<Lsn>,
2599 107 : ctx: &RequestContext,
2600 107 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
2601 107 : let tl = self
2602 107 : .branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
2603 2 : .await?;
2604 105 : tl.set_state(TimelineState::Active);
2605 105 : Ok(tl)
2606 107 : }
2607 :
2608 : /// Branch an existing timeline.
2609 : ///
2610 : /// The caller is responsible for activating the returned timeline.
2611 259 : async fn branch_timeline(
2612 259 : &self,
2613 259 : src_timeline: &Arc<Timeline>,
2614 259 : dst_id: TimelineId,
2615 259 : start_lsn: Option<Lsn>,
2616 259 : ctx: &RequestContext,
2617 259 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
2618 259 : self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
2619 2 : .await
2620 259 : }
2621 :
2622 366 : async fn branch_timeline_impl(
2623 366 : &self,
2624 366 : src_timeline: &Arc<Timeline>,
2625 366 : dst_id: TimelineId,
2626 366 : start_lsn: Option<Lsn>,
2627 366 : _ctx: &RequestContext,
2628 366 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
2629 366 : let src_id = src_timeline.timeline_id;
2630 366 :
2631 366 : // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
2632 366 : let start_lsn = start_lsn.unwrap_or_else(|| {
2633 226 : let lsn = src_timeline.get_last_record_lsn();
2634 226 : info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
2635 226 : lsn
2636 366 : });
2637 :
2638 : // First acquire the GC lock so that another task cannot advance the GC
2639 : // cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
2640 : // creating the branch.
2641 366 : let _gc_cs = self.gc_cs.lock().await;
2642 :
2643 : // Create a placeholder for the new branch. This will error
2644 : // out if the new timeline ID is already in use.
2645 366 : let timeline_uninit_mark = {
2646 366 : let timelines = self.timelines.lock().unwrap();
2647 366 : self.create_timeline_uninit_mark(dst_id, &timelines)?
2648 : };
2649 :
2650 : // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
2651 : // horizon on the source timeline
2652 : //
2653 : // We check it against both the planned GC cutoff stored in 'gc_info',
2654 : // and the 'latest_gc_cutoff' of the last GC that was performed. The
2655 : // planned GC cutoff in 'gc_info' is normally larger than
2656 : // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
2657 : // changed the GC settings for the tenant to make the PITR window
2658 : // larger, but some of the data was already removed by an earlier GC
2659 : // iteration.
2660 :
2661 : // check against last actual 'latest_gc_cutoff' first
2662 366 : let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
2663 366 : src_timeline
2664 366 : .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
2665 366 : .context(format!(
2666 366 : "invalid branch start lsn: less than latest GC cutoff {}",
2667 366 : *latest_gc_cutoff_lsn,
2668 366 : ))
2669 366 : .map_err(CreateTimelineError::AncestorLsn)?;
2670 :
2671 : // and then the planned GC cutoff
2672 : {
2673 358 : let gc_info = src_timeline.gc_info.read().unwrap();
2674 358 : let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
2675 358 : if start_lsn < cutoff {
2676 0 : return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
2677 0 : "invalid branch start lsn: less than planned GC cutoff {cutoff}"
2678 0 : )));
2679 358 : }
2680 358 : }
2681 358 :
2682 358 : //
2683 358 : // The branch point is valid, and we are still holding the 'gc_cs' lock
2684 358 : // so that GC cannot advance the GC cutoff until we are finished.
2685 358 : // Proceed with the branch creation.
2686 358 : //
2687 358 :
2688 358 : // Determine prev-LSN for the new timeline. We can only determine it if
2689 358 : // the timeline was branched at the current end of the source timeline.
2690 358 : let RecordLsn {
2691 358 : last: src_last,
2692 358 : prev: src_prev,
2693 358 : } = src_timeline.get_last_record_rlsn();
2694 358 : let dst_prev = if src_last == start_lsn {
2695 331 : Some(src_prev)
2696 : } else {
2697 27 : None
2698 : };
2699 :
2700 : // Create the metadata file, noting the ancestor of the new timeline.
2701 : // There is initially no data in it, but all the read-calls know to look
2702 : // into the ancestor.
2703 358 : let metadata = TimelineMetadata::new(
2704 358 : start_lsn,
2705 358 : dst_prev,
2706 358 : Some(src_id),
2707 358 : start_lsn,
2708 358 : *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
2709 358 : src_timeline.initdb_lsn,
2710 358 : src_timeline.pg_version,
2711 358 : );
2712 :
2713 358 : let uninitialized_timeline = self
2714 358 : .prepare_new_timeline(
2715 358 : dst_id,
2716 358 : &metadata,
2717 358 : timeline_uninit_mark,
2718 358 : start_lsn + 1,
2719 358 : Some(Arc::clone(src_timeline)),
2720 358 : )
2721 0 : .await?;
2722 :
2723 358 : let new_timeline = uninitialized_timeline.finish_creation()?;
2724 :
2725 : // Root timeline gets its layers during creation and uploads them along with the metadata.
2726 : // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
2727 : // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
2728 : // could get incorrect information and remove more layers, than needed.
2729 : // See also https://github.com/neondatabase/neon/issues/3865
2730 358 : if let Some(remote_client) = new_timeline.remote_client.as_ref() {
2731 163 : remote_client
2732 163 : .schedule_index_upload_for_metadata_update(&metadata)
2733 163 : .context("branch initial metadata upload")?;
2734 195 : }
2735 :
2736 358 : info!("branched timeline {dst_id} from {src_id} at {start_lsn}");
2737 :
2738 358 : Ok(new_timeline)
2739 366 : }
2740 :
2741 : /// - run initdb to init temporary instance and get bootstrap data
2742 : /// - after initialization complete, remove the temp dir.
2743 : ///
2744 : /// The caller is responsible for activating the returned timeline.
2745 644 : async fn bootstrap_timeline(
2746 644 : &self,
2747 644 : timeline_id: TimelineId,
2748 644 : pg_version: u32,
2749 644 : ctx: &RequestContext,
2750 644 : ) -> anyhow::Result<Arc<Timeline>> {
2751 644 : let timeline_uninit_mark = {
2752 644 : let timelines = self.timelines.lock().unwrap();
2753 644 : self.create_timeline_uninit_mark(timeline_id, &timelines)?
2754 : };
2755 : // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
2756 : // temporary directory for basebackup files for the given timeline.
2757 644 : let initdb_path = path_with_suffix_extension(
2758 644 : self.conf
2759 644 : .timelines_path(&self.tenant_id)
2760 644 : .join(format!("basebackup-{timeline_id}")),
2761 644 : TEMP_FILE_SUFFIX,
2762 644 : );
2763 644 :
2764 644 : // an uninit mark was placed before, nothing else can access this timeline files
2765 644 : // current initdb was not run yet, so remove whatever was left from the previous runs
2766 644 : if initdb_path.exists() {
2767 0 : fs::remove_dir_all(&initdb_path).with_context(|| {
2768 0 : format!(
2769 0 : "Failed to remove already existing initdb directory: {}",
2770 0 : initdb_path.display()
2771 0 : )
2772 0 : })?;
2773 644 : }
2774 : // Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
2775 644 : run_initdb(self.conf, &initdb_path, pg_version)?;
2776 : // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
2777 644 : scopeguard::defer! {
2778 644 : if let Err(e) = fs::remove_dir_all(&initdb_path) {
2779 : // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
2780 0 : error!("Failed to remove temporary initdb directory '{}': {}", initdb_path.display(), e);
2781 644 : }
2782 : }
2783 644 : let pgdata_path = &initdb_path;
2784 644 : let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();
2785 644 :
2786 644 : // Import the contents of the data directory at the initial checkpoint
2787 644 : // LSN, and any WAL after that.
2788 644 : // Initdb lsn will be equal to last_record_lsn which will be set after import.
2789 644 : // Because we know it upfront avoid having an option or dummy zero value by passing it to the metadata.
2790 644 : let new_metadata = TimelineMetadata::new(
2791 644 : Lsn(0),
2792 644 : None,
2793 644 : None,
2794 644 : Lsn(0),
2795 644 : pgdata_lsn,
2796 644 : pgdata_lsn,
2797 644 : pg_version,
2798 644 : );
2799 644 : let raw_timeline = self
2800 644 : .prepare_new_timeline(
2801 644 : timeline_id,
2802 644 : &new_metadata,
2803 644 : timeline_uninit_mark,
2804 644 : pgdata_lsn,
2805 644 : None,
2806 644 : )
2807 1 : .await?;
2808 :
2809 643 : let tenant_id = raw_timeline.owning_tenant.tenant_id;
2810 643 : let unfinished_timeline = raw_timeline.raw_timeline()?;
2811 :
2812 643 : import_datadir::import_timeline_from_postgres_datadir(
2813 643 : unfinished_timeline,
2814 643 : pgdata_path,
2815 643 : pgdata_lsn,
2816 643 : ctx,
2817 643 : )
2818 3210259 : .await
2819 643 : .with_context(|| {
2820 0 : format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
2821 643 : })?;
2822 :
2823 : // Flush the new layer files to disk, before we make the timeline as available to
2824 : // the outside world.
2825 : //
2826 : // Flush loop needs to be spawned in order to be able to flush.
2827 643 : unfinished_timeline.maybe_spawn_flush_loop();
2828 643 :
2829 643 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
2830 1 : anyhow::bail!("failpoint before-checkpoint-new-timeline");
2831 643 : });
2832 :
2833 642 : unfinished_timeline
2834 642 : .freeze_and_flush()
2835 642 : .await
2836 642 : .with_context(|| {
2837 0 : format!(
2838 0 : "Failed to flush after pgdatadir import for timeline {tenant_id}/{timeline_id}"
2839 0 : )
2840 642 : })?;
2841 :
2842 : // All done!
2843 642 : let timeline = raw_timeline.finish_creation()?;
2844 :
2845 642 : info!(
2846 642 : "created root timeline {} timeline.lsn {}",
2847 642 : timeline_id,
2848 642 : timeline.get_last_record_lsn()
2849 642 : );
2850 :
2851 642 : Ok(timeline)
2852 644 : }
2853 :
2854 : /// Call this before constructing a timeline, to build its required structures
2855 1341 : fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
2856 1341 : let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
2857 710 : let remote_client = RemoteTimelineClient::new(
2858 710 : remote_storage.clone(),
2859 710 : self.conf,
2860 710 : self.tenant_id,
2861 710 : timeline_id,
2862 710 : self.generation,
2863 710 : );
2864 710 : Some(remote_client)
2865 : } else {
2866 631 : None
2867 : };
2868 :
2869 1341 : TimelineResources { remote_client }
2870 1341 : }
2871 :
2872 : /// Creates intermediate timeline structure and its files.
2873 : ///
2874 : /// An empty layer map is initialized, and new data and WAL can be imported starting
2875 : /// at 'disk_consistent_lsn'. After any initial data has been imported, call
2876 : /// `finish_creation` to insert the Timeline into the timelines map and to remove the
2877 : /// uninit mark file.
2878 1042 : async fn prepare_new_timeline(
2879 1042 : &self,
2880 1042 : new_timeline_id: TimelineId,
2881 1042 : new_metadata: &TimelineMetadata,
2882 1042 : uninit_mark: TimelineUninitMark,
2883 1042 : start_lsn: Lsn,
2884 1042 : ancestor: Option<Arc<Timeline>>,
2885 1042 : ) -> anyhow::Result<UninitializedTimeline> {
2886 1042 : let tenant_id = self.tenant_id;
2887 1042 :
2888 1042 : let resources = self.build_timeline_resources(new_timeline_id);
2889 1042 : if let Some(remote_client) = &resources.remote_client {
2890 563 : remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
2891 479 : }
2892 :
2893 1042 : let timeline_struct = self
2894 1042 : .create_timeline_struct(
2895 1042 : new_timeline_id,
2896 1042 : new_metadata,
2897 1042 : ancestor,
2898 1042 : resources,
2899 1042 : None,
2900 1042 : CreateTimelineCause::Load,
2901 1042 : )
2902 1042 : .context("Failed to create timeline data structure")?;
2903 :
2904 1042 : timeline_struct.init_empty_layer_map(start_lsn);
2905 :
2906 1042 : if let Err(e) = self
2907 1042 : .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
2908 0 : .await
2909 : {
2910 1 : error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
2911 1 : cleanup_timeline_directory(uninit_mark);
2912 1 : return Err(e);
2913 1041 : }
2914 :
2915 0 : debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
2916 :
2917 1041 : Ok(UninitializedTimeline::new(
2918 1041 : self,
2919 1041 : new_timeline_id,
2920 1041 : Some((timeline_struct, uninit_mark)),
2921 1041 : ))
2922 1042 : }
2923 :
2924 1042 : async fn create_timeline_files(
2925 1042 : &self,
2926 1042 : timeline_path: &Path,
2927 1042 : new_timeline_id: &TimelineId,
2928 1042 : new_metadata: &TimelineMetadata,
2929 1042 : ) -> anyhow::Result<()> {
2930 1042 : crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
2931 :
2932 1042 : fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
2933 1 : anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
2934 1042 : });
2935 :
2936 1041 : save_metadata(self.conf, &self.tenant_id, new_timeline_id, new_metadata)
2937 0 : .await
2938 1041 : .context("Failed to create timeline metadata")?;
2939 1041 : Ok(())
2940 1042 : }
2941 :
2942 : /// Attempts to create an uninit mark file for the timeline initialization.
2943 : /// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
2944 : ///
2945 : /// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
2946 1051 : fn create_timeline_uninit_mark(
2947 1051 : &self,
2948 1051 : timeline_id: TimelineId,
2949 1051 : timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
2950 1051 : ) -> anyhow::Result<TimelineUninitMark> {
2951 1051 : let tenant_id = self.tenant_id;
2952 1051 :
2953 1051 : anyhow::ensure!(
2954 1051 : timelines.get(&timeline_id).is_none(),
2955 1 : "Timeline {tenant_id}/{timeline_id} already exists in pageserver's memory"
2956 : );
2957 1050 : let timeline_path = self.conf.timeline_path(&tenant_id, &timeline_id);
2958 1050 : anyhow::ensure!(
2959 1050 : !timeline_path.exists(),
2960 0 : "Timeline {} already exists, cannot create its uninit mark file",
2961 0 : timeline_path.display()
2962 : );
2963 :
2964 1050 : let uninit_mark_path = self
2965 1050 : .conf
2966 1050 : .timeline_uninit_mark_file_path(tenant_id, timeline_id);
2967 1050 : fs::File::create(&uninit_mark_path)
2968 1050 : .context("Failed to create uninit mark file")
2969 1050 : .and_then(|_| {
2970 1050 : crashsafe::fsync_file_and_parent(&uninit_mark_path)
2971 1050 : .context("Failed to fsync uninit mark file")
2972 1050 : })
2973 1050 : .with_context(|| {
2974 0 : format!("Failed to crate uninit mark for timeline {tenant_id}/{timeline_id}")
2975 1050 : })?;
2976 :
2977 1050 : let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
2978 1050 :
2979 1050 : Ok(uninit_mark)
2980 1051 : }
2981 :
2982 : /// Gathers inputs from all of the timelines to produce a sizing model input.
2983 : ///
2984 : /// Future is cancellation safe. Only one calculation can be running at once per tenant.
2985 268 : #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
2986 : pub async fn gather_size_inputs(
2987 : &self,
2988 : // `max_retention_period` overrides the cutoff that is used to calculate the size
2989 : // (only if it is shorter than the real cutoff).
2990 : max_retention_period: Option<u64>,
2991 : cause: LogicalSizeCalculationCause,
2992 : ctx: &RequestContext,
2993 : ) -> anyhow::Result<size::ModelInputs> {
2994 : let logical_sizes_at_once = self
2995 : .conf
2996 : .concurrent_tenant_size_logical_size_queries
2997 : .inner();
2998 :
2999 : // TODO: Having a single mutex block concurrent reads is not great for performance.
3000 : //
3001 : // But the only case where we need to run multiple of these at once is when we
3002 : // request a size for a tenant manually via API, while another background calculation
3003 : // is in progress (which is not a common case).
3004 : //
3005 : // See more for on the issue #2748 condenced out of the initial PR review.
3006 : let mut shared_cache = self.cached_logical_sizes.lock().await;
3007 :
3008 : size::gather_inputs(
3009 : self,
3010 : logical_sizes_at_once,
3011 : max_retention_period,
3012 : &mut shared_cache,
3013 : cause,
3014 : ctx,
3015 : )
3016 : .await
3017 : }
3018 :
3019 : /// Calculate synthetic tenant size and cache the result.
3020 : /// This is periodically called by background worker.
3021 : /// result is cached in tenant struct
3022 56 : #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
3023 : pub async fn calculate_synthetic_size(
3024 : &self,
3025 : cause: LogicalSizeCalculationCause,
3026 : ctx: &RequestContext,
3027 : ) -> anyhow::Result<u64> {
3028 : let inputs = self.gather_size_inputs(None, cause, ctx).await?;
3029 :
3030 : let size = inputs.calculate()?;
3031 :
3032 : self.set_cached_synthetic_size(size);
3033 :
3034 : Ok(size)
3035 : }
3036 :
3037 : /// Cache given synthetic size and update the metric value
3038 14 : pub fn set_cached_synthetic_size(&self, size: u64) {
3039 14 : self.cached_synthetic_tenant_size
3040 14 : .store(size, Ordering::Relaxed);
3041 14 :
3042 14 : TENANT_SYNTHETIC_SIZE_METRIC
3043 14 : .get_metric_with_label_values(&[&self.tenant_id.to_string()])
3044 14 : .unwrap()
3045 14 : .set(size);
3046 14 : }
3047 :
3048 40 : pub fn cached_synthetic_size(&self) -> u64 {
3049 40 : self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
3050 40 : }
3051 : }
3052 :
3053 : fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> anyhow::Result<()> {
3054 1 : fs::remove_dir_all(timeline_dir)
3055 1 : .or_else(|e| {
3056 0 : if e.kind() == std::io::ErrorKind::NotFound {
3057 : // we can leave the uninit mark without a timeline dir,
3058 : // just remove the mark then
3059 0 : Ok(())
3060 : } else {
3061 0 : Err(e)
3062 : }
3063 1 : })
3064 1 : .with_context(|| {
3065 0 : format!(
3066 0 : "Failed to remove unit marked timeline directory {}",
3067 0 : timeline_dir.display()
3068 0 : )
3069 1 : })?;
3070 1 : fs::remove_file(uninit_mark).with_context(|| {
3071 0 : format!(
3072 0 : "Failed to remove timeline uninit mark file {}",
3073 0 : uninit_mark.display()
3074 0 : )
3075 1 : })?;
3076 :
3077 1 : Ok(())
3078 1 : }
3079 :
3080 : pub(crate) enum CreateTenantFilesMode {
3081 : Create,
3082 : Attach,
3083 : }
3084 :
3085 521 : pub(crate) async fn create_tenant_files(
3086 521 : conf: &'static PageServerConf,
3087 521 : tenant_conf: TenantConfOpt,
3088 521 : tenant_id: &TenantId,
3089 521 : mode: CreateTenantFilesMode,
3090 521 : ) -> anyhow::Result<PathBuf> {
3091 521 : let target_tenant_directory = conf.tenant_path(tenant_id);
3092 521 : anyhow::ensure!(
3093 521 : !target_tenant_directory
3094 521 : .try_exists()
3095 521 : .context("check existence of tenant directory")?,
3096 2 : "tenant directory already exists",
3097 : );
3098 :
3099 519 : let temporary_tenant_dir =
3100 519 : path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
3101 0 : debug!(
3102 0 : "Creating temporary directory structure in {}",
3103 0 : temporary_tenant_dir.display()
3104 0 : );
3105 :
3106 : // top-level dir may exist if we are creating it through CLI
3107 519 : crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
3108 0 : format!(
3109 0 : "could not create temporary tenant directory {}",
3110 0 : temporary_tenant_dir.display()
3111 0 : )
3112 519 : })?;
3113 :
3114 519 : let creation_result = try_create_target_tenant_dir(
3115 519 : conf,
3116 519 : tenant_conf,
3117 519 : tenant_id,
3118 519 : mode,
3119 519 : &temporary_tenant_dir,
3120 519 : &target_tenant_directory,
3121 519 : )
3122 0 : .await;
3123 :
3124 519 : if creation_result.is_err() {
3125 1 : error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
3126 1 : if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) {
3127 0 : error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
3128 1 : } else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
3129 1 : error!(
3130 1 : "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
3131 1 : )
3132 0 : }
3133 518 : }
3134 :
3135 519 : creation_result?;
3136 :
3137 518 : Ok(target_tenant_directory)
3138 521 : }
3139 :
3140 519 : async fn try_create_target_tenant_dir(
3141 519 : conf: &'static PageServerConf,
3142 519 : tenant_conf: TenantConfOpt,
3143 519 : tenant_id: &TenantId,
3144 519 : mode: CreateTenantFilesMode,
3145 519 : temporary_tenant_dir: &Path,
3146 519 : target_tenant_directory: &Path,
3147 519 : ) -> Result<(), anyhow::Error> {
3148 519 : match mode {
3149 480 : CreateTenantFilesMode::Create => {} // needs no attach marker, writing tenant conf + atomic rename of dir is good enough
3150 : CreateTenantFilesMode::Attach => {
3151 39 : let attach_marker_path = temporary_tenant_dir.join(TENANT_ATTACHING_MARKER_FILENAME);
3152 39 : let file = std::fs::OpenOptions::new()
3153 39 : .create_new(true)
3154 39 : .write(true)
3155 39 : .open(&attach_marker_path)
3156 39 : .with_context(|| {
3157 0 : format!("could not create attach marker file {attach_marker_path:?}")
3158 39 : })?;
3159 39 : file.sync_all().with_context(|| {
3160 0 : format!("could not sync attach marker file: {attach_marker_path:?}")
3161 39 : })?;
3162 : // fsync of the directory in which the file resides comes later in this function
3163 : }
3164 : }
3165 :
3166 519 : let temporary_tenant_timelines_dir = rebase_directory(
3167 519 : &conf.timelines_path(tenant_id),
3168 519 : target_tenant_directory,
3169 519 : temporary_tenant_dir,
3170 519 : )
3171 519 : .with_context(|| format!("resolve tenant {tenant_id} temporary timelines dir"))?;
3172 519 : let temporary_tenant_config_path = rebase_directory(
3173 519 : &conf.tenant_config_path(tenant_id),
3174 519 : target_tenant_directory,
3175 519 : temporary_tenant_dir,
3176 519 : )
3177 519 : .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
3178 :
3179 519 : Tenant::persist_tenant_config(tenant_id, &temporary_tenant_config_path, tenant_conf).await?;
3180 :
3181 519 : crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
3182 0 : format!(
3183 0 : "create tenant {} temporary timelines directory {}",
3184 0 : tenant_id,
3185 0 : temporary_tenant_timelines_dir.display()
3186 0 : )
3187 519 : })?;
3188 519 : fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
3189 1 : anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
3190 519 : });
3191 :
3192 : // Make sure the current tenant directory entries are durable before renaming.
3193 : // Without this, a crash may reorder any of the directory entry creations above.
3194 518 : crashsafe::fsync(temporary_tenant_dir)
3195 518 : .with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?;
3196 :
3197 518 : fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
3198 0 : format!(
3199 0 : "move tenant {} temporary directory {} into the permanent one {}",
3200 0 : tenant_id,
3201 0 : temporary_tenant_dir.display(),
3202 0 : target_tenant_directory.display()
3203 0 : )
3204 518 : })?;
3205 518 : let target_dir_parent = target_tenant_directory.parent().with_context(|| {
3206 0 : format!(
3207 0 : "get tenant {} dir parent for {}",
3208 0 : tenant_id,
3209 0 : target_tenant_directory.display()
3210 0 : )
3211 518 : })?;
3212 518 : crashsafe::fsync(target_dir_parent).with_context(|| {
3213 0 : format!(
3214 0 : "fsync renamed directory's parent {} for tenant {}",
3215 0 : target_dir_parent.display(),
3216 0 : tenant_id,
3217 0 : )
3218 518 : })?;
3219 :
3220 518 : Ok(())
3221 519 : }
3222 :
3223 1038 : fn rebase_directory(original_path: &Path, base: &Path, new_base: &Path) -> anyhow::Result<PathBuf> {
3224 1038 : let relative_path = original_path.strip_prefix(base).with_context(|| {
3225 0 : format!(
3226 0 : "Failed to strip base prefix '{}' off path '{}'",
3227 0 : base.display(),
3228 0 : original_path.display()
3229 0 : )
3230 1038 : })?;
3231 1038 : Ok(new_base.join(relative_path))
3232 1038 : }
3233 :
3234 : /// Create the cluster temporarily in 'initdbpath' directory inside the repository
3235 : /// to get bootstrap data for timeline initialization.
3236 644 : fn run_initdb(
3237 644 : conf: &'static PageServerConf,
3238 644 : initdb_target_dir: &Path,
3239 644 : pg_version: u32,
3240 644 : ) -> anyhow::Result<()> {
3241 644 : let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
3242 644 : let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
3243 644 : info!(
3244 644 : "running {} in {}, libdir: {}",
3245 644 : initdb_bin_path.display(),
3246 644 : initdb_target_dir.display(),
3247 644 : initdb_lib_dir.display(),
3248 644 : );
3249 :
3250 644 : let initdb_output = Command::new(&initdb_bin_path)
3251 644 : .args(["-D", &initdb_target_dir.to_string_lossy()])
3252 644 : .args(["-U", &conf.superuser])
3253 644 : .args(["-E", "utf8"])
3254 644 : .arg("--no-instructions")
3255 644 : // This is only used for a temporary installation that is deleted shortly after,
3256 644 : // so no need to fsync it
3257 644 : .arg("--no-sync")
3258 644 : .env_clear()
3259 644 : .env("LD_LIBRARY_PATH", &initdb_lib_dir)
3260 644 : .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
3261 644 : .stdout(Stdio::null())
3262 644 : .output()
3263 644 : .with_context(|| {
3264 0 : format!(
3265 0 : "failed to execute {} at target dir {}",
3266 0 : initdb_bin_path.display(),
3267 0 : initdb_target_dir.display()
3268 0 : )
3269 644 : })?;
3270 644 : if !initdb_output.status.success() {
3271 0 : bail!(
3272 0 : "initdb failed: '{}'",
3273 0 : String::from_utf8_lossy(&initdb_output.stderr)
3274 0 : );
3275 644 : }
3276 644 :
3277 644 : Ok(())
3278 644 : }
3279 :
3280 : impl Drop for Tenant {
3281 164 : fn drop(&mut self) {
3282 164 : remove_tenant_metrics(&self.tenant_id);
3283 164 : }
3284 : }
3285 : /// Dump contents of a layer file to stdout.
3286 0 : pub async fn dump_layerfile_from_path(
3287 0 : path: &Path,
3288 0 : verbose: bool,
3289 0 : ctx: &RequestContext,
3290 0 : ) -> anyhow::Result<()> {
3291 : use std::os::unix::fs::FileExt;
3292 :
3293 : // All layer files start with a two-byte "magic" value, to identify the kind of
3294 : // file.
3295 0 : let file = File::open(path)?;
3296 0 : let mut header_buf = [0u8; 2];
3297 0 : file.read_exact_at(&mut header_buf, 0)?;
3298 :
3299 0 : match u16::from_be_bytes(header_buf) {
3300 : crate::IMAGE_FILE_MAGIC => {
3301 0 : ImageLayer::new_for_path(path, file)?
3302 0 : .dump(verbose, ctx)
3303 0 : .await?
3304 : }
3305 : crate::DELTA_FILE_MAGIC => {
3306 0 : DeltaLayer::new_for_path(path, file)?
3307 0 : .dump(verbose, ctx)
3308 0 : .await?
3309 : }
3310 0 : magic => bail!("unrecognized magic identifier: {:?}", magic),
3311 : }
3312 :
3313 0 : Ok(())
3314 0 : }
3315 :
3316 : #[cfg(test)]
3317 : pub mod harness {
3318 : use bytes::{Bytes, BytesMut};
3319 : use once_cell::sync::OnceCell;
3320 : use std::sync::Arc;
3321 : use std::{fs, path::PathBuf};
3322 : use utils::logging;
3323 : use utils::lsn::Lsn;
3324 :
3325 : use crate::{
3326 : config::PageServerConf,
3327 : repository::Key,
3328 : tenant::Tenant,
3329 : walrecord::NeonWalRecord,
3330 : walredo::{WalRedoError, WalRedoManager},
3331 : };
3332 :
3333 : use super::*;
3334 : use crate::tenant::config::{TenantConf, TenantConfOpt};
3335 : use hex_literal::hex;
3336 : use utils::id::{TenantId, TimelineId};
3337 :
3338 : pub const TIMELINE_ID: TimelineId =
3339 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
3340 : pub const NEW_TIMELINE_ID: TimelineId =
3341 : TimelineId::from_array(hex!("AA223344556677881122334455667788"));
3342 :
3343 : /// Convenience function to create a page image with given string as the only content
3344 : #[allow(non_snake_case)]
3345 854153 : pub fn TEST_IMG(s: &str) -> Bytes {
3346 854153 : let mut buf = BytesMut::new();
3347 854153 : buf.extend_from_slice(s.as_bytes());
3348 854153 : buf.resize(64, 0);
3349 854153 :
3350 854153 : buf.freeze()
3351 854153 : }
3352 :
3353 : impl From<TenantConf> for TenantConfOpt {
3354 40 : fn from(tenant_conf: TenantConf) -> Self {
3355 40 : Self {
3356 40 : checkpoint_distance: Some(tenant_conf.checkpoint_distance),
3357 40 : checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
3358 40 : compaction_target_size: Some(tenant_conf.compaction_target_size),
3359 40 : compaction_period: Some(tenant_conf.compaction_period),
3360 40 : compaction_threshold: Some(tenant_conf.compaction_threshold),
3361 40 : gc_horizon: Some(tenant_conf.gc_horizon),
3362 40 : gc_period: Some(tenant_conf.gc_period),
3363 40 : image_creation_threshold: Some(tenant_conf.image_creation_threshold),
3364 40 : pitr_interval: Some(tenant_conf.pitr_interval),
3365 40 : walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
3366 40 : lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
3367 40 : max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
3368 40 : trace_read_requests: Some(tenant_conf.trace_read_requests),
3369 40 : eviction_policy: Some(tenant_conf.eviction_policy),
3370 40 : min_resident_size_override: tenant_conf.min_resident_size_override,
3371 40 : evictions_low_residence_duration_metric_threshold: Some(
3372 40 : tenant_conf.evictions_low_residence_duration_metric_threshold,
3373 40 : ),
3374 40 : gc_feedback: Some(tenant_conf.gc_feedback),
3375 40 : }
3376 40 : }
3377 : }
3378 :
3379 : pub struct TenantHarness {
3380 : pub conf: &'static PageServerConf,
3381 : pub tenant_conf: TenantConf,
3382 : pub tenant_id: TenantId,
3383 : pub generation: Generation,
3384 : remote_storage: GenericRemoteStorage,
3385 : pub remote_fs_dir: PathBuf,
3386 : }
3387 :
3388 : static LOG_HANDLE: OnceCell<()> = OnceCell::new();
3389 :
3390 : impl TenantHarness {
3391 36 : pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
3392 36 : LOG_HANDLE.get_or_init(|| {
3393 1 : logging::init(
3394 1 : logging::LogFormat::Test,
3395 1 : // enable it in case in case the tests exercise code paths that use
3396 1 : // debug_assert_current_span_has_tenant_and_timeline_id
3397 1 : logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
3398 1 : )
3399 1 : .expect("Failed to init test logging")
3400 36 : });
3401 36 :
3402 36 : let repo_dir = PageServerConf::test_repo_dir(test_name);
3403 36 : let _ = fs::remove_dir_all(&repo_dir);
3404 36 : fs::create_dir_all(&repo_dir)?;
3405 :
3406 36 : let conf = PageServerConf::dummy_conf(repo_dir);
3407 36 : // Make a static copy of the config. This can never be free'd, but that's
3408 36 : // OK in a test.
3409 36 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
3410 36 :
3411 36 : // Disable automatic GC and compaction to make the unit tests more deterministic.
3412 36 : // The tests perform them manually if needed.
3413 36 : let tenant_conf = TenantConf {
3414 36 : gc_period: Duration::ZERO,
3415 36 : compaction_period: Duration::ZERO,
3416 36 : ..TenantConf::default()
3417 36 : };
3418 36 :
3419 36 : let tenant_id = TenantId::generate();
3420 36 : fs::create_dir_all(conf.tenant_path(&tenant_id))?;
3421 36 : fs::create_dir_all(conf.timelines_path(&tenant_id))?;
3422 :
3423 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
3424 36 : let remote_fs_dir = conf.workdir.join("localfs");
3425 36 : std::fs::create_dir_all(&remote_fs_dir).unwrap();
3426 36 : let config = RemoteStorageConfig {
3427 36 : // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
3428 36 : max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
3429 36 : // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
3430 36 : max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
3431 36 : storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
3432 36 : };
3433 36 : let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
3434 36 :
3435 36 : Ok(Self {
3436 36 : conf,
3437 36 : tenant_conf,
3438 36 : tenant_id,
3439 36 : generation: Generation::new(0xdeadbeef),
3440 36 : remote_storage,
3441 36 : remote_fs_dir,
3442 36 : })
3443 36 : }
3444 :
3445 37 : pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
3446 37 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
3447 37 : (
3448 37 : self.try_load(&ctx)
3449 49 : .await
3450 37 : .expect("failed to load test tenant"),
3451 37 : ctx,
3452 37 : )
3453 37 : }
3454 :
3455 40 : pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
3456 40 : let walredo_mgr = Arc::new(TestRedoManager);
3457 40 :
3458 40 : let tenant = Arc::new(Tenant::new(
3459 40 : TenantState::Loading,
3460 40 : self.conf,
3461 40 : TenantConfOpt::from(self.tenant_conf),
3462 40 : walredo_mgr,
3463 40 : self.tenant_id,
3464 40 : self.generation,
3465 40 : Some(self.remote_storage.clone()),
3466 40 : ));
3467 40 : tenant
3468 40 : .load(None, ctx)
3469 40 : .instrument(info_span!("try_load", tenant_id=%self.tenant_id))
3470 52 : .await?;
3471 :
3472 : // TODO reuse Tenant::activate (needs broker)
3473 39 : tenant.state.send_replace(TenantState::Active);
3474 39 : for timeline in tenant.timelines.lock().unwrap().values() {
3475 3 : timeline.set_state(TimelineState::Active);
3476 3 : }
3477 39 : Ok(tenant)
3478 40 : }
3479 :
3480 3 : pub fn timeline_path(&self, timeline_id: &TimelineId) -> PathBuf {
3481 3 : self.conf.timeline_path(&self.tenant_id, timeline_id)
3482 3 : }
3483 : }
3484 :
3485 : // Mock WAL redo manager that doesn't do much
3486 : pub struct TestRedoManager;
3487 :
3488 : impl WalRedoManager for TestRedoManager {
3489 0 : fn request_redo(
3490 0 : &self,
3491 0 : key: Key,
3492 0 : lsn: Lsn,
3493 0 : base_img: Option<(Lsn, Bytes)>,
3494 0 : records: Vec<(Lsn, NeonWalRecord)>,
3495 0 : _pg_version: u32,
3496 0 : ) -> Result<Bytes, WalRedoError> {
3497 0 : let s = format!(
3498 0 : "redo for {} to get to {}, with {} and {} records",
3499 0 : key,
3500 0 : lsn,
3501 0 : if base_img.is_some() {
3502 0 : "base image"
3503 : } else {
3504 0 : "no base image"
3505 : },
3506 0 : records.len()
3507 0 : );
3508 0 : println!("{s}");
3509 0 :
3510 0 : Ok(TEST_IMG(&s))
3511 0 : }
3512 : }
3513 : }
3514 :
3515 : #[cfg(test)]
3516 : mod tests {
3517 : use super::*;
3518 : use crate::keyspace::KeySpaceAccum;
3519 : use crate::repository::{Key, Value};
3520 : use crate::tenant::harness::*;
3521 : use crate::DEFAULT_PG_VERSION;
3522 : use crate::METADATA_FILE_NAME;
3523 : use bytes::BytesMut;
3524 : use hex_literal::hex;
3525 : use once_cell::sync::Lazy;
3526 : use rand::{thread_rng, Rng};
3527 : use tokio_util::sync::CancellationToken;
3528 :
3529 : static TEST_KEY: Lazy<Key> =
3530 1 : Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
3531 :
3532 1 : #[tokio::test]
3533 1 : async fn test_basic() -> anyhow::Result<()> {
3534 1 : let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
3535 1 : let tline = tenant
3536 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
3537 2 : .await?;
3538 :
3539 1 : let writer = tline.writer().await;
3540 1 : writer
3541 1 : .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
3542 0 : .await?;
3543 1 : writer.finish_write(Lsn(0x10));
3544 1 : drop(writer);
3545 :
3546 1 : let writer = tline.writer().await;
3547 1 : writer
3548 1 : .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
3549 0 : .await?;
3550 1 : writer.finish_write(Lsn(0x20));
3551 1 : drop(writer);
3552 :
3553 1 : assert_eq!(
3554 1 : tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
3555 1 : TEST_IMG("foo at 0x10")
3556 : );
3557 1 : assert_eq!(
3558 1 : tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
3559 1 : TEST_IMG("foo at 0x10")
3560 : );
3561 1 : assert_eq!(
3562 1 : tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
3563 1 : TEST_IMG("foo at 0x20")
3564 : );
3565 :
3566 1 : Ok(())
3567 : }
3568 :
3569 1 : #[tokio::test]
3570 1 : async fn no_duplicate_timelines() -> anyhow::Result<()> {
3571 1 : let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
3572 1 : .load()
3573 1 : .await;
3574 1 : let _ = tenant
3575 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3576 2 : .await?;
3577 :
3578 1 : match tenant
3579 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3580 0 : .await
3581 : {
3582 0 : Ok(_) => panic!("duplicate timeline creation should fail"),
3583 1 : Err(e) => assert_eq!(
3584 1 : e.to_string(),
3585 1 : format!(
3586 1 : "Timeline {}/{} already exists in pageserver's memory",
3587 1 : tenant.tenant_id, TIMELINE_ID
3588 1 : )
3589 1 : ),
3590 : }
3591 :
3592 1 : Ok(())
3593 : }
3594 :
3595 : /// Convenience function to create a page image with given string as the only content
3596 5 : pub fn test_value(s: &str) -> Value {
3597 5 : let mut buf = BytesMut::new();
3598 5 : buf.extend_from_slice(s.as_bytes());
3599 5 : Value::Image(buf.freeze())
3600 5 : }
3601 :
3602 : ///
3603 : /// Test branch creation
3604 : ///
3605 1 : #[tokio::test]
3606 1 : async fn test_branch() -> anyhow::Result<()> {
3607 : use std::str::from_utf8;
3608 :
3609 1 : let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
3610 1 : let tline = tenant
3611 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3612 2 : .await?;
3613 1 : let writer = tline.writer().await;
3614 :
3615 : #[allow(non_snake_case)]
3616 1 : let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
3617 1 : #[allow(non_snake_case)]
3618 1 : let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
3619 1 :
3620 1 : // Insert a value on the timeline
3621 1 : writer
3622 1 : .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
3623 0 : .await?;
3624 1 : writer
3625 1 : .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
3626 0 : .await?;
3627 1 : writer.finish_write(Lsn(0x20));
3628 1 :
3629 1 : writer
3630 1 : .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
3631 0 : .await?;
3632 1 : writer.finish_write(Lsn(0x30));
3633 1 : writer
3634 1 : .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
3635 0 : .await?;
3636 1 : writer.finish_write(Lsn(0x40));
3637 1 :
3638 1 : //assert_current_logical_size(&tline, Lsn(0x40));
3639 1 :
3640 1 : // Branch the history, modify relation differently on the new timeline
3641 1 : tenant
3642 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
3643 0 : .await?;
3644 1 : let newtline = tenant
3645 1 : .get_timeline(NEW_TIMELINE_ID, true)
3646 1 : .expect("Should have a local timeline");
3647 1 : let new_writer = newtline.writer().await;
3648 1 : new_writer
3649 1 : .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
3650 0 : .await?;
3651 1 : new_writer.finish_write(Lsn(0x40));
3652 :
3653 : // Check page contents on both branches
3654 1 : assert_eq!(
3655 1 : from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
3656 : "foo at 0x40"
3657 : );
3658 1 : assert_eq!(
3659 1 : from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
3660 : "bar at 0x40"
3661 : );
3662 1 : assert_eq!(
3663 1 : from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
3664 : "foobar at 0x20"
3665 : );
3666 :
3667 : //assert_current_logical_size(&tline, Lsn(0x40));
3668 :
3669 1 : Ok(())
3670 : }
3671 :
3672 10 : async fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
3673 10 : let mut lsn = start_lsn;
3674 : #[allow(non_snake_case)]
3675 : {
3676 10 : let writer = tline.writer().await;
3677 : // Create a relation on the timeline
3678 10 : writer
3679 10 : .put(
3680 10 : *TEST_KEY,
3681 10 : lsn,
3682 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
3683 10 : )
3684 0 : .await?;
3685 10 : writer.finish_write(lsn);
3686 10 : lsn += 0x10;
3687 10 : writer
3688 10 : .put(
3689 10 : *TEST_KEY,
3690 10 : lsn,
3691 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
3692 10 : )
3693 0 : .await?;
3694 10 : writer.finish_write(lsn);
3695 10 : lsn += 0x10;
3696 10 : }
3697 10 : tline.freeze_and_flush().await?;
3698 : {
3699 10 : let writer = tline.writer().await;
3700 10 : writer
3701 10 : .put(
3702 10 : *TEST_KEY,
3703 10 : lsn,
3704 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
3705 10 : )
3706 0 : .await?;
3707 10 : writer.finish_write(lsn);
3708 10 : lsn += 0x10;
3709 10 : writer
3710 10 : .put(
3711 10 : *TEST_KEY,
3712 10 : lsn,
3713 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
3714 10 : )
3715 0 : .await?;
3716 10 : writer.finish_write(lsn);
3717 10 : }
3718 10 : tline.freeze_and_flush().await
3719 10 : }
3720 :
3721 1 : #[tokio::test]
3722 1 : async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
3723 1 : let (tenant, ctx) =
3724 1 : TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
3725 1 : .load()
3726 1 : .await;
3727 1 : let tline = tenant
3728 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3729 2 : .await?;
3730 2 : make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
3731 :
3732 : // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
3733 : // FIXME: this doesn't actually remove any layer currently, given how the flushing
3734 : // and compaction works. But it does set the 'cutoff' point so that the cross check
3735 : // below should fail.
3736 1 : tenant
3737 1 : .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
3738 1 : .await?;
3739 :
3740 : // try to branch at lsn 25, should fail because we already garbage collected the data
3741 1 : match tenant
3742 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
3743 0 : .await
3744 : {
3745 0 : Ok(_) => panic!("branching should have failed"),
3746 1 : Err(err) => {
3747 1 : let CreateTimelineError::AncestorLsn(err) = err else {
3748 0 : panic!("wrong error type")
3749 : };
3750 1 : assert!(err.to_string().contains("invalid branch start lsn"));
3751 1 : assert!(err
3752 1 : .source()
3753 1 : .unwrap()
3754 1 : .to_string()
3755 1 : .contains("we might've already garbage collected needed data"))
3756 : }
3757 : }
3758 :
3759 1 : Ok(())
3760 : }
3761 :
3762 1 : #[tokio::test]
3763 1 : async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
3764 1 : let (tenant, ctx) =
3765 1 : TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
3766 1 : .load()
3767 1 : .await;
3768 :
3769 1 : let tline = tenant
3770 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
3771 2 : .await?;
3772 : // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
3773 1 : match tenant
3774 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
3775 0 : .await
3776 : {
3777 0 : Ok(_) => panic!("branching should have failed"),
3778 1 : Err(err) => {
3779 1 : let CreateTimelineError::AncestorLsn(err) = err else {
3780 0 : panic!("wrong error type");
3781 : };
3782 1 : assert!(&err.to_string().contains("invalid branch start lsn"));
3783 1 : assert!(&err
3784 1 : .source()
3785 1 : .unwrap()
3786 1 : .to_string()
3787 1 : .contains("is earlier than latest GC horizon"));
3788 : }
3789 : }
3790 :
3791 1 : Ok(())
3792 : }
3793 :
3794 : /*
3795 : // FIXME: This currently fails to error out. Calling GC doesn't currently
3796 : // remove the old value, we'd need to work a little harder
3797 : #[tokio::test]
3798 : async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
3799 : let repo =
3800 : RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
3801 : .load();
3802 :
3803 : let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
3804 : make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
3805 :
3806 : repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
3807 : let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
3808 : assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
3809 : match tline.get(*TEST_KEY, Lsn(0x25)) {
3810 : Ok(_) => panic!("request for page should have failed"),
3811 : Err(err) => assert!(err.to_string().contains("not found at")),
3812 : }
3813 : Ok(())
3814 : }
3815 : */
3816 :
3817 1 : #[tokio::test]
3818 1 : async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
3819 1 : let (tenant, ctx) =
3820 1 : TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
3821 1 : .load()
3822 1 : .await;
3823 1 : let tline = tenant
3824 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3825 2 : .await?;
3826 2 : make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
3827 :
3828 1 : tenant
3829 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
3830 0 : .await?;
3831 1 : let newtline = tenant
3832 1 : .get_timeline(NEW_TIMELINE_ID, true)
3833 1 : .expect("Should have a local timeline");
3834 1 :
3835 2 : make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
3836 :
3837 1 : tline.set_broken("test".to_owned());
3838 1 :
3839 1 : tenant
3840 1 : .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
3841 0 : .await?;
3842 :
3843 : // The branchpoints should contain all timelines, even ones marked
3844 : // as Broken.
3845 : {
3846 1 : let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
3847 1 : assert_eq!(branchpoints.len(), 1);
3848 1 : assert_eq!(branchpoints[0], Lsn(0x40));
3849 : }
3850 :
3851 : // You can read the key from the child branch even though the parent is
3852 : // Broken, as long as you don't need to access data from the parent.
3853 1 : assert_eq!(
3854 1 : newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
3855 1 : TEST_IMG(&format!("foo at {}", Lsn(0x70)))
3856 : );
3857 :
3858 : // This needs to traverse to the parent, and fails.
3859 1 : let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
3860 1 : assert!(err
3861 1 : .to_string()
3862 1 : .contains("will not become active. Current state: Broken"));
3863 :
3864 1 : Ok(())
3865 : }
3866 :
3867 1 : #[tokio::test]
3868 1 : async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
3869 1 : let (tenant, ctx) =
3870 1 : TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
3871 1 : .load()
3872 1 : .await;
3873 1 : let tline = tenant
3874 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3875 2 : .await?;
3876 2 : make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
3877 :
3878 1 : tenant
3879 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
3880 0 : .await?;
3881 1 : let newtline = tenant
3882 1 : .get_timeline(NEW_TIMELINE_ID, true)
3883 1 : .expect("Should have a local timeline");
3884 1 : // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
3885 1 : tenant
3886 1 : .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
3887 1 : .await?;
3888 1 : assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
3889 :
3890 1 : Ok(())
3891 : }
3892 1 : #[tokio::test]
3893 1 : async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
3894 1 : let (tenant, ctx) =
3895 1 : TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
3896 1 : .load()
3897 1 : .await;
3898 1 : let tline = tenant
3899 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3900 2 : .await?;
3901 2 : make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
3902 :
3903 1 : tenant
3904 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
3905 0 : .await?;
3906 1 : let newtline = tenant
3907 1 : .get_timeline(NEW_TIMELINE_ID, true)
3908 1 : .expect("Should have a local timeline");
3909 1 :
3910 2 : make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
3911 :
3912 : // run gc on parent
3913 1 : tenant
3914 1 : .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
3915 0 : .await?;
3916 :
3917 : // Check that the data is still accessible on the branch.
3918 1 : assert_eq!(
3919 1 : newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
3920 1 : TEST_IMG(&format!("foo at {}", Lsn(0x40)))
3921 : );
3922 :
3923 1 : Ok(())
3924 : }
3925 :
3926 1 : #[tokio::test]
3927 1 : async fn timeline_load() -> anyhow::Result<()> {
3928 : const TEST_NAME: &str = "timeline_load";
3929 1 : let harness = TenantHarness::create(TEST_NAME)?;
3930 : {
3931 1 : let (tenant, ctx) = harness.load().await;
3932 1 : let tline = tenant
3933 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
3934 2 : .await?;
3935 2 : make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
3936 : // so that all uploads finish & we can call harness.load() below again
3937 1 : tenant
3938 1 : .shutdown(Default::default(), true)
3939 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
3940 2 : .await
3941 1 : .ok()
3942 1 : .unwrap();
3943 : }
3944 :
3945 5 : let (tenant, _ctx) = harness.load().await;
3946 1 : tenant
3947 1 : .get_timeline(TIMELINE_ID, true)
3948 1 : .expect("cannot load timeline");
3949 1 :
3950 1 : Ok(())
3951 : }
3952 :
3953 1 : #[tokio::test]
3954 1 : async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
3955 : const TEST_NAME: &str = "timeline_load_with_ancestor";
3956 1 : let harness = TenantHarness::create(TEST_NAME)?;
3957 : // create two timelines
3958 : {
3959 1 : let (tenant, ctx) = harness.load().await;
3960 1 : let tline = tenant
3961 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3962 2 : .await?;
3963 :
3964 2 : make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
3965 :
3966 1 : let child_tline = tenant
3967 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
3968 0 : .await?;
3969 1 : child_tline.set_state(TimelineState::Active);
3970 1 :
3971 1 : let newtline = tenant
3972 1 : .get_timeline(NEW_TIMELINE_ID, true)
3973 1 : .expect("Should have a local timeline");
3974 1 :
3975 2 : make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
3976 :
3977 : // so that all uploads finish & we can call harness.load() below again
3978 1 : tenant
3979 1 : .shutdown(Default::default(), true)
3980 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
3981 4 : .await
3982 1 : .ok()
3983 1 : .unwrap();
3984 : }
3985 :
3986 : // check that both of them are initially unloaded
3987 9 : let (tenant, _ctx) = harness.load().await;
3988 :
3989 : // check that both, child and ancestor are loaded
3990 1 : let _child_tline = tenant
3991 1 : .get_timeline(NEW_TIMELINE_ID, true)
3992 1 : .expect("cannot get child timeline loaded");
3993 1 :
3994 1 : let _ancestor_tline = tenant
3995 1 : .get_timeline(TIMELINE_ID, true)
3996 1 : .expect("cannot get ancestor timeline loaded");
3997 1 :
3998 1 : Ok(())
3999 : }
4000 :
4001 1 : #[tokio::test]
4002 1 : async fn delta_layer_dumping() -> anyhow::Result<()> {
4003 1 : let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
4004 1 : let tline = tenant
4005 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4006 2 : .await?;
4007 2 : make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
4008 :
4009 1 : let layer_map = tline.layers.read().await;
4010 1 : let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
4011 :
4012 1 : assert!(!level0_deltas.is_empty());
4013 :
4014 3 : for delta in level0_deltas {
4015 2 : let delta = layer_map.get_from_desc(&delta);
4016 2 : // Ensure we are dumping a delta layer here
4017 2 : let delta = delta.downcast_delta_layer().unwrap();
4018 2 :
4019 2 : delta.dump(false, &ctx).await.unwrap();
4020 2 : delta.dump(true, &ctx).await.unwrap();
4021 : }
4022 :
4023 1 : Ok(())
4024 : }
4025 :
4026 1 : #[tokio::test]
4027 1 : async fn corrupt_metadata() -> anyhow::Result<()> {
4028 : const TEST_NAME: &str = "corrupt_metadata";
4029 1 : let harness = TenantHarness::create(TEST_NAME)?;
4030 1 : let (tenant, ctx) = harness.load().await;
4031 :
4032 1 : let tline = tenant
4033 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4034 2 : .await?;
4035 1 : drop(tline);
4036 1 : // so that all uploads finish & we can call harness.try_load() below again
4037 1 : tenant
4038 1 : .shutdown(Default::default(), true)
4039 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
4040 2 : .await
4041 1 : .ok()
4042 1 : .unwrap();
4043 1 : drop(tenant);
4044 1 :
4045 1 : let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
4046 1 :
4047 1 : assert!(metadata_path.is_file());
4048 :
4049 1 : let mut metadata_bytes = std::fs::read(&metadata_path)?;
4050 1 : assert_eq!(metadata_bytes.len(), 512);
4051 1 : metadata_bytes[8] ^= 1;
4052 1 : std::fs::write(metadata_path, metadata_bytes)?;
4053 :
4054 1 : let err = harness.try_load(&ctx).await.err().expect("should fail");
4055 1 : // get all the stack with all .context, not only the last one
4056 1 : let message = format!("{err:#}");
4057 1 : let expected = "failed to load metadata";
4058 1 : assert!(
4059 1 : message.contains(expected),
4060 0 : "message '{message}' expected to contain {expected}"
4061 : );
4062 :
4063 1 : let mut found_error_message = false;
4064 1 : let mut err_source = err.source();
4065 1 : while let Some(source) = err_source {
4066 1 : if source.to_string().contains("metadata checksum mismatch") {
4067 1 : found_error_message = true;
4068 1 : break;
4069 0 : }
4070 0 : err_source = source.source();
4071 : }
4072 1 : assert!(
4073 1 : found_error_message,
4074 0 : "didn't find the corrupted metadata error in {}",
4075 : message
4076 : );
4077 :
4078 1 : Ok(())
4079 : }
4080 :
4081 1 : #[tokio::test]
4082 1 : async fn test_images() -> anyhow::Result<()> {
4083 1 : let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
4084 1 : let tline = tenant
4085 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4086 2 : .await?;
4087 :
4088 1 : let writer = tline.writer().await;
4089 1 : writer
4090 1 : .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
4091 0 : .await?;
4092 1 : writer.finish_write(Lsn(0x10));
4093 1 : drop(writer);
4094 1 :
4095 1 : tline.freeze_and_flush().await?;
4096 1 : tline.compact(&CancellationToken::new(), &ctx).await?;
4097 :
4098 1 : let writer = tline.writer().await;
4099 1 : writer
4100 1 : .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
4101 0 : .await?;
4102 1 : writer.finish_write(Lsn(0x20));
4103 1 : drop(writer);
4104 1 :
4105 1 : tline.freeze_and_flush().await?;
4106 1 : tline.compact(&CancellationToken::new(), &ctx).await?;
4107 :
4108 1 : let writer = tline.writer().await;
4109 1 : writer
4110 1 : .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
4111 0 : .await?;
4112 1 : writer.finish_write(Lsn(0x30));
4113 1 : drop(writer);
4114 1 :
4115 1 : tline.freeze_and_flush().await?;
4116 1 : tline.compact(&CancellationToken::new(), &ctx).await?;
4117 :
4118 1 : let writer = tline.writer().await;
4119 1 : writer
4120 1 : .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
4121 0 : .await?;
4122 1 : writer.finish_write(Lsn(0x40));
4123 1 : drop(writer);
4124 1 :
4125 1 : tline.freeze_and_flush().await?;
4126 1 : tline.compact(&CancellationToken::new(), &ctx).await?;
4127 :
4128 1 : assert_eq!(
4129 1 : tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
4130 1 : TEST_IMG("foo at 0x10")
4131 : );
4132 1 : assert_eq!(
4133 1 : tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
4134 1 : TEST_IMG("foo at 0x10")
4135 : );
4136 1 : assert_eq!(
4137 1 : tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
4138 1 : TEST_IMG("foo at 0x20")
4139 : );
4140 1 : assert_eq!(
4141 1 : tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
4142 1 : TEST_IMG("foo at 0x30")
4143 : );
4144 1 : assert_eq!(
4145 1 : tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
4146 1 : TEST_IMG("foo at 0x40")
4147 : );
4148 :
4149 1 : Ok(())
4150 : }
4151 :
4152 : //
4153 : // Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
4154 : // Repeat 50 times.
4155 : //
4156 1 : #[tokio::test]
4157 1 : async fn test_bulk_insert() -> anyhow::Result<()> {
4158 1 : let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
4159 1 : let tline = tenant
4160 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4161 2 : .await?;
4162 :
4163 1 : let mut lsn = Lsn(0x10);
4164 1 :
4165 1 : let mut keyspace = KeySpaceAccum::new();
4166 1 :
4167 1 : let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
4168 1 : let mut blknum = 0;
4169 51 : for _ in 0..50 {
4170 500050 : for _ in 0..10000 {
4171 500000 : test_key.field6 = blknum;
4172 500000 : let writer = tline.writer().await;
4173 500000 : writer
4174 500000 : .put(
4175 500000 : test_key,
4176 500000 : lsn,
4177 500000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4178 500000 : )
4179 7800 : .await?;
4180 500000 : writer.finish_write(lsn);
4181 500000 : drop(writer);
4182 500000 :
4183 500000 : keyspace.add_key(test_key);
4184 500000 :
4185 500000 : lsn = Lsn(lsn.0 + 0x10);
4186 500000 : blknum += 1;
4187 : }
4188 :
4189 50 : let cutoff = tline.get_last_record_lsn();
4190 50 :
4191 50 : tline
4192 50 : .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
4193 0 : .await?;
4194 50 : tline.freeze_and_flush().await?;
4195 4005 : tline.compact(&CancellationToken::new(), &ctx).await?;
4196 50 : tline.gc().await?;
4197 : }
4198 :
4199 1 : Ok(())
4200 : }
4201 :
4202 1 : #[tokio::test]
4203 1 : async fn test_random_updates() -> anyhow::Result<()> {
4204 1 : let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
4205 1 : let tline = tenant
4206 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4207 2 : .await?;
4208 :
4209 : const NUM_KEYS: usize = 1000;
4210 :
4211 1 : let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
4212 1 :
4213 1 : let mut keyspace = KeySpaceAccum::new();
4214 1 :
4215 1 : // Track when each page was last modified. Used to assert that
4216 1 : // a read sees the latest page version.
4217 1 : let mut updated = [Lsn(0); NUM_KEYS];
4218 1 :
4219 1 : let mut lsn = Lsn(0x10);
4220 : #[allow(clippy::needless_range_loop)]
4221 1001 : for blknum in 0..NUM_KEYS {
4222 1000 : lsn = Lsn(lsn.0 + 0x10);
4223 1000 : test_key.field6 = blknum as u32;
4224 1000 : let writer = tline.writer().await;
4225 1000 : writer
4226 1000 : .put(
4227 1000 : test_key,
4228 1000 : lsn,
4229 1000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4230 1000 : )
4231 16 : .await?;
4232 1000 : writer.finish_write(lsn);
4233 1000 : updated[blknum] = lsn;
4234 1000 : drop(writer);
4235 1000 :
4236 1000 : keyspace.add_key(test_key);
4237 : }
4238 :
4239 51 : for _ in 0..50 {
4240 50050 : for _ in 0..NUM_KEYS {
4241 50000 : lsn = Lsn(lsn.0 + 0x10);
4242 50000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
4243 50000 : test_key.field6 = blknum as u32;
4244 50000 : let writer = tline.writer().await;
4245 50000 : writer
4246 50000 : .put(
4247 50000 : test_key,
4248 50000 : lsn,
4249 50000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4250 50000 : )
4251 750 : .await?;
4252 50000 : writer.finish_write(lsn);
4253 50000 : drop(writer);
4254 50000 : updated[blknum] = lsn;
4255 : }
4256 :
4257 : // Read all the blocks
4258 50000 : for (blknum, last_lsn) in updated.iter().enumerate() {
4259 50000 : test_key.field6 = blknum as u32;
4260 50000 : assert_eq!(
4261 50000 : tline.get(test_key, lsn, &ctx).await?,
4262 50000 : TEST_IMG(&format!("{} at {}", blknum, last_lsn))
4263 : );
4264 : }
4265 :
4266 : // Perform a cycle of flush, compact, and GC
4267 50 : let cutoff = tline.get_last_record_lsn();
4268 50 : tline
4269 50 : .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
4270 0 : .await?;
4271 52 : tline.freeze_and_flush().await?;
4272 453 : tline.compact(&CancellationToken::new(), &ctx).await?;
4273 50 : tline.gc().await?;
4274 : }
4275 :
4276 1 : Ok(())
4277 : }
4278 :
4279 1 : #[tokio::test]
4280 1 : async fn test_traverse_branches() -> anyhow::Result<()> {
4281 1 : let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
4282 1 : .load()
4283 1 : .await;
4284 1 : let mut tline = tenant
4285 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4286 2 : .await?;
4287 :
4288 : const NUM_KEYS: usize = 1000;
4289 :
4290 1 : let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
4291 1 :
4292 1 : let mut keyspace = KeySpaceAccum::new();
4293 1 :
4294 1 : // Track when each page was last modified. Used to assert that
4295 1 : // a read sees the latest page version.
4296 1 : let mut updated = [Lsn(0); NUM_KEYS];
4297 1 :
4298 1 : let mut lsn = Lsn(0x10);
4299 : #[allow(clippy::needless_range_loop)]
4300 1001 : for blknum in 0..NUM_KEYS {
4301 1000 : lsn = Lsn(lsn.0 + 0x10);
4302 1000 : test_key.field6 = blknum as u32;
4303 1000 : let writer = tline.writer().await;
4304 1000 : writer
4305 1000 : .put(
4306 1000 : test_key,
4307 1000 : lsn,
4308 1000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4309 1000 : )
4310 16 : .await?;
4311 1000 : writer.finish_write(lsn);
4312 1000 : updated[blknum] = lsn;
4313 1000 : drop(writer);
4314 1000 :
4315 1000 : keyspace.add_key(test_key);
4316 : }
4317 :
4318 51 : for _ in 0..50 {
4319 50 : let new_tline_id = TimelineId::generate();
4320 50 : tenant
4321 50 : .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
4322 0 : .await?;
4323 50 : tline = tenant
4324 50 : .get_timeline(new_tline_id, true)
4325 50 : .expect("Should have the branched timeline");
4326 :
4327 50050 : for _ in 0..NUM_KEYS {
4328 50000 : lsn = Lsn(lsn.0 + 0x10);
4329 50000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
4330 50000 : test_key.field6 = blknum as u32;
4331 50000 : let writer = tline.writer().await;
4332 50000 : writer
4333 50000 : .put(
4334 50000 : test_key,
4335 50000 : lsn,
4336 50000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4337 50000 : )
4338 751 : .await?;
4339 50000 : println!("updating {} at {}", blknum, lsn);
4340 50000 : writer.finish_write(lsn);
4341 50000 : drop(writer);
4342 50000 : updated[blknum] = lsn;
4343 : }
4344 :
4345 : // Read all the blocks
4346 50000 : for (blknum, last_lsn) in updated.iter().enumerate() {
4347 50000 : test_key.field6 = blknum as u32;
4348 50000 : assert_eq!(
4349 50000 : tline.get(test_key, lsn, &ctx).await?,
4350 50000 : TEST_IMG(&format!("{} at {}", blknum, last_lsn))
4351 : );
4352 : }
4353 :
4354 : // Perform a cycle of flush, compact, and GC
4355 50 : let cutoff = tline.get_last_record_lsn();
4356 50 : tline
4357 50 : .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
4358 0 : .await?;
4359 51 : tline.freeze_and_flush().await?;
4360 147 : tline.compact(&CancellationToken::new(), &ctx).await?;
4361 50 : tline.gc().await?;
4362 : }
4363 :
4364 1 : Ok(())
4365 : }
4366 :
4367 1 : #[tokio::test]
4368 1 : async fn test_traverse_ancestors() -> anyhow::Result<()> {
4369 1 : let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
4370 1 : .load()
4371 1 : .await;
4372 1 : let mut tline = tenant
4373 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4374 2 : .await?;
4375 :
4376 : const NUM_KEYS: usize = 100;
4377 : const NUM_TLINES: usize = 50;
4378 :
4379 1 : let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
4380 1 : // Track page mutation lsns across different timelines.
4381 1 : let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
4382 1 :
4383 1 : let mut lsn = Lsn(0x10);
4384 :
4385 : #[allow(clippy::needless_range_loop)]
4386 51 : for idx in 0..NUM_TLINES {
4387 50 : let new_tline_id = TimelineId::generate();
4388 50 : tenant
4389 50 : .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
4390 0 : .await?;
4391 50 : tline = tenant
4392 50 : .get_timeline(new_tline_id, true)
4393 50 : .expect("Should have the branched timeline");
4394 :
4395 5050 : for _ in 0..NUM_KEYS {
4396 5000 : lsn = Lsn(lsn.0 + 0x10);
4397 5000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
4398 5000 : test_key.field6 = blknum as u32;
4399 5000 : let writer = tline.writer().await;
4400 5000 : writer
4401 5000 : .put(
4402 5000 : test_key,
4403 5000 : lsn,
4404 5000 : &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
4405 5000 : )
4406 78 : .await?;
4407 5000 : println!("updating [{}][{}] at {}", idx, blknum, lsn);
4408 5000 : writer.finish_write(lsn);
4409 5000 : drop(writer);
4410 5000 : updated[idx][blknum] = lsn;
4411 : }
4412 : }
4413 :
4414 : // Read pages from leaf timeline across all ancestors.
4415 50 : for (idx, lsns) in updated.iter().enumerate() {
4416 5000 : for (blknum, lsn) in lsns.iter().enumerate() {
4417 : // Skip empty mutations.
4418 5000 : if lsn.0 == 0 {
4419 1797 : continue;
4420 3203 : }
4421 3203 : println!("checking [{idx}][{blknum}] at {lsn}");
4422 3203 : test_key.field6 = blknum as u32;
4423 3203 : assert_eq!(
4424 3203 : tline.get(test_key, *lsn, &ctx).await?,
4425 3203 : TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
4426 : );
4427 : }
4428 : }
4429 1 : Ok(())
4430 : }
4431 :
4432 1 : #[tokio::test]
4433 1 : async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
4434 1 : let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
4435 1 : .load()
4436 1 : .await;
4437 :
4438 1 : let initdb_lsn = Lsn(0x20);
4439 1 : let utline = tenant
4440 1 : .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
4441 0 : .await?;
4442 1 : let tline = utline.raw_timeline().unwrap();
4443 1 :
4444 1 : // Spawn flush loop now so that we can set the `expect_initdb_optimization`
4445 1 : tline.maybe_spawn_flush_loop();
4446 1 :
4447 1 : // Make sure the timeline has the minimum set of required keys for operation.
4448 1 : // The only operation you can always do on an empty timeline is to `put` new data.
4449 1 : // Except if you `put` at `initdb_lsn`.
4450 1 : // In that case, there's an optimization to directly create image layers instead of delta layers.
4451 1 : // It uses `repartition()`, which assumes some keys to be present.
4452 1 : // Let's make sure the test timeline can handle that case.
4453 1 : {
4454 1 : let mut state = tline.flush_loop_state.lock().unwrap();
4455 1 : assert_eq!(
4456 1 : timeline::FlushLoopState::Running {
4457 1 : expect_initdb_optimization: false,
4458 1 : initdb_optimization_count: 0,
4459 1 : },
4460 1 : *state
4461 1 : );
4462 1 : *state = timeline::FlushLoopState::Running {
4463 1 : expect_initdb_optimization: true,
4464 1 : initdb_optimization_count: 0,
4465 1 : };
4466 1 : }
4467 1 :
4468 1 : // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
4469 1 : // As explained above, the optimization requires some keys to be present.
4470 1 : // As per `create_empty_timeline` documentation, use init_empty to set them.
4471 1 : // This is what `create_test_timeline` does, by the way.
4472 1 : let mut modification = tline.begin_modification(initdb_lsn);
4473 1 : modification
4474 1 : .init_empty_test_timeline()
4475 1 : .context("init_empty_test_timeline")?;
4476 1 : modification
4477 1 : .commit()
4478 0 : .await
4479 1 : .context("commit init_empty_test_timeline modification")?;
4480 :
4481 : // Do the flush. The flush code will check the expectations that we set above.
4482 1 : tline.freeze_and_flush().await?;
4483 :
4484 : // assert freeze_and_flush exercised the initdb optimization
4485 : {
4486 1 : let state = tline.flush_loop_state.lock().unwrap();
4487 : let timeline::FlushLoopState::Running {
4488 1 : expect_initdb_optimization,
4489 1 : initdb_optimization_count,
4490 1 : } = *state
4491 : else {
4492 0 : panic!("unexpected state: {:?}", *state);
4493 : };
4494 1 : assert!(expect_initdb_optimization);
4495 1 : assert!(initdb_optimization_count > 0);
4496 : }
4497 1 : Ok(())
4498 : }
4499 :
4500 1 : #[tokio::test]
4501 1 : async fn test_uninit_mark_crash() -> anyhow::Result<()> {
4502 1 : let name = "test_uninit_mark_crash";
4503 1 : let harness = TenantHarness::create(name)?;
4504 : {
4505 1 : let (tenant, ctx) = harness.load().await;
4506 1 : let tline = tenant
4507 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
4508 0 : .await?;
4509 : // Keeps uninit mark in place
4510 1 : let raw_tline = tline.raw_timeline().unwrap();
4511 1 : raw_tline
4512 1 : .shutdown(false)
4513 1 : .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
4514 0 : .await;
4515 1 : std::mem::forget(tline);
4516 : }
4517 :
4518 1 : let (tenant, _) = harness.load().await;
4519 1 : match tenant.get_timeline(TIMELINE_ID, false) {
4520 0 : Ok(_) => panic!("timeline should've been removed during load"),
4521 1 : Err(e) => {
4522 1 : assert_eq!(
4523 1 : e,
4524 1 : GetTimelineError::NotFound {
4525 1 : tenant_id: tenant.tenant_id,
4526 1 : timeline_id: TIMELINE_ID,
4527 1 : }
4528 1 : )
4529 : }
4530 : }
4531 :
4532 1 : assert!(!harness
4533 1 : .conf
4534 1 : .timeline_path(&tenant.tenant_id, &TIMELINE_ID)
4535 1 : .exists());
4536 :
4537 1 : assert!(!harness
4538 1 : .conf
4539 1 : .timeline_uninit_mark_file_path(tenant.tenant_id, TIMELINE_ID)
4540 1 : .exists());
4541 :
4542 1 : Ok(())
4543 : }
4544 : }
|