TLA Line data Source code
1 : //!
2 : //! Timeline repository implementation that keeps old data in files on disk, and
3 : //! the recent changes in memory. See tenant/*_layer.rs files.
4 : //! The functions here are responsible for locating the correct layer for the
5 : //! get/put call, walking back the timeline branching history as needed.
6 : //!
7 : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
8 : //! directory. See docs/pageserver-storage.md for how the files are managed.
9 : //! In addition to the layer files, there is a metadata file in the same
10 : //! directory that contains information about the timeline, in particular its
11 : //! parent timeline, and the last LSN that has been written to disk.
12 : //!
13 :
14 : use anyhow::{bail, Context};
15 : use camino::{Utf8Path, Utf8PathBuf};
16 : use futures::FutureExt;
17 : use pageserver_api::models::TimelineState;
18 : use remote_storage::DownloadError;
19 : use remote_storage::GenericRemoteStorage;
20 : use storage_broker::BrokerClientChannel;
21 : use tokio::sync::watch;
22 : use tokio::task::JoinSet;
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::*;
25 : use utils::completion;
26 : use utils::completion::Completion;
27 : use utils::crashsafe::path_with_suffix_extension;
28 :
29 : use std::cmp::min;
30 : use std::collections::hash_map::Entry;
31 : use std::collections::BTreeSet;
32 : use std::collections::HashMap;
33 : use std::collections::HashSet;
34 : use std::fmt::Debug;
35 : use std::fmt::Display;
36 : use std::fs;
37 : use std::fs::File;
38 : use std::io;
39 : use std::ops::Bound::Included;
40 : use std::process::Command;
41 : use std::process::Stdio;
42 : use std::sync::atomic::AtomicU64;
43 : use std::sync::atomic::Ordering;
44 : use std::sync::Arc;
45 : use std::sync::MutexGuard;
46 : use std::sync::{Mutex, RwLock};
47 : use std::time::{Duration, Instant};
48 :
49 : use self::config::AttachedLocationConfig;
50 : use self::config::AttachmentMode;
51 : use self::config::LocationConf;
52 : use self::config::TenantConf;
53 : use self::delete::DeleteTenantFlow;
54 : use self::metadata::LoadMetadataError;
55 : use self::metadata::TimelineMetadata;
56 : use self::mgr::TenantsMap;
57 : use self::remote_timeline_client::RemoteTimelineClient;
58 : use self::timeline::uninit::TimelineUninitMark;
59 : use self::timeline::uninit::UninitializedTimeline;
60 : use self::timeline::EvictionTaskTenantState;
61 : use self::timeline::TimelineResources;
62 : use crate::config::PageServerConf;
63 : use crate::context::{DownloadBehavior, RequestContext};
64 : use crate::deletion_queue::DeletionQueueClient;
65 : use crate::import_datadir;
66 : use crate::is_uninit_mark;
67 : use crate::metrics::TENANT_ACTIVATION;
68 : use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
69 : use crate::repository::GcResult;
70 : use crate::task_mgr;
71 : use crate::task_mgr::TaskKind;
72 : use crate::tenant::config::LocationMode;
73 : use crate::tenant::config::TenantConfOpt;
74 : use crate::tenant::metadata::load_metadata;
75 : pub use crate::tenant::remote_timeline_client::index::IndexPart;
76 : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
77 : use crate::tenant::storage_layer::DeltaLayer;
78 : use crate::tenant::storage_layer::ImageLayer;
79 : use crate::InitializationOrder;
80 : use crate::METADATA_FILE_NAME;
81 :
82 : use crate::tenant::timeline::delete::DeleteTimelineFlow;
83 : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
84 : use crate::virtual_file::VirtualFile;
85 : use crate::walredo::PostgresRedoManager;
86 : use crate::TEMP_FILE_SUFFIX;
87 : pub use pageserver_api::models::TenantState;
88 :
89 : use toml_edit;
90 : use utils::{
91 : crashsafe,
92 : generation::Generation,
93 : id::{TenantId, TimelineId},
94 : lsn::{Lsn, RecordLsn},
95 : };
96 :
97 : /// Declare a failpoint that can use the `pause` failpoint action.
98 : /// We don't want to block the executor thread, hence, spawn_blocking + await.
99 : macro_rules! pausable_failpoint {
100 : ($name:literal) => {
101 : if cfg!(feature = "testing") {
102 : tokio::task::spawn_blocking({
103 : let current = tracing::Span::current();
104 : move || {
105 : let _entered = current.entered();
106 : tracing::info!("at failpoint {}", $name);
107 : fail::fail_point!($name);
108 : }
109 : })
110 : .await
111 : .expect("spawn_blocking");
112 : }
113 : };
114 : }
115 :
116 : pub mod blob_io;
117 : pub mod block_io;
118 :
119 : pub mod disk_btree;
120 : pub(crate) mod ephemeral_file;
121 : pub mod layer_map;
122 : mod span;
123 :
124 : pub mod metadata;
125 : mod par_fsync;
126 : pub mod remote_timeline_client;
127 : pub mod storage_layer;
128 :
129 : pub mod config;
130 : pub mod delete;
131 : pub mod mgr;
132 : pub mod tasks;
133 : pub mod upload_queue;
134 :
135 : pub(crate) mod timeline;
136 :
137 : pub mod size;
138 :
139 : pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
140 : pub use timeline::{
141 : LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
142 : };
143 :
144 : // re-export for use in remote_timeline_client.rs
145 : pub use crate::tenant::metadata::save_metadata;
146 :
147 : // re-export for use in walreceiver
148 : pub use crate::tenant::timeline::WalReceiverInfo;
149 :
150 : /// The "tenants" part of `tenants/<tenant>/timelines...`
151 : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
152 :
153 : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
154 : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
155 :
156 : pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
157 :
158 : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
159 :
160 : /// References to shared objects that are passed into each tenant, such
161 : /// as the shared remote storage client and process initialization state.
162 CBC 210 : #[derive(Clone)]
163 : pub struct TenantSharedResources {
164 : pub broker_client: storage_broker::BrokerClientChannel,
165 : pub remote_storage: Option<GenericRemoteStorage>,
166 : pub deletion_queue_client: DeletionQueueClient,
167 : }
168 :
169 : /// A [`Tenant`] is really an _attached_ tenant. The configuration
170 : /// for an attached tenant is a subset of the [`LocationConf`], represented
171 : /// in this struct.
172 : pub(super) struct AttachedTenantConf {
173 : tenant_conf: TenantConfOpt,
174 : location: AttachedLocationConfig,
175 : }
176 :
177 : impl AttachedTenantConf {
178 748 : fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
179 748 : match &location_conf.mode {
180 748 : LocationMode::Attached(attach_conf) => Ok(Self {
181 748 : tenant_conf: location_conf.tenant_conf,
182 748 : location: attach_conf.clone(),
183 748 : }),
184 : LocationMode::Secondary(_) => {
185 UBC 0 : anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
186 : }
187 : }
188 CBC 748 : }
189 : }
190 : struct TimelinePreload {
191 : timeline_id: TimelineId,
192 : client: RemoteTimelineClient,
193 : index_part: Result<MaybeDeletedIndexPart, DownloadError>,
194 : }
195 :
196 : ///
197 : /// Tenant consists of multiple timelines. Keep them in a hash table.
198 : ///
199 : pub struct Tenant {
200 : // Global pageserver config parameters
201 : pub conf: &'static PageServerConf,
202 :
203 : /// The value creation timestamp, used to measure activation delay, see:
204 : /// <https://github.com/neondatabase/neon/issues/4025>
205 : loading_started_at: Instant,
206 :
207 : state: watch::Sender<TenantState>,
208 :
209 : // Overridden tenant-specific config parameters.
210 : // We keep TenantConfOpt sturct here to preserve the information
211 : // about parameters that are not set.
212 : // This is necessary to allow global config updates.
213 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
214 :
215 : tenant_id: TenantId,
216 :
217 : /// The remote storage generation, used to protect S3 objects from split-brain.
218 : /// Does not change over the lifetime of the [`Tenant`] object.
219 : ///
220 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
221 : /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
222 : generation: Generation,
223 :
224 : timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
225 : // This mutex prevents creation of new timelines during GC.
226 : // Adding yet another mutex (in addition to `timelines`) is needed because holding
227 : // `timelines` mutex during all GC iteration
228 : // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
229 : // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
230 : // timeout...
231 : gc_cs: tokio::sync::Mutex<()>,
232 : walredo_mgr: Arc<WalRedoManager>,
233 :
234 : // provides access to timeline data sitting in the remote storage
235 : pub(crate) remote_storage: Option<GenericRemoteStorage>,
236 :
237 : // Access to global deletion queue for when this tenant wants to schedule a deletion
238 : deletion_queue_client: DeletionQueueClient,
239 :
240 : /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
241 : cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
242 : cached_synthetic_tenant_size: Arc<AtomicU64>,
243 :
244 : eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
245 :
246 : pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
247 : }
248 :
249 : pub(crate) enum WalRedoManager {
250 : Prod(PostgresRedoManager),
251 : #[cfg(test)]
252 : Test(harness::TestRedoManager),
253 : }
254 :
255 : impl From<PostgresRedoManager> for WalRedoManager {
256 706 : fn from(mgr: PostgresRedoManager) -> Self {
257 706 : Self::Prod(mgr)
258 706 : }
259 : }
260 :
261 : #[cfg(test)]
262 : impl From<harness::TestRedoManager> for WalRedoManager {
263 42 : fn from(mgr: harness::TestRedoManager) -> Self {
264 42 : Self::Test(mgr)
265 42 : }
266 : }
267 :
268 : impl WalRedoManager {
269 2255848 : pub async fn request_redo(
270 2255848 : &self,
271 2255848 : key: crate::repository::Key,
272 2255848 : lsn: Lsn,
273 2255848 : base_img: Option<(Lsn, bytes::Bytes)>,
274 2255848 : records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
275 2255848 : pg_version: u32,
276 2255848 : ) -> anyhow::Result<bytes::Bytes> {
277 UBC 0 : match self {
278 CBC 2255849 : Self::Prod(mgr) => {
279 UBC 0 : mgr.request_redo(key, lsn, base_img, records, pg_version)
280 0 : .await
281 : }
282 : #[cfg(test)]
283 0 : Self::Test(mgr) => {
284 0 : mgr.request_redo(key, lsn, base_img, records, pg_version)
285 0 : .await
286 : }
287 : }
288 CBC 2255846 : }
289 : }
290 :
291 156 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
292 : pub enum GetTimelineError {
293 : #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
294 : NotActive {
295 : tenant_id: TenantId,
296 : timeline_id: TimelineId,
297 : state: TimelineState,
298 : },
299 : #[error("Timeline {tenant_id}/{timeline_id} was not found")]
300 : NotFound {
301 : tenant_id: TenantId,
302 : timeline_id: TimelineId,
303 : },
304 : }
305 :
306 UBC 0 : #[derive(Debug, thiserror::Error)]
307 : pub enum LoadLocalTimelineError {
308 : #[error("FailedToLoad")]
309 : Load(#[source] anyhow::Error),
310 : #[error("FailedToResumeDeletion")]
311 : ResumeDeletion(#[source] anyhow::Error),
312 : }
313 :
314 CBC 183 : #[derive(thiserror::Error)]
315 : pub enum DeleteTimelineError {
316 : #[error("NotFound")]
317 : NotFound,
318 :
319 : #[error("HasChildren")]
320 : HasChildren(Vec<TimelineId>),
321 :
322 : #[error("Timeline deletion is already in progress")]
323 : AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
324 :
325 : #[error(transparent)]
326 : Other(#[from] anyhow::Error),
327 : }
328 :
329 : impl Debug for DeleteTimelineError {
330 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 0 : match self {
332 0 : Self::NotFound => write!(f, "NotFound"),
333 0 : Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
334 0 : Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
335 0 : Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
336 : }
337 0 : }
338 : }
339 :
340 : pub enum SetStoppingError {
341 : AlreadyStopping(completion::Barrier),
342 : Broken,
343 : }
344 :
345 : impl Debug for SetStoppingError {
346 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347 0 : match self {
348 0 : Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
349 0 : Self::Broken => write!(f, "Broken"),
350 : }
351 0 : }
352 : }
353 :
354 0 : #[derive(Debug, thiserror::Error)]
355 : pub(crate) enum WaitToBecomeActiveError {
356 : WillNotBecomeActive {
357 : tenant_id: TenantId,
358 : state: TenantState,
359 : },
360 : TenantDropped {
361 : tenant_id: TenantId,
362 : },
363 : }
364 :
365 : impl std::fmt::Display for WaitToBecomeActiveError {
366 CBC 62 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367 62 : match self {
368 62 : WaitToBecomeActiveError::WillNotBecomeActive { tenant_id, state } => {
369 62 : write!(
370 62 : f,
371 62 : "Tenant {} will not become active. Current state: {:?}",
372 62 : tenant_id, state
373 62 : )
374 : }
375 UBC 0 : WaitToBecomeActiveError::TenantDropped { tenant_id } => {
376 0 : write!(f, "Tenant {tenant_id} will not become active (dropped)")
377 : }
378 : }
379 CBC 62 : }
380 : }
381 :
382 18 : #[derive(thiserror::Error, Debug)]
383 : pub enum CreateTimelineError {
384 : #[error("a timeline with the given ID already exists")]
385 : AlreadyExists,
386 : #[error(transparent)]
387 : AncestorLsn(anyhow::Error),
388 : #[error("ancestor timeline is not active")]
389 : AncestorNotActive,
390 : #[error(transparent)]
391 : Other(#[from] anyhow::Error),
392 : }
393 :
394 : /// spawn_attach argument for whether the caller is using attachment markers
395 : pub(super) enum AttachMarkerMode {
396 : Expect,
397 : Ignore,
398 : }
399 :
400 : struct TenantDirectoryScan {
401 : sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
402 : timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
403 : }
404 :
405 : enum CreateTimelineCause {
406 : Load,
407 : Delete,
408 : }
409 :
410 : impl Tenant {
411 : /// Yet another helper for timeline initialization.
412 : /// Contains the common part of `load_local_timeline` and `load_remote_timeline`.
413 : ///
414 : /// - Initializes the Timeline struct and inserts it into the tenant's hash map
415 : /// - Scans the local timeline directory for layer files and builds the layer map
416 : /// - Downloads remote index file and adds remote files to the layer map
417 : /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
418 : ///
419 : /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
420 : /// it is marked as Active.
421 : #[allow(clippy::too_many_arguments)]
422 314 : async fn timeline_init_and_sync(
423 314 : &self,
424 314 : timeline_id: TimelineId,
425 314 : resources: TimelineResources,
426 314 : index_part: Option<IndexPart>,
427 314 : metadata: TimelineMetadata,
428 314 : ancestor: Option<Arc<Timeline>>,
429 314 : init_order: Option<&InitializationOrder>,
430 314 : _ctx: &RequestContext,
431 314 : ) -> anyhow::Result<()> {
432 314 : let tenant_id = self.tenant_id;
433 :
434 314 : let timeline = self.create_timeline_struct(
435 314 : timeline_id,
436 314 : &metadata,
437 314 : ancestor.clone(),
438 314 : resources,
439 314 : init_order,
440 314 : CreateTimelineCause::Load,
441 314 : )?;
442 314 : let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
443 314 : anyhow::ensure!(
444 314 : disk_consistent_lsn.is_valid(),
445 UBC 0 : "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
446 : );
447 CBC 314 : assert_eq!(
448 314 : disk_consistent_lsn,
449 314 : metadata.disk_consistent_lsn(),
450 UBC 0 : "these are used interchangeably"
451 : );
452 :
453 CBC 314 : if let Some(index_part) = index_part.as_ref() {
454 314 : timeline
455 314 : .remote_client
456 314 : .as_ref()
457 314 : .unwrap()
458 314 : .init_upload_queue(index_part)?;
459 UBC 0 : } else if self.remote_storage.is_some() {
460 : // No data on the remote storage, but we have local metadata file. We can end up
461 : // here with timeline_create being interrupted before finishing index part upload.
462 : // By doing what we do here, the index part upload is retried.
463 : // If control plane retries timeline creation in the meantime, the mgmt API handler
464 : // for timeline creation will coalesce on the upload we queue here.
465 0 : let rtc = timeline.remote_client.as_ref().unwrap();
466 0 : rtc.init_upload_queue_for_empty_remote(&metadata)?;
467 0 : rtc.schedule_index_upload_for_metadata_update(&metadata)?;
468 0 : }
469 :
470 CBC 314 : timeline
471 314 : .load_layer_map(disk_consistent_lsn, index_part)
472 314 : .await
473 314 : .with_context(|| {
474 UBC 0 : format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
475 CBC 314 : })?;
476 :
477 : {
478 : // avoiding holding it across awaits
479 314 : let mut timelines_accessor = self.timelines.lock().unwrap();
480 314 : match timelines_accessor.entry(timeline_id) {
481 : Entry::Occupied(_) => {
482 : // The uninit mark file acts as a lock that prevents another task from
483 : // initializing the timeline at the same time.
484 UBC 0 : unreachable!(
485 0 : "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
486 0 : );
487 : }
488 CBC 314 : Entry::Vacant(v) => {
489 314 : v.insert(Arc::clone(&timeline));
490 314 : timeline.maybe_spawn_flush_loop();
491 314 : }
492 314 : }
493 314 : };
494 314 :
495 314 : // Sanity check: a timeline should have some content.
496 314 : anyhow::ensure!(
497 314 : ancestor.is_some()
498 267 : || timeline
499 267 : .layers
500 267 : .read()
501 UBC 0 : .await
502 CBC 267 : .layer_map()
503 267 : .iter_historic_layers()
504 267 : .next()
505 267 : .is_some(),
506 UBC 0 : "Timeline has no ancestor and no layer files"
507 : );
508 :
509 CBC 314 : Ok(())
510 314 : }
511 :
512 : ///
513 : /// Attach a tenant that's available in cloud storage.
514 : ///
515 : /// This returns quickly, after just creating the in-memory object
516 : /// Tenant struct and launching a background task to download
517 : /// the remote index files. On return, the tenant is most likely still in
518 : /// Attaching state, and it will become Active once the background task
519 : /// finishes. You can use wait_until_active() to wait for the task to
520 : /// complete.
521 : ///
522 48 : pub(crate) fn spawn_attach(
523 48 : conf: &'static PageServerConf,
524 48 : tenant_id: TenantId,
525 48 : resources: TenantSharedResources,
526 48 : attached_conf: AttachedTenantConf,
527 48 : tenants: &'static tokio::sync::RwLock<TenantsMap>,
528 48 : expect_marker: AttachMarkerMode,
529 48 : ctx: &RequestContext,
530 48 : ) -> anyhow::Result<Arc<Tenant>> {
531 48 : // TODO dedup with spawn_load
532 48 : let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
533 48 : conf, tenant_id,
534 48 : )));
535 48 :
536 48 : let TenantSharedResources {
537 48 : broker_client,
538 48 : remote_storage,
539 48 : deletion_queue_client,
540 48 : } = resources;
541 48 :
542 48 : let tenant = Arc::new(Tenant::new(
543 48 : TenantState::Attaching,
544 48 : conf,
545 48 : attached_conf,
546 48 : wal_redo_manager,
547 48 : tenant_id,
548 48 : remote_storage.clone(),
549 48 : deletion_queue_client,
550 48 : ));
551 48 :
552 48 : // Do all the hard work in the background
553 48 : let tenant_clone = Arc::clone(&tenant);
554 48 :
555 48 : let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
556 48 : task_mgr::spawn(
557 48 : &tokio::runtime::Handle::current(),
558 48 : TaskKind::Attach,
559 48 : Some(tenant_id),
560 48 : None,
561 48 : "attach tenant",
562 : false,
563 48 : async move {
564 48 : // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
565 48 : let make_broken = |t: &Tenant, err: anyhow::Error| {
566 6 : error!("attach failed, setting tenant state to Broken: {err:?}");
567 6 : t.state.send_modify(|state| {
568 6 : assert_eq!(
569 : *state,
570 : TenantState::Attaching,
571 UBC 0 : "the attach task owns the tenant state until activation is complete"
572 : );
573 CBC 6 : *state = TenantState::broken_from_reason(err.to_string());
574 6 : });
575 6 : };
576 :
577 48 : let pending_deletion = {
578 48 : match DeleteTenantFlow::should_resume_deletion(
579 48 : conf,
580 48 : remote_storage.as_ref(),
581 48 : &tenant_clone,
582 48 : )
583 144 : .await
584 : {
585 48 : Ok(should_resume_deletion) => should_resume_deletion,
586 UBC 0 : Err(err) => {
587 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
588 0 : return Ok(());
589 : }
590 : }
591 : };
592 :
593 CBC 48 : info!("pending_deletion {}", pending_deletion.is_some());
594 :
595 48 : if let Some(deletion) = pending_deletion {
596 3 : match DeleteTenantFlow::resume_from_attach(
597 3 : deletion,
598 3 : &tenant_clone,
599 3 : tenants,
600 3 : &ctx,
601 3 : )
602 189 : .await
603 : {
604 UBC 0 : Err(err) => {
605 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
606 0 : return Ok(());
607 : }
608 CBC 3 : Ok(()) => return Ok(()),
609 : }
610 45 : }
611 45 :
612 316 : match tenant_clone.attach(&ctx, expect_marker).await {
613 : Ok(()) => {
614 39 : info!("attach finished, activating");
615 39 : tenant_clone.activate(broker_client, None, &ctx);
616 : }
617 6 : Err(e) => {
618 6 : make_broken(&tenant_clone, anyhow::anyhow!(e));
619 6 : }
620 : }
621 45 : Ok(())
622 48 : }
623 48 : .instrument({
624 48 : let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_id);
625 48 : span.follows_from(Span::current());
626 48 : span
627 48 : }),
628 48 : );
629 48 : Ok(tenant)
630 48 : }
631 :
632 : ///
633 : /// Background task that downloads all data for a tenant and brings it to Active state.
634 : ///
635 : /// No background tasks are started as part of this routine.
636 : ///
637 48 : async fn attach(
638 48 : self: &Arc<Tenant>,
639 48 : ctx: &RequestContext,
640 48 : expect_marker: AttachMarkerMode,
641 48 : ) -> anyhow::Result<()> {
642 48 : span::debug_assert_current_span_has_tenant_id();
643 48 :
644 48 : let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
645 48 : if let AttachMarkerMode::Expect = expect_marker {
646 48 : if !tokio::fs::try_exists(&marker_file)
647 48 : .await
648 48 : .context("check for existence of marker file")?
649 : {
650 UBC 0 : anyhow::bail!(
651 0 : "implementation error: marker file should exist at beginning of this function"
652 0 : );
653 CBC 48 : }
654 UBC 0 : }
655 :
656 : // Get list of remote timelines
657 : // download index files for every tenant timeline
658 CBC 48 : info!("listing remote timelines");
659 :
660 48 : let remote_storage = self
661 48 : .remote_storage
662 48 : .as_ref()
663 48 : .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?;
664 :
665 42 : let remote_timeline_ids =
666 137 : remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?;
667 :
668 42 : info!("found {} timelines", remote_timeline_ids.len());
669 :
670 : // Download & parse index parts
671 42 : let mut part_downloads = JoinSet::new();
672 100 : for timeline_id in remote_timeline_ids {
673 58 : let client = RemoteTimelineClient::new(
674 58 : remote_storage.clone(),
675 58 : self.deletion_queue_client.clone(),
676 58 : self.conf,
677 58 : self.tenant_id,
678 58 : timeline_id,
679 58 : self.generation,
680 58 : );
681 58 : part_downloads.spawn(
682 58 : async move {
683 58 : debug!("starting index part download");
684 :
685 58 : let index_part = client
686 58 : .download_index_file()
687 234 : .await
688 58 : .context("download index file")?;
689 :
690 58 : debug!("finished index part download");
691 :
692 58 : Result::<_, anyhow::Error>::Ok((timeline_id, client, index_part))
693 58 : }
694 58 : .map(move |res| {
695 58 : res.with_context(|| format!("download index part for timeline {timeline_id}"))
696 58 : })
697 58 : .instrument(info_span!("download_index_part", %timeline_id)),
698 : );
699 : }
700 :
701 42 : let mut timelines_to_resume_deletions = vec![];
702 42 :
703 42 : // Wait for all the download tasks to complete & collect results.
704 42 : let mut remote_index_and_client = HashMap::new();
705 42 : let mut timeline_ancestors = HashMap::new();
706 100 : while let Some(result) = part_downloads.join_next().await {
707 : // NB: we already added timeline_id as context to the error
708 58 : let result: Result<_, anyhow::Error> = result.context("joinset task join")?;
709 58 : let (timeline_id, client, index_part) = result?;
710 UBC 0 : debug!("successfully downloaded index part for timeline {timeline_id}");
711 CBC 58 : match index_part {
712 52 : MaybeDeletedIndexPart::IndexPart(index_part) => {
713 52 : timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
714 52 : remote_index_and_client.insert(timeline_id, (index_part, client));
715 52 : }
716 6 : MaybeDeletedIndexPart::Deleted(index_part) => {
717 6 : info!(
718 6 : "timeline {} is deleted, picking to resume deletion",
719 6 : timeline_id
720 6 : );
721 6 : timelines_to_resume_deletions.push((timeline_id, index_part, client));
722 : }
723 : }
724 : }
725 :
726 : // For every timeline, download the metadata file, scan the local directory,
727 : // and build a layer map that contains an entry for each remote and local
728 : // layer file.
729 52 : let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
730 94 : for (timeline_id, remote_metadata) in sorted_timelines {
731 52 : let (index_part, remote_client) = remote_index_and_client
732 52 : .remove(&timeline_id)
733 52 : .expect("just put it in above");
734 52 :
735 52 : // TODO again handle early failure
736 52 : self.load_remote_timeline(
737 52 : timeline_id,
738 52 : index_part,
739 52 : remote_metadata,
740 52 : TimelineResources {
741 52 : remote_client: Some(remote_client),
742 52 : deletion_queue_client: self.deletion_queue_client.clone(),
743 52 : },
744 52 : ctx,
745 52 : )
746 104 : .await
747 52 : .with_context(|| {
748 UBC 0 : format!(
749 0 : "failed to load remote timeline {} for tenant {}",
750 0 : timeline_id, self.tenant_id
751 0 : )
752 CBC 52 : })?;
753 : }
754 :
755 : // Walk through deleted timelines, resume deletion
756 48 : for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
757 6 : remote_timeline_client
758 6 : .init_upload_queue_stopped_to_continue_deletion(&index_part)
759 6 : .context("init queue stopped")
760 6 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
761 :
762 6 : DeleteTimelineFlow::resume_deletion(
763 6 : Arc::clone(self),
764 6 : timeline_id,
765 6 : &index_part.metadata,
766 6 : Some(remote_timeline_client),
767 6 : self.deletion_queue_client.clone(),
768 6 : None,
769 6 : )
770 UBC 0 : .await
771 CBC 6 : .context("resume_deletion")
772 6 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
773 : }
774 :
775 42 : if let AttachMarkerMode::Expect = expect_marker {
776 42 : std::fs::remove_file(&marker_file)
777 42 : .with_context(|| format!("unlink attach marker file {marker_file}"))?;
778 42 : crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
779 42 : .context("fsync tenant directory after unlinking attach marker file")?;
780 UBC 0 : }
781 :
782 CBC 21 : crate::failpoint_support::sleep_millis_async!("attach-before-activate");
783 :
784 42 : info!("Done");
785 :
786 42 : Ok(())
787 48 : }
788 :
789 : /// Get sum of all remote timelines sizes
790 : ///
791 : /// This function relies on the index_part instead of listing the remote storage
792 15 : pub fn remote_size(&self) -> u64 {
793 15 : let mut size = 0;
794 :
795 15 : for timeline in self.list_timelines() {
796 12 : if let Some(remote_client) = &timeline.remote_client {
797 12 : size += remote_client.get_remote_physical_size();
798 12 : }
799 : }
800 :
801 15 : size
802 15 : }
803 :
804 156 : #[instrument(skip_all, fields(timeline_id=%timeline_id))]
805 : async fn load_remote_timeline(
806 : &self,
807 : timeline_id: TimelineId,
808 : index_part: IndexPart,
809 : remote_metadata: TimelineMetadata,
810 : resources: TimelineResources,
811 : ctx: &RequestContext,
812 : ) -> anyhow::Result<()> {
813 : span::debug_assert_current_span_has_tenant_id();
814 :
815 52 : info!("downloading index file for timeline {}", timeline_id);
816 : tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_id, &timeline_id))
817 : .await
818 : .context("Failed to create new timeline directory")?;
819 :
820 : let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
821 : let timelines = self.timelines.lock().unwrap();
822 : Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
823 UBC 0 : || {
824 0 : anyhow::anyhow!(
825 0 : "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
826 0 : )
827 0 : },
828 : )?))
829 : } else {
830 : None
831 : };
832 :
833 : // we can load remote timelines during init, but they are assumed to be so rare that
834 : // initialization order is not passed to here.
835 : let init_order = None;
836 :
837 : // timeline loading after attach expects to find metadata file for each metadata
838 : save_metadata(self.conf, &self.tenant_id, &timeline_id, &remote_metadata)
839 : .await
840 : .context("save_metadata")
841 : .map_err(LoadLocalTimelineError::Load)?;
842 :
843 : self.timeline_init_and_sync(
844 : timeline_id,
845 : resources,
846 : Some(index_part),
847 : remote_metadata,
848 : ancestor,
849 : init_order,
850 : ctx,
851 : )
852 : .await
853 : }
854 :
855 : /// Create a placeholder Tenant object for a broken tenant
856 0 : pub fn create_broken_tenant(
857 0 : conf: &'static PageServerConf,
858 0 : tenant_id: TenantId,
859 0 : reason: String,
860 0 : ) -> Arc<Tenant> {
861 0 : let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
862 0 : conf, tenant_id,
863 0 : )));
864 0 : Arc::new(Tenant::new(
865 0 : TenantState::Broken {
866 0 : reason,
867 0 : backtrace: String::new(),
868 0 : },
869 0 : conf,
870 0 : AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
871 0 : wal_redo_manager,
872 0 : tenant_id,
873 0 : None,
874 0 : DeletionQueueClient::broken(),
875 0 : ))
876 0 : }
877 :
878 : /// Load a tenant that's available on local disk
879 : ///
880 : /// This is used at pageserver startup, to rebuild the in-memory
881 : /// structures from on-disk state. This is similar to attaching a tenant,
882 : /// but the index files already exist on local disk, as well as some layer
883 : /// files.
884 : ///
885 : /// If the loading fails for some reason, the Tenant will go into Broken
886 : /// state.
887 CBC 658 : #[instrument(skip_all, fields(tenant_id=%tenant_id))]
888 : pub(crate) fn spawn_load(
889 : conf: &'static PageServerConf,
890 : tenant_id: TenantId,
891 : attached_conf: AttachedTenantConf,
892 : resources: TenantSharedResources,
893 : init_order: Option<InitializationOrder>,
894 : tenants: &'static tokio::sync::RwLock<TenantsMap>,
895 : ctx: &RequestContext,
896 : ) -> Arc<Tenant> {
897 : span::debug_assert_current_span_has_tenant_id();
898 :
899 : let broker_client = resources.broker_client;
900 : let remote_storage = resources.remote_storage;
901 :
902 : let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
903 : conf, tenant_id,
904 : )));
905 : let tenant = Tenant::new(
906 : TenantState::Loading,
907 : conf,
908 : attached_conf,
909 : wal_redo_manager,
910 : tenant_id,
911 : remote_storage.clone(),
912 : resources.deletion_queue_client.clone(),
913 : );
914 : let tenant = Arc::new(tenant);
915 :
916 : // Do all the hard work in a background task
917 : let tenant_clone = Arc::clone(&tenant);
918 :
919 : let ctx = ctx.detached_child(TaskKind::InitialLoad, DownloadBehavior::Warn);
920 : let _ = task_mgr::spawn(
921 : &tokio::runtime::Handle::current(),
922 : TaskKind::InitialLoad,
923 : Some(tenant_id),
924 : None,
925 : "initial tenant load",
926 : false,
927 658 : async move {
928 658 : // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
929 658 : let make_broken = |t: &Tenant, err: anyhow::Error| {
930 5 : error!("load failed, setting tenant state to Broken: {err:?}");
931 5 : t.state.send_modify(|state| {
932 5 : assert!(
933 5 : matches!(*state, TenantState::Loading | TenantState::Stopping { .. }),
934 UBC 0 : "the loading task owns the tenant state until activation is complete"
935 : );
936 CBC 5 : *state = TenantState::broken_from_reason(err.to_string());
937 5 : });
938 5 : };
939 :
940 658 : let mut init_order = init_order;
941 658 :
942 658 : // take the completion because initial tenant loading will complete when all of
943 658 : // these tasks complete.
944 658 : let _completion = init_order
945 658 : .as_mut()
946 658 : .and_then(|x| x.initial_tenant_load.take());
947 658 : let remote_load_completion = init_order
948 658 : .as_mut()
949 658 : .and_then(|x| x.initial_tenant_load_remote.take());
950 :
951 : // Dont block pageserver startup on figuring out deletion status
952 658 : let pending_deletion = {
953 658 : match DeleteTenantFlow::should_resume_deletion(
954 658 : conf,
955 658 : remote_storage.as_ref(),
956 658 : &tenant_clone,
957 658 : )
958 1572 : .await
959 : {
960 658 : Ok(should_resume_deletion) => should_resume_deletion,
961 UBC 0 : Err(err) => {
962 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
963 0 : return Ok(());
964 : }
965 : }
966 : };
967 :
968 CBC 658 : info!("pending deletion {}", pending_deletion.is_some());
969 :
970 658 : if let Some(deletion) = pending_deletion {
971 : // as we are no longer loading, signal completion by dropping
972 : // the completion while we resume deletion
973 20 : drop(_completion);
974 20 : drop(remote_load_completion);
975 20 : // do not hold to initial_logical_size_attempt as it will prevent loading from proceeding without timeout
976 20 : let _ = init_order
977 20 : .as_mut()
978 20 : .and_then(|x| x.initial_logical_size_attempt.take());
979 20 :
980 20 : match DeleteTenantFlow::resume_from_load(
981 20 : deletion,
982 20 : &tenant_clone,
983 20 : init_order.as_ref(),
984 20 : tenants,
985 20 : &ctx,
986 20 : )
987 1136 : .await
988 : {
989 UBC 0 : Err(err) => {
990 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
991 0 : return Ok(());
992 : }
993 CBC 20 : Ok(()) => return Ok(()),
994 : }
995 638 : }
996 638 :
997 638 : let background_jobs_can_start =
998 638 : init_order.as_ref().map(|x| &x.background_jobs_can_start);
999 638 :
1000 638 : match tenant_clone
1001 638 : .load(init_order.as_ref(), remote_load_completion, &ctx)
1002 1143 : .await
1003 : {
1004 : Ok(()) => {
1005 633 : debug!("load finished");
1006 :
1007 633 : tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
1008 : }
1009 5 : Err(err) => make_broken(&tenant_clone, err),
1010 : }
1011 :
1012 638 : Ok(())
1013 658 : }
1014 : .instrument({
1015 : let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
1016 : span.follows_from(Span::current());
1017 : span
1018 : }),
1019 : );
1020 :
1021 : tenant
1022 : }
1023 :
1024 698 : fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
1025 698 : let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
1026 698 : // Note timelines_to_resume_deletion needs to be separate because it can be not sortable
1027 698 : // from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
1028 698 : // completed in non topological order (for example because parent has smaller number of layer files in it)
1029 698 : let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
1030 698 :
1031 698 : let timelines_dir = self.conf.timelines_path(&self.tenant_id);
1032 :
1033 698 : for entry in timelines_dir
1034 698 : .read_dir_utf8()
1035 698 : .context("list timelines directory for tenant")?
1036 : {
1037 312 : let entry = entry.context("read timeline dir entry")?;
1038 312 : let timeline_dir = entry.path();
1039 312 :
1040 312 : if crate::is_temporary(timeline_dir) {
1041 UBC 0 : info!("Found temporary timeline directory, removing: {timeline_dir}");
1042 0 : if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
1043 0 : error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
1044 0 : }
1045 CBC 312 : } else if is_uninit_mark(timeline_dir) {
1046 1 : if !timeline_dir.exists() {
1047 UBC 0 : warn!("Timeline dir entry become invalid: {timeline_dir}");
1048 0 : continue;
1049 CBC 1 : }
1050 1 :
1051 1 : let timeline_uninit_mark_file = &timeline_dir;
1052 1 : info!(
1053 1 : "Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
1054 1 : );
1055 1 : let timeline_id =
1056 1 : TimelineId::try_from(timeline_uninit_mark_file.file_stem())
1057 1 : .with_context(|| {
1058 UBC 0 : format!(
1059 0 : "Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
1060 0 : )
1061 CBC 1 : })?;
1062 1 : let timeline_dir = self.conf.timeline_path(&self.tenant_id, &timeline_id);
1063 UBC 0 : if let Err(e) =
1064 CBC 1 : remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
1065 : {
1066 UBC 0 : error!("Failed to clean up uninit marked timeline: {e:?}");
1067 CBC 1 : }
1068 311 : } else if crate::is_delete_mark(timeline_dir) {
1069 : // If metadata exists, load as usual, continue deletion
1070 23 : let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
1071 23 : .with_context(|| {
1072 UBC 0 : format!(
1073 0 : "Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
1074 0 : )
1075 CBC 23 : })?;
1076 :
1077 23 : info!("Found deletion mark for timeline {}", timeline_id);
1078 :
1079 23 : match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
1080 17 : Ok(metadata) => {
1081 17 : timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
1082 : }
1083 6 : Err(e) => match &e {
1084 6 : LoadMetadataError::Read(r) => {
1085 6 : if r.kind() != io::ErrorKind::NotFound {
1086 UBC 0 : return Err(anyhow::anyhow!(e)).with_context(|| {
1087 0 : format!("Failed to load metadata for timeline_id {timeline_id}")
1088 0 : });
1089 CBC 6 : }
1090 6 :
1091 6 : // If metadata doesnt exist it means that we've crashed without
1092 6 : // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
1093 6 : // So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
1094 6 : // We cant do it here because the method is async so we'd need block_on
1095 6 : // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
1096 6 : // so that basically results in a cycle:
1097 6 : // spawn_blocking
1098 6 : // - block_on
1099 6 : // - spawn_blocking
1100 6 : // which can lead to running out of threads in blocing pool.
1101 6 : timelines_to_resume_deletion.push((timeline_id, None));
1102 : }
1103 : _ => {
1104 UBC 0 : return Err(anyhow::anyhow!(e)).with_context(|| {
1105 0 : format!("Failed to load metadata for timeline_id {timeline_id}")
1106 0 : })
1107 : }
1108 : },
1109 : }
1110 : } else {
1111 CBC 288 : if !timeline_dir.exists() {
1112 1 : warn!("Timeline dir entry become invalid: {timeline_dir}");
1113 1 : continue;
1114 287 : }
1115 287 : let timeline_id = TimelineId::try_from(timeline_dir.file_name())
1116 287 : .with_context(|| {
1117 UBC 0 : format!(
1118 0 : "Could not parse timeline id out of the timeline dir name {timeline_dir}",
1119 0 : )
1120 CBC 287 : })?;
1121 287 : let timeline_uninit_mark_file = self
1122 287 : .conf
1123 287 : .timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
1124 287 : if timeline_uninit_mark_file.exists() {
1125 UBC 0 : info!(
1126 0 : %timeline_id,
1127 0 : "Found an uninit mark file, removing the timeline and its uninit mark",
1128 0 : );
1129 0 : if let Err(e) =
1130 0 : remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
1131 : {
1132 0 : error!("Failed to clean up uninit marked timeline: {e:?}");
1133 0 : }
1134 0 : continue;
1135 CBC 287 : }
1136 287 :
1137 287 : let timeline_delete_mark_file = self
1138 287 : .conf
1139 287 : .timeline_delete_mark_file_path(self.tenant_id, timeline_id);
1140 287 : if timeline_delete_mark_file.exists() {
1141 : // Cleanup should be done in `is_delete_mark` branch above
1142 19 : continue;
1143 268 : }
1144 268 :
1145 268 : let file_name = entry.file_name();
1146 268 : if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
1147 268 : let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
1148 268 : .context("failed to load metadata")?;
1149 265 : timelines_to_load.insert(timeline_id, metadata);
1150 : } else {
1151 : // A file or directory that doesn't look like a timeline ID
1152 UBC 0 : warn!("unexpected file or directory in timelines directory: {file_name}");
1153 : }
1154 : }
1155 : }
1156 :
1157 : // Sort the array of timeline IDs into tree-order, so that parent comes before
1158 : // all its children.
1159 CBC 692 : tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
1160 692 : TenantDirectoryScan {
1161 692 : sorted_timelines_to_load: sorted_timelines,
1162 692 : timelines_to_resume_deletion,
1163 692 : }
1164 692 : })
1165 698 : }
1166 :
1167 692 : async fn load_timeline_metadata(
1168 692 : self: &Arc<Tenant>,
1169 692 : timeline_ids: HashSet<TimelineId>,
1170 692 : remote_storage: &GenericRemoteStorage,
1171 692 : ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
1172 692 : let mut part_downloads = JoinSet::new();
1173 980 : for timeline_id in timeline_ids {
1174 288 : let client = RemoteTimelineClient::new(
1175 288 : remote_storage.clone(),
1176 288 : self.deletion_queue_client.clone(),
1177 288 : self.conf,
1178 288 : self.tenant_id,
1179 288 : timeline_id,
1180 288 : self.generation,
1181 288 : );
1182 288 : part_downloads.spawn(
1183 288 : async move {
1184 288 : debug!("starting index part download");
1185 :
1186 1114 : let index_part = client.download_index_file().await;
1187 :
1188 288 : debug!("finished index part download");
1189 :
1190 288 : Result::<_, anyhow::Error>::Ok(TimelinePreload {
1191 288 : client,
1192 288 : timeline_id,
1193 288 : index_part,
1194 288 : })
1195 288 : }
1196 288 : .map(move |res| {
1197 288 : res.with_context(|| format!("download index part for timeline {timeline_id}"))
1198 288 : })
1199 288 : .instrument(info_span!("download_index_part", %timeline_id)),
1200 : );
1201 : }
1202 :
1203 692 : let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
1204 980 : while let Some(result) = part_downloads.join_next().await {
1205 288 : let preload_result = result.context("join preload task")?;
1206 288 : let preload = preload_result?;
1207 288 : timeline_preloads.insert(preload.timeline_id, preload);
1208 : }
1209 :
1210 692 : Ok(timeline_preloads)
1211 692 : }
1212 :
1213 : ///
1214 : /// Background task to load in-memory data structures for this tenant, from
1215 : /// files on disk. Used at pageserver startup.
1216 : ///
1217 : /// No background tasks are started as part of this routine.
1218 698 : async fn load(
1219 698 : self: &Arc<Tenant>,
1220 698 : init_order: Option<&InitializationOrder>,
1221 698 : remote_completion: Option<Completion>,
1222 698 : ctx: &RequestContext,
1223 698 : ) -> anyhow::Result<()> {
1224 698 : span::debug_assert_current_span_has_tenant_id();
1225 :
1226 UBC 0 : debug!("loading tenant task");
1227 :
1228 : // Load in-memory state to reflect the local files on disk
1229 : //
1230 : // Scan the directory, peek into the metadata file of each timeline, and
1231 : // collect a list of timelines and their ancestors.
1232 CBC 698 : let span = info_span!("blocking");
1233 698 : let cloned = Arc::clone(self);
1234 :
1235 698 : let scan = tokio::task::spawn_blocking(move || {
1236 698 : let _g = span.entered();
1237 698 : cloned.scan_and_sort_timelines_dir()
1238 698 : })
1239 698 : .await
1240 698 : .context("load spawn_blocking")
1241 698 : .and_then(|res| res)?;
1242 :
1243 : // FIXME original collect_timeline_files contained one more check:
1244 : // 1. "Timeline has no ancestor and no layer files"
1245 :
1246 : // Load remote content for timelines in this tenant
1247 692 : let all_timeline_ids = scan
1248 692 : .sorted_timelines_to_load
1249 692 : .iter()
1250 692 : .map(|i| i.0)
1251 692 : .chain(scan.timelines_to_resume_deletion.iter().map(|i| i.0))
1252 692 : .collect();
1253 692 : let mut preload = if let Some(remote_storage) = &self.remote_storage {
1254 : Some(
1255 692 : self.load_timeline_metadata(all_timeline_ids, remote_storage)
1256 256 : .await?,
1257 : )
1258 : } else {
1259 UBC 0 : None
1260 : };
1261 :
1262 CBC 692 : drop(remote_completion);
1263 :
1264 6 : crate::failpoint_support::sleep_millis_async!("before-loading-tenant");
1265 :
1266 : // Process loadable timelines first
1267 957 : for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
1268 265 : let timeline_preload = preload.as_mut().map(|p| p.remove(&timeline_id).unwrap());
1269 265 : if let Err(e) = self
1270 265 : .load_local_timeline(
1271 265 : timeline_id,
1272 265 : local_metadata,
1273 265 : timeline_preload,
1274 265 : init_order,
1275 265 : ctx,
1276 265 : false,
1277 265 : )
1278 265 : .await
1279 : {
1280 UBC 0 : match e {
1281 0 : LoadLocalTimelineError::Load(source) => {
1282 0 : return Err(anyhow::anyhow!(source)).with_context(|| {
1283 0 : format!("Failed to load local timeline: {timeline_id}")
1284 0 : })
1285 : }
1286 0 : LoadLocalTimelineError::ResumeDeletion(source) => {
1287 : // Make sure resumed deletion wont fail loading for entire tenant.
1288 0 : error!("Failed to resume timeline deletion: {source:#}")
1289 : }
1290 : }
1291 CBC 265 : }
1292 : }
1293 :
1294 : // Resume deletion ones with deleted_mark
1295 715 : for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
1296 23 : match maybe_local_metadata {
1297 : None => {
1298 : // See comment in `scan_and_sort_timelines_dir`.
1299 UBC 0 : if let Err(e) =
1300 CBC 6 : DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
1301 30 : .await
1302 : {
1303 UBC 0 : warn!(
1304 0 : "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
1305 0 : timeline_id, e
1306 0 : );
1307 CBC 6 : }
1308 : }
1309 17 : Some(local_metadata) => {
1310 17 : let timeline_preload =
1311 17 : preload.as_mut().map(|p| p.remove(&timeline_id).unwrap());
1312 17 : if let Err(e) = self
1313 17 : .load_local_timeline(
1314 17 : timeline_id,
1315 17 : local_metadata,
1316 17 : timeline_preload,
1317 17 : init_order,
1318 17 : ctx,
1319 17 : true,
1320 17 : )
1321 10 : .await
1322 : {
1323 UBC 0 : match e {
1324 0 : LoadLocalTimelineError::Load(source) => {
1325 0 : // We tried to load deleted timeline, this is a bug.
1326 0 : return Err(anyhow::anyhow!(source).context(
1327 0 : format!("This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}")
1328 0 : ));
1329 : }
1330 0 : LoadLocalTimelineError::ResumeDeletion(source) => {
1331 : // Make sure resumed deletion wont fail loading for entire tenant.
1332 0 : error!("Failed to resume timeline deletion: {source:#}")
1333 : }
1334 : }
1335 CBC 17 : }
1336 : }
1337 : }
1338 : }
1339 :
1340 UBC 0 : trace!("Done");
1341 :
1342 CBC 692 : Ok(())
1343 698 : }
1344 :
1345 : /// Subroutine of `load_tenant`, to load an individual timeline
1346 : ///
1347 : /// NB: The parent is assumed to be already loaded!
1348 579 : #[instrument(skip(self, local_metadata, init_order, preload, ctx))]
1349 : async fn load_local_timeline(
1350 : self: &Arc<Self>,
1351 : timeline_id: TimelineId,
1352 : local_metadata: TimelineMetadata,
1353 : preload: Option<TimelinePreload>,
1354 : init_order: Option<&InitializationOrder>,
1355 : ctx: &RequestContext,
1356 : found_delete_mark: bool,
1357 : ) -> Result<(), LoadLocalTimelineError> {
1358 : span::debug_assert_current_span_has_tenant_id();
1359 :
1360 : let mut resources = self.build_timeline_resources(timeline_id);
1361 :
1362 : struct RemoteStartupData {
1363 : index_part: IndexPart,
1364 : remote_metadata: TimelineMetadata,
1365 : }
1366 :
1367 : let (remote_startup_data, remote_client) = match preload {
1368 : Some(preload) => {
1369 : let TimelinePreload {
1370 : index_part,
1371 : client: remote_client,
1372 : timeline_id: _timeline_id,
1373 : } = preload;
1374 : match index_part {
1375 : Ok(index_part) => {
1376 : let index_part = match index_part {
1377 : MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
1378 : MaybeDeletedIndexPart::Deleted(index_part) => {
1379 : // TODO: we won't reach here if remote storage gets de-configured after start of the deletion operation.
1380 : // Example:
1381 : // start deletion operation
1382 : // finishes upload of index part
1383 : // pageserver crashes
1384 : // remote storage gets de-configured
1385 : // pageserver starts
1386 : //
1387 : // We don't really anticipate remote storage to be de-configured, so, for now, this is fine.
1388 : // Also, maybe we'll remove that option entirely in the future, see https://github.com/neondatabase/neon/issues/4099.
1389 15 : info!("is_deleted is set on remote, resuming removal of timeline data originally done by timeline deletion handler");
1390 :
1391 : remote_client
1392 : .init_upload_queue_stopped_to_continue_deletion(&index_part)
1393 : .context("init queue stopped")
1394 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1395 :
1396 : DeleteTimelineFlow::resume_deletion(
1397 : Arc::clone(self),
1398 : timeline_id,
1399 : &local_metadata,
1400 : Some(remote_client),
1401 : self.deletion_queue_client.clone(),
1402 : init_order,
1403 : )
1404 : .await
1405 : .context("resume deletion")
1406 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1407 :
1408 : return Ok(());
1409 : }
1410 : };
1411 :
1412 : let remote_metadata = index_part.metadata.clone();
1413 : (
1414 : Some(RemoteStartupData {
1415 : index_part,
1416 : remote_metadata,
1417 : }),
1418 : Some(remote_client),
1419 : )
1420 : }
1421 : Err(DownloadError::NotFound) => {
1422 5 : info!(found_delete_mark, "no index file was found on the remote, resuming deletion or cleaning unuploaded up");
1423 :
1424 : if found_delete_mark {
1425 : // We could've resumed at a point where remote index was deleted, but metadata file wasnt.
1426 : // Cleanup:
1427 : return DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(
1428 : self,
1429 : timeline_id,
1430 : )
1431 : .await
1432 : .context("cleanup_remaining_timeline_fs_traces")
1433 : .map_err(LoadLocalTimelineError::ResumeDeletion);
1434 : }
1435 :
1436 : // as the remote index_part.json did not exist, this timeline is a
1437 : // not-yet-uploaded one. it should be deleted now, because the branching might
1438 : // not have been valid as it's ancestor may have been restored to earlier state
1439 : // as well. in practice, control plane will keep retrying.
1440 : //
1441 : // first ensure that the un-uploaded timeline looks like it should, as in we
1442 : // are not accidentially deleting a timeline which was ever active:
1443 : // - root timelines have metadata and one possibly partial layer
1444 : // - branched timelines have metadata
1445 : //
1446 : // if the timeline does not look like expected, fail loading of the tenant.
1447 : // cleaning the timeline up manually and reloading the tenant is possible via
1448 : // the above log message.
1449 : let path = self.conf.timeline_path(&self.tenant_id, &timeline_id);
1450 :
1451 : let span = tracing::Span::current();
1452 :
1453 : return tokio::task::spawn_blocking({
1454 3 : move || {
1455 3 : use std::str::FromStr;
1456 3 : use crate::tenant::storage_layer::LayerFileName;
1457 3 :
1458 3 : let _e = span.entered();
1459 3 : let mut metadata = false;
1460 3 : let mut layers = 0;
1461 3 : let mut others = 0;
1462 4 : for dentry in path.read_dir_utf8()? {
1463 4 : let dentry = dentry?;
1464 4 : let file_name = dentry.file_name();
1465 4 :
1466 4 : if file_name == METADATA_FILE_NAME {
1467 3 : metadata = true;
1468 3 : continue;
1469 1 : }
1470 1 :
1471 1 : if LayerFileName::from_str(file_name).is_ok()
1472 : {
1473 1 : layers += 1;
1474 1 : continue;
1475 UBC 0 : }
1476 0 :
1477 0 : others += 1;
1478 : }
1479 :
1480 : // bootstrapped have the one image layer file, or one partial temp
1481 : // file, branched have just the metadata
1482 CBC 3 : if !(metadata && layers + others <= 1) {
1483 UBC 0 : anyhow::bail!("unexpected assumed unuploaded, never been active timeline: found metadata={}, layers={}, others={}", metadata, layers, others);
1484 CBC 3 : }
1485 3 :
1486 3 : let tmp_path =
1487 3 : path.with_file_name(format!("{timeline_id}{}", TEMP_FILE_SUFFIX));
1488 3 : std::fs::rename(path, &tmp_path)?;
1489 3 : std::fs::remove_dir_all(&tmp_path)?;
1490 3 : Ok(())
1491 3 : }
1492 : })
1493 : .await
1494 : .map_err(anyhow::Error::new)
1495 3 : .and_then(|x| x)
1496 : .context("delete assumed unuploaded fresh timeline")
1497 : .map_err(LoadLocalTimelineError::Load);
1498 : }
1499 : Err(e) => return Err(LoadLocalTimelineError::Load(anyhow::Error::new(e))),
1500 : }
1501 : }
1502 : None => {
1503 : if found_delete_mark {
1504 : // There is no remote client, we found local metadata.
1505 : // Continue cleaning up local disk.
1506 : DeleteTimelineFlow::resume_deletion(
1507 : Arc::clone(self),
1508 : timeline_id,
1509 : &local_metadata,
1510 : None,
1511 : self.deletion_queue_client.clone(),
1512 : init_order,
1513 : )
1514 : .await
1515 : .context("resume deletion")
1516 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1517 : return Ok(());
1518 : }
1519 :
1520 : (None, resources.remote_client)
1521 : }
1522 : };
1523 : resources.remote_client = remote_client;
1524 :
1525 : let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
1526 : let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
1527 UBC 0 : .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
1528 : .map_err(LoadLocalTimelineError::Load)?;
1529 : Some(ancestor_timeline)
1530 : } else {
1531 : None
1532 : };
1533 :
1534 : let (index_part, metadata) = match remote_startup_data {
1535 : Some(RemoteStartupData {
1536 : index_part,
1537 : remote_metadata,
1538 : }) => {
1539 : // always choose the remote metadata to be crash consistent (see RFC 27)
1540 : save_metadata(self.conf, &self.tenant_id, &timeline_id, &remote_metadata)
1541 : .await
1542 : .context("save_metadata")
1543 : .map_err(LoadLocalTimelineError::Load)?;
1544 :
1545 : (Some(index_part), remote_metadata)
1546 : }
1547 : None => (None, local_metadata),
1548 : };
1549 :
1550 : self.timeline_init_and_sync(
1551 : timeline_id,
1552 : resources,
1553 : index_part,
1554 : metadata,
1555 : ancestor,
1556 : init_order,
1557 : ctx,
1558 : )
1559 : .await
1560 : .map_err(LoadLocalTimelineError::Load)
1561 : }
1562 :
1563 CBC 1151 : pub fn tenant_id(&self) -> TenantId {
1564 1151 : self.tenant_id
1565 1151 : }
1566 :
1567 : /// Get Timeline handle for given Neon timeline ID.
1568 : /// This function is idempotent. It doesn't change internal state in any way.
1569 10445 : pub fn get_timeline(
1570 10445 : &self,
1571 10445 : timeline_id: TimelineId,
1572 10445 : active_only: bool,
1573 10445 : ) -> Result<Arc<Timeline>, GetTimelineError> {
1574 10445 : let timelines_accessor = self.timelines.lock().unwrap();
1575 10445 : let timeline = timelines_accessor
1576 10445 : .get(&timeline_id)
1577 10445 : .ok_or(GetTimelineError::NotFound {
1578 10445 : tenant_id: self.tenant_id,
1579 10445 : timeline_id,
1580 10445 : })?;
1581 :
1582 9521 : if active_only && !timeline.is_active() {
1583 1 : Err(GetTimelineError::NotActive {
1584 1 : tenant_id: self.tenant_id,
1585 1 : timeline_id,
1586 1 : state: timeline.current_state(),
1587 1 : })
1588 : } else {
1589 9520 : Ok(Arc::clone(timeline))
1590 : }
1591 10445 : }
1592 :
1593 : /// Lists timelines the tenant contains.
1594 : /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
1595 631 : pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
1596 631 : self.timelines
1597 631 : .lock()
1598 631 : .unwrap()
1599 631 : .values()
1600 631 : .map(Arc::clone)
1601 631 : .collect()
1602 631 : }
1603 :
1604 : /// This is used to create the initial 'main' timeline during bootstrapping,
1605 : /// or when importing a new base backup. The caller is expected to load an
1606 : /// initial image of the datadir to the new timeline after this.
1607 : ///
1608 : /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
1609 : /// and the timeline will fail to load at a restart.
1610 : ///
1611 : /// That's why we add an uninit mark file, and wrap it together witht the Timeline
1612 : /// in-memory object into UninitializedTimeline.
1613 : /// Once the caller is done setting up the timeline, they should call
1614 : /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
1615 : ///
1616 : /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
1617 : /// minimum amount of keys required to get a writable timeline.
1618 : /// (Without it, `put` might fail due to `repartition` failing.)
1619 43 : pub async fn create_empty_timeline(
1620 43 : &self,
1621 43 : new_timeline_id: TimelineId,
1622 43 : initdb_lsn: Lsn,
1623 43 : pg_version: u32,
1624 43 : _ctx: &RequestContext,
1625 43 : ) -> anyhow::Result<UninitializedTimeline> {
1626 43 : anyhow::ensure!(
1627 43 : self.is_active(),
1628 UBC 0 : "Cannot create empty timelines on inactive tenant"
1629 : );
1630 :
1631 CBC 42 : let timeline_uninit_mark = {
1632 43 : let timelines = self.timelines.lock().unwrap();
1633 43 : self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
1634 : };
1635 42 : let new_metadata = TimelineMetadata::new(
1636 42 : // Initialize disk_consistent LSN to 0, The caller must import some data to
1637 42 : // make it valid, before calling finish_creation()
1638 42 : Lsn(0),
1639 42 : None,
1640 42 : None,
1641 42 : Lsn(0),
1642 42 : initdb_lsn,
1643 42 : initdb_lsn,
1644 42 : pg_version,
1645 42 : );
1646 42 : self.prepare_new_timeline(
1647 42 : new_timeline_id,
1648 42 : &new_metadata,
1649 42 : timeline_uninit_mark,
1650 42 : initdb_lsn,
1651 42 : None,
1652 42 : )
1653 UBC 0 : .await
1654 CBC 43 : }
1655 :
1656 : /// Helper for unit tests to create an empty timeline.
1657 : ///
1658 : /// The timeline is has state value `Active` but its background loops are not running.
1659 : // This makes the various functions which anyhow::ensure! for Active state work in tests.
1660 : // Our current tests don't need the background loops.
1661 : #[cfg(test)]
1662 35 : pub async fn create_test_timeline(
1663 35 : &self,
1664 35 : new_timeline_id: TimelineId,
1665 35 : initdb_lsn: Lsn,
1666 35 : pg_version: u32,
1667 35 : ctx: &RequestContext,
1668 35 : ) -> anyhow::Result<Arc<Timeline>> {
1669 35 : let uninit_tl = self
1670 35 : .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
1671 UBC 0 : .await?;
1672 CBC 35 : let tline = uninit_tl.raw_timeline().expect("we just created it");
1673 35 : assert_eq!(tline.get_last_record_lsn(), Lsn(0));
1674 :
1675 : // Setup minimum keys required for the timeline to be usable.
1676 35 : let mut modification = tline.begin_modification(initdb_lsn);
1677 35 : modification
1678 35 : .init_empty_test_timeline()
1679 35 : .context("init_empty_test_timeline")?;
1680 35 : modification
1681 35 : .commit(ctx)
1682 UBC 0 : .await
1683 CBC 35 : .context("commit init_empty_test_timeline modification")?;
1684 :
1685 : // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
1686 35 : tline.maybe_spawn_flush_loop();
1687 35 : tline.freeze_and_flush().await.context("freeze_and_flush")?;
1688 :
1689 : // Make sure the freeze_and_flush reaches remote storage.
1690 35 : tline
1691 35 : .remote_client
1692 35 : .as_ref()
1693 35 : .unwrap()
1694 35 : .wait_completion()
1695 35 : .await
1696 35 : .unwrap();
1697 :
1698 35 : let tl = uninit_tl.finish_creation()?;
1699 : // The non-test code would call tl.activate() here.
1700 35 : tl.set_state(TimelineState::Active);
1701 35 : Ok(tl)
1702 35 : }
1703 :
1704 : /// Create a new timeline.
1705 : ///
1706 : /// Returns the new timeline ID and reference to its Timeline object.
1707 : ///
1708 : /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
1709 : /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
1710 847 : pub async fn create_timeline(
1711 847 : &self,
1712 847 : new_timeline_id: TimelineId,
1713 847 : ancestor_timeline_id: Option<TimelineId>,
1714 847 : mut ancestor_start_lsn: Option<Lsn>,
1715 847 : pg_version: u32,
1716 847 : broker_client: storage_broker::BrokerClientChannel,
1717 847 : ctx: &RequestContext,
1718 847 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
1719 847 : if !self.is_active() {
1720 UBC 0 : return Err(CreateTimelineError::Other(anyhow::anyhow!(
1721 0 : "Cannot create timelines on inactive tenant"
1722 0 : )));
1723 CBC 847 : }
1724 :
1725 847 : if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
1726 UBC 0 : debug!("timeline {new_timeline_id} already exists");
1727 :
1728 CBC 1 : if let Some(remote_client) = existing.remote_client.as_ref() {
1729 : // Wait for uploads to complete, so that when we return Ok, the timeline
1730 : // is known to be durable on remote storage. Just like we do at the end of
1731 : // this function, after we have created the timeline ourselves.
1732 : //
1733 : // We only really care that the initial version of `index_part.json` has
1734 : // been uploaded. That's enough to remember that the timeline
1735 : // exists. However, there is no function to wait specifically for that so
1736 : // we just wait for all in-progress uploads to finish.
1737 1 : remote_client
1738 1 : .wait_completion()
1739 1 : .await
1740 1 : .context("wait for timeline uploads to complete")?;
1741 UBC 0 : }
1742 :
1743 CBC 1 : return Err(CreateTimelineError::AlreadyExists);
1744 846 : }
1745 :
1746 846 : let loaded_timeline = match ancestor_timeline_id {
1747 271 : Some(ancestor_timeline_id) => {
1748 271 : let ancestor_timeline = self
1749 271 : .get_timeline(ancestor_timeline_id, false)
1750 271 : .context("Cannot branch off the timeline that's not present in pageserver")?;
1751 :
1752 : // instead of waiting around, just deny the request because ancestor is not yet
1753 : // ready for other purposes either.
1754 271 : if !ancestor_timeline.is_active() {
1755 18 : return Err(CreateTimelineError::AncestorNotActive);
1756 253 : }
1757 :
1758 253 : if let Some(lsn) = ancestor_start_lsn.as_mut() {
1759 35 : *lsn = lsn.align();
1760 35 :
1761 35 : let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
1762 35 : if ancestor_ancestor_lsn > *lsn {
1763 : // can we safely just branch from the ancestor instead?
1764 2 : return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
1765 2 : "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
1766 2 : lsn,
1767 2 : ancestor_timeline_id,
1768 2 : ancestor_ancestor_lsn,
1769 2 : )));
1770 33 : }
1771 33 :
1772 33 : // Wait for the WAL to arrive and be processed on the parent branch up
1773 33 : // to the requested branch point. The repository code itself doesn't
1774 33 : // require it, but if we start to receive WAL on the new timeline,
1775 33 : // decoding the new WAL might need to look up previous pages, relation
1776 33 : // sizes etc. and that would get confused if the previous page versions
1777 33 : // are not in the repository yet.
1778 33 : ancestor_timeline.wait_lsn(*lsn, ctx).await?;
1779 218 : }
1780 :
1781 251 : self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
1782 6 : .await?
1783 : }
1784 : None => {
1785 575 : self.bootstrap_timeline(new_timeline_id, pg_version, ctx)
1786 2890234 : .await?
1787 : }
1788 : };
1789 :
1790 818 : if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
1791 : // Wait for the upload of the 'index_part.json` file to finish, so that when we return
1792 : // Ok, the timeline is durable in remote storage.
1793 818 : let kind = ancestor_timeline_id
1794 818 : .map(|_| "branched")
1795 818 : .unwrap_or("bootstrapped");
1796 818 : remote_client.wait_completion().await.with_context(|| {
1797 UBC 0 : format!("wait for {} timeline initial uploads to complete", kind)
1798 CBC 813 : })?;
1799 UBC 0 : }
1800 :
1801 CBC 813 : loaded_timeline.activate(broker_client, None, ctx);
1802 813 :
1803 813 : Ok(loaded_timeline)
1804 842 : }
1805 :
1806 : /// perform one garbage collection iteration, removing old data files from disk.
1807 : /// this function is periodically called by gc task.
1808 : /// also it can be explicitly requested through page server api 'do_gc' command.
1809 : ///
1810 : /// `target_timeline_id` specifies the timeline to GC, or None for all.
1811 : ///
1812 : /// The `horizon` an `pitr` parameters determine how much WAL history needs to be retained.
1813 : /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
1814 : /// the amount of history, as LSN difference from current latest LSN on each timeline.
1815 : /// `pitr` specifies the same as a time difference from the current time. The effective
1816 : /// GC cutoff point is determined conservatively by either `horizon` and `pitr`, whichever
1817 : /// requires more history to be retained.
1818 : //
1819 406 : pub async fn gc_iteration(
1820 406 : &self,
1821 406 : target_timeline_id: Option<TimelineId>,
1822 406 : horizon: u64,
1823 406 : pitr: Duration,
1824 406 : ctx: &RequestContext,
1825 406 : ) -> anyhow::Result<GcResult> {
1826 406 : // there is a global allowed_error for this
1827 406 : anyhow::ensure!(
1828 406 : self.is_active(),
1829 UBC 0 : "Cannot run GC iteration on inactive tenant"
1830 : );
1831 :
1832 : {
1833 CBC 406 : let conf = self.tenant_conf.read().unwrap();
1834 406 :
1835 406 : if !conf.location.may_delete_layers_hint() {
1836 UBC 0 : info!("Skipping GC in location state {:?}", conf.location);
1837 0 : return Ok(GcResult::default());
1838 CBC 406 : }
1839 406 : }
1840 406 :
1841 406 : self.gc_iteration_internal(target_timeline_id, horizon, pitr, ctx)
1842 336932 : .await
1843 401 : }
1844 :
1845 : /// Perform one compaction iteration.
1846 : /// This function is periodically called by compactor task.
1847 : /// Also it can be explicitly requested per timeline through page server
1848 : /// api's 'compact' command.
1849 241 : pub async fn compaction_iteration(
1850 241 : &self,
1851 241 : cancel: &CancellationToken,
1852 241 : ctx: &RequestContext,
1853 241 : ) -> anyhow::Result<()> {
1854 241 : anyhow::ensure!(
1855 241 : self.is_active(),
1856 LBC (1) : "Cannot run compaction iteration on inactive tenant"
1857 : );
1858 :
1859 : {
1860 CBC 241 : let conf = self.tenant_conf.read().unwrap();
1861 241 : if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
1862 UBC 0 : info!("Skipping compaction in location state {:?}", conf.location);
1863 0 : return Ok(());
1864 CBC 241 : }
1865 241 : }
1866 241 :
1867 241 : // Scan through the hashmap and collect a list of all the timelines,
1868 241 : // while holding the lock. Then drop the lock and actually perform the
1869 241 : // compactions. We don't want to block everything else while the
1870 241 : // compaction runs.
1871 241 : let timelines_to_compact = {
1872 241 : let timelines = self.timelines.lock().unwrap();
1873 241 : let timelines_to_compact = timelines
1874 241 : .iter()
1875 419 : .filter_map(|(timeline_id, timeline)| {
1876 419 : if timeline.is_active() {
1877 414 : Some((*timeline_id, timeline.clone()))
1878 : } else {
1879 5 : None
1880 : }
1881 419 : })
1882 241 : .collect::<Vec<_>>();
1883 241 : drop(timelines);
1884 241 : timelines_to_compact
1885 : };
1886 :
1887 641 : for (timeline_id, timeline) in &timelines_to_compact {
1888 408 : timeline
1889 408 : .compact(cancel, ctx)
1890 408 : .instrument(info_span!("compact_timeline", %timeline_id))
1891 1040273 : .await?;
1892 : }
1893 :
1894 233 : Ok(())
1895 233 : }
1896 :
1897 13609 : pub fn current_state(&self) -> TenantState {
1898 13609 : self.state.borrow().clone()
1899 13609 : }
1900 :
1901 1537 : pub fn is_active(&self) -> bool {
1902 1537 : self.current_state() == TenantState::Active
1903 1537 : }
1904 :
1905 : /// Changes tenant status to active, unless shutdown was already requested.
1906 : ///
1907 : /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
1908 : /// to delay background jobs. Background jobs can be started right away when None is given.
1909 672 : fn activate(
1910 672 : self: &Arc<Self>,
1911 672 : broker_client: BrokerClientChannel,
1912 672 : background_jobs_can_start: Option<&completion::Barrier>,
1913 672 : ctx: &RequestContext,
1914 672 : ) {
1915 672 : span::debug_assert_current_span_has_tenant_id();
1916 672 :
1917 672 : let mut activating = false;
1918 672 : self.state.send_modify(|current_state| {
1919 672 : use pageserver_api::models::ActivatingFrom;
1920 672 : match &*current_state {
1921 : TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1922 UBC 0 : panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
1923 : }
1924 CBC 633 : TenantState::Loading => {
1925 633 : *current_state = TenantState::Activating(ActivatingFrom::Loading);
1926 633 : }
1927 39 : TenantState::Attaching => {
1928 39 : *current_state = TenantState::Activating(ActivatingFrom::Attaching);
1929 39 : }
1930 : }
1931 672 : debug!(tenant_id = %self.tenant_id, "Activating tenant");
1932 672 : activating = true;
1933 672 : // Continue outside the closure. We need to grab timelines.lock()
1934 672 : // and we plan to turn it into a tokio::sync::Mutex in a future patch.
1935 672 : });
1936 672 :
1937 672 : if activating {
1938 672 : let timelines_accessor = self.timelines.lock().unwrap();
1939 672 : let timelines_to_activate = timelines_accessor
1940 672 : .values()
1941 672 : .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
1942 672 :
1943 672 : // Spawn gc and compaction loops. The loops will shut themselves
1944 672 : // down when they notice that the tenant is inactive.
1945 672 : tasks::start_background_loops(self, background_jobs_can_start);
1946 672 :
1947 672 : let mut activated_timelines = 0;
1948 :
1949 959 : for timeline in timelines_to_activate {
1950 287 : timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
1951 287 : activated_timelines += 1;
1952 287 : }
1953 :
1954 672 : self.state.send_modify(move |current_state| {
1955 672 : assert!(
1956 672 : matches!(current_state, TenantState::Activating(_)),
1957 UBC 0 : "set_stopping and set_broken wait for us to leave Activating state",
1958 : );
1959 CBC 672 : *current_state = TenantState::Active;
1960 672 :
1961 672 : let elapsed = self.loading_started_at.elapsed();
1962 672 : let total_timelines = timelines_accessor.len();
1963 672 :
1964 672 : // log a lot of stuff, because some tenants sometimes suffer from user-visible
1965 672 : // times to activate. see https://github.com/neondatabase/neon/issues/4025
1966 672 : info!(
1967 672 : since_creation_millis = elapsed.as_millis(),
1968 672 : tenant_id = %self.tenant_id,
1969 672 : activated_timelines,
1970 672 : total_timelines,
1971 672 : post_state = <&'static str>::from(&*current_state),
1972 672 : "activation attempt finished"
1973 672 : );
1974 :
1975 672 : TENANT_ACTIVATION.observe(elapsed.as_secs_f64());
1976 672 : });
1977 UBC 0 : }
1978 CBC 672 : }
1979 :
1980 : /// Shutdown the tenant and join all of the spawned tasks.
1981 : ///
1982 : /// The method caters for all use-cases:
1983 : /// - pageserver shutdown (freeze_and_flush == true)
1984 : /// - detach + ignore (freeze_and_flush == false)
1985 : ///
1986 : /// This will attempt to shutdown even if tenant is broken.
1987 : ///
1988 : /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
1989 : /// If the tenant is already shutting down, we return a clone of the first shutdown call's
1990 : /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
1991 : /// the ongoing shutdown.
1992 287 : async fn shutdown(
1993 287 : &self,
1994 287 : shutdown_progress: completion::Barrier,
1995 287 : freeze_and_flush: bool,
1996 287 : ) -> Result<(), completion::Barrier> {
1997 287 : span::debug_assert_current_span_has_tenant_id();
1998 287 : // Set tenant (and its timlines) to Stoppping state.
1999 287 : //
2000 287 : // Since we can only transition into Stopping state after activation is complete,
2001 287 : // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
2002 287 : //
2003 287 : // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
2004 287 : // 1. Lock out any new requests to the tenants.
2005 287 : // 2. Signal cancellation to WAL receivers (we wait on it below).
2006 287 : // 3. Signal cancellation for other tenant background loops.
2007 287 : // 4. ???
2008 287 : //
2009 287 : // The waiting for the cancellation is not done uniformly.
2010 287 : // We certainly wait for WAL receivers to shut down.
2011 287 : // That is necessary so that no new data comes in before the freeze_and_flush.
2012 287 : // But the tenant background loops are joined-on in our caller.
2013 287 : // It's mesed up.
2014 287 : // we just ignore the failure to stop
2015 287 :
2016 287 : match self.set_stopping(shutdown_progress, false, false).await {
2017 226 : Ok(()) => {}
2018 58 : Err(SetStoppingError::Broken) => {
2019 58 : // assume that this is acceptable
2020 58 : }
2021 3 : Err(SetStoppingError::AlreadyStopping(other)) => {
2022 3 : // give caller the option to wait for this this shutdown
2023 3 : return Err(other);
2024 : }
2025 : };
2026 :
2027 284 : let mut js = tokio::task::JoinSet::new();
2028 284 : {
2029 284 : let timelines = self.timelines.lock().unwrap();
2030 431 : timelines.values().for_each(|timeline| {
2031 431 : let timeline = Arc::clone(timeline);
2032 431 : let span = Span::current();
2033 512 : js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
2034 431 : })
2035 : };
2036 715 : while let Some(res) = js.join_next().await {
2037 UBC 0 : match res {
2038 CBC 431 : Ok(()) => {}
2039 UBC 0 : Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
2040 0 : Err(je) if je.is_panic() => { /* logged already */ }
2041 0 : Err(je) => warn!("unexpected JoinError: {je:?}"),
2042 : }
2043 : }
2044 :
2045 : // shutdown all tenant and timeline tasks: gc, compaction, page service
2046 : // No new tasks will be started for this tenant because it's in `Stopping` state.
2047 : //
2048 : // this will additionally shutdown and await all timeline tasks.
2049 CBC 578 : task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
2050 :
2051 284 : Ok(())
2052 287 : }
2053 :
2054 : /// Change tenant status to Stopping, to mark that it is being shut down.
2055 : ///
2056 : /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
2057 : ///
2058 : /// This function is not cancel-safe!
2059 : ///
2060 : /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
2061 : /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
2062 310 : async fn set_stopping(
2063 310 : &self,
2064 310 : progress: completion::Barrier,
2065 310 : allow_transition_from_loading: bool,
2066 310 : allow_transition_from_attaching: bool,
2067 310 : ) -> Result<(), SetStoppingError> {
2068 310 : let mut rx = self.state.subscribe();
2069 310 :
2070 310 : // cannot stop before we're done activating, so wait out until we're done activating
2071 310 : rx.wait_for(|state| match state {
2072 3 : TenantState::Attaching if allow_transition_from_attaching => true,
2073 : TenantState::Activating(_) | TenantState::Attaching => {
2074 6 : info!(
2075 6 : "waiting for {} to turn Active|Broken|Stopping",
2076 6 : <&'static str>::from(state)
2077 6 : );
2078 6 : false
2079 : }
2080 20 : TenantState::Loading => allow_transition_from_loading,
2081 287 : TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
2082 316 : })
2083 6 : .await
2084 310 : .expect("cannot drop self.state while on a &self method");
2085 310 :
2086 310 : // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
2087 310 : let mut err = None;
2088 310 : let stopping = self.state.send_if_modified(|current_state| match current_state {
2089 : TenantState::Activating(_) => {
2090 UBC 0 : unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
2091 : }
2092 : TenantState::Attaching => {
2093 CBC 3 : if !allow_transition_from_attaching {
2094 UBC 0 : unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
2095 CBC 3 : };
2096 3 : *current_state = TenantState::Stopping { progress };
2097 3 : true
2098 : }
2099 : TenantState::Loading => {
2100 20 : if !allow_transition_from_loading {
2101 UBC 0 : unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
2102 CBC 20 : };
2103 20 : *current_state = TenantState::Stopping { progress };
2104 20 : true
2105 : }
2106 : TenantState::Active => {
2107 : // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
2108 : // are created after the transition to Stopping. That's harmless, as the Timelines
2109 : // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
2110 226 : *current_state = TenantState::Stopping { progress };
2111 226 : // Continue stopping outside the closure. We need to grab timelines.lock()
2112 226 : // and we plan to turn it into a tokio::sync::Mutex in a future patch.
2113 226 : true
2114 : }
2115 58 : TenantState::Broken { reason, .. } => {
2116 58 : info!(
2117 58 : "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
2118 58 : );
2119 58 : err = Some(SetStoppingError::Broken);
2120 58 : false
2121 : }
2122 3 : TenantState::Stopping { progress } => {
2123 3 : info!("Tenant is already in Stopping state");
2124 3 : err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
2125 3 : false
2126 : }
2127 310 : });
2128 310 : match (stopping, err) {
2129 249 : (true, None) => {} // continue
2130 61 : (false, Some(err)) => return Err(err),
2131 UBC 0 : (true, Some(_)) => unreachable!(
2132 0 : "send_if_modified closure must error out if not transitioning to Stopping"
2133 0 : ),
2134 0 : (false, None) => unreachable!(
2135 0 : "send_if_modified closure must return true if transitioning to Stopping"
2136 0 : ),
2137 : }
2138 :
2139 CBC 249 : let timelines_accessor = self.timelines.lock().unwrap();
2140 249 : let not_broken_timelines = timelines_accessor
2141 249 : .values()
2142 365 : .filter(|timeline| !timeline.is_broken());
2143 596 : for timeline in not_broken_timelines {
2144 347 : timeline.set_state(TimelineState::Stopping);
2145 347 : }
2146 249 : Ok(())
2147 310 : }
2148 :
2149 : /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
2150 : /// `remove_tenant_from_memory`
2151 : ///
2152 : /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
2153 : ///
2154 : /// In tests, we also use this to set tenants to Broken state on purpose.
2155 53 : pub(crate) async fn set_broken(&self, reason: String) {
2156 53 : let mut rx = self.state.subscribe();
2157 53 :
2158 53 : // The load & attach routines own the tenant state until it has reached `Active`.
2159 53 : // So, wait until it's done.
2160 53 : rx.wait_for(|state| match state {
2161 : TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
2162 UBC 0 : info!(
2163 0 : "waiting for {} to turn Active|Broken|Stopping",
2164 0 : <&'static str>::from(state)
2165 0 : );
2166 0 : false
2167 : }
2168 CBC 53 : TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
2169 53 : })
2170 UBC 0 : .await
2171 CBC 53 : .expect("cannot drop self.state while on a &self method");
2172 53 :
2173 53 : // we now know we're done activating, let's see whether this task is the winner to transition into Broken
2174 53 : self.set_broken_no_wait(reason)
2175 53 : }
2176 :
2177 53 : pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
2178 53 : let reason = reason.to_string();
2179 53 : self.state.send_modify(|current_state| {
2180 53 : match *current_state {
2181 : TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
2182 UBC 0 : unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
2183 : }
2184 : TenantState::Active => {
2185 CBC 2 : if cfg!(feature = "testing") {
2186 2 : warn!("Changing Active tenant to Broken state, reason: {}", reason);
2187 2 : *current_state = TenantState::broken_from_reason(reason);
2188 : } else {
2189 UBC 0 : unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
2190 : }
2191 : }
2192 : TenantState::Broken { .. } => {
2193 0 : warn!("Tenant is already in Broken state");
2194 : }
2195 : // This is the only "expected" path, any other path is a bug.
2196 : TenantState::Stopping { .. } => {
2197 CBC 51 : warn!(
2198 51 : "Marking Stopping tenant as Broken state, reason: {}",
2199 51 : reason
2200 51 : );
2201 51 : *current_state = TenantState::broken_from_reason(reason);
2202 : }
2203 : }
2204 53 : });
2205 53 : }
2206 :
2207 41 : pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
2208 41 : self.state.subscribe()
2209 41 : }
2210 :
2211 5548 : pub(crate) async fn wait_to_become_active(&self) -> Result<(), WaitToBecomeActiveError> {
2212 5548 : let mut receiver = self.state.subscribe();
2213 6228 : loop {
2214 6228 : let current_state = receiver.borrow_and_update().clone();
2215 6228 : match current_state {
2216 : TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
2217 : // in these states, there's a chance that we can reach ::Active
2218 700 : receiver.changed().await.map_err(
2219 680 : |_e: tokio::sync::watch::error::RecvError| {
2220 UBC 0 : WaitToBecomeActiveError::TenantDropped {
2221 0 : tenant_id: self.tenant_id,
2222 0 : }
2223 CBC 680 : },
2224 680 : )?;
2225 : }
2226 : TenantState::Active { .. } => {
2227 5517 : return Ok(());
2228 : }
2229 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
2230 : // There's no chance the tenant can transition back into ::Active
2231 31 : return Err(WaitToBecomeActiveError::WillNotBecomeActive {
2232 31 : tenant_id: self.tenant_id,
2233 31 : state: current_state,
2234 31 : });
2235 : }
2236 : }
2237 : }
2238 5548 : }
2239 :
2240 UBC 0 : pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
2241 0 : self.tenant_conf
2242 0 : .read()
2243 0 : .unwrap()
2244 0 : .location
2245 0 : .attach_mode
2246 0 : .clone()
2247 0 : }
2248 : }
2249 :
2250 : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
2251 : /// perform a topological sort, so that the parent of each timeline comes
2252 : /// before the children.
2253 : /// E extracts the ancestor from T
2254 : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
2255 CBC 829 : fn tree_sort_timelines<T, E>(
2256 829 : timelines: HashMap<TimelineId, T>,
2257 829 : extractor: E,
2258 829 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
2259 829 : where
2260 829 : E: Fn(&T) -> Option<TimelineId>,
2261 829 : {
2262 829 : let mut result = Vec::with_capacity(timelines.len());
2263 829 :
2264 829 : let mut now = Vec::with_capacity(timelines.len());
2265 829 : // (ancestor, children)
2266 829 : let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
2267 829 : HashMap::with_capacity(timelines.len());
2268 :
2269 1307 : for (timeline_id, value) in timelines {
2270 478 : if let Some(ancestor_id) = extractor(&value) {
2271 55 : let children = later.entry(ancestor_id).or_default();
2272 55 : children.push((timeline_id, value));
2273 423 : } else {
2274 423 : now.push((timeline_id, value));
2275 423 : }
2276 : }
2277 :
2278 1307 : while let Some((timeline_id, metadata)) = now.pop() {
2279 478 : result.push((timeline_id, metadata));
2280 : // All children of this can be loaded now
2281 478 : if let Some(mut children) = later.remove(&timeline_id) {
2282 44 : now.append(&mut children);
2283 434 : }
2284 : }
2285 :
2286 : // All timelines should be visited now. Unless there were timelines with missing ancestors.
2287 829 : if !later.is_empty() {
2288 UBC 0 : for (missing_id, orphan_ids) in later {
2289 0 : for (orphan_id, _) in orphan_ids {
2290 0 : error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
2291 : }
2292 : }
2293 0 : bail!("could not load tenant because some timelines are missing ancestors");
2294 CBC 829 : }
2295 829 :
2296 829 : Ok(result)
2297 829 : }
2298 :
2299 : impl Tenant {
2300 80 : pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
2301 80 : self.tenant_conf.read().unwrap().tenant_conf
2302 80 : }
2303 :
2304 40 : pub fn effective_config(&self) -> TenantConf {
2305 40 : self.tenant_specific_overrides()
2306 40 : .merge(self.conf.default_tenant_conf)
2307 40 : }
2308 :
2309 5 : pub fn get_checkpoint_distance(&self) -> u64 {
2310 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2311 5 : tenant_conf
2312 5 : .checkpoint_distance
2313 5 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
2314 5 : }
2315 :
2316 5 : pub fn get_checkpoint_timeout(&self) -> Duration {
2317 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2318 5 : tenant_conf
2319 5 : .checkpoint_timeout
2320 5 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
2321 5 : }
2322 :
2323 5 : pub fn get_compaction_target_size(&self) -> u64 {
2324 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2325 5 : tenant_conf
2326 5 : .compaction_target_size
2327 5 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
2328 5 : }
2329 :
2330 802 : pub fn get_compaction_period(&self) -> Duration {
2331 802 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2332 802 : tenant_conf
2333 802 : .compaction_period
2334 802 : .unwrap_or(self.conf.default_tenant_conf.compaction_period)
2335 802 : }
2336 :
2337 5 : pub fn get_compaction_threshold(&self) -> usize {
2338 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2339 5 : tenant_conf
2340 5 : .compaction_threshold
2341 5 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
2342 5 : }
2343 :
2344 372 : pub fn get_gc_horizon(&self) -> u64 {
2345 372 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2346 372 : tenant_conf
2347 372 : .gc_horizon
2348 372 : .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
2349 372 : }
2350 :
2351 738 : pub fn get_gc_period(&self) -> Duration {
2352 738 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2353 738 : tenant_conf
2354 738 : .gc_period
2355 738 : .unwrap_or(self.conf.default_tenant_conf.gc_period)
2356 738 : }
2357 :
2358 5 : pub fn get_image_creation_threshold(&self) -> usize {
2359 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2360 5 : tenant_conf
2361 5 : .image_creation_threshold
2362 5 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
2363 5 : }
2364 :
2365 485 : pub fn get_pitr_interval(&self) -> Duration {
2366 485 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2367 485 : tenant_conf
2368 485 : .pitr_interval
2369 485 : .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
2370 485 : }
2371 :
2372 4401 : pub fn get_trace_read_requests(&self) -> bool {
2373 4401 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2374 4401 : tenant_conf
2375 4401 : .trace_read_requests
2376 4401 : .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
2377 4401 : }
2378 :
2379 13 : pub fn get_min_resident_size_override(&self) -> Option<u64> {
2380 13 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2381 13 : tenant_conf
2382 13 : .min_resident_size_override
2383 13 : .or(self.conf.default_tenant_conf.min_resident_size_override)
2384 13 : }
2385 :
2386 27 : pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
2387 27 : self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
2388 27 : // Don't hold self.timelines.lock() during the notifies.
2389 27 : // There's no risk of deadlock right now, but there could be if we consolidate
2390 27 : // mutexes in struct Timeline in the future.
2391 27 : let timelines = self.list_timelines();
2392 56 : for timeline in timelines {
2393 29 : timeline.tenant_conf_updated();
2394 29 : }
2395 27 : }
2396 :
2397 UBC 0 : pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
2398 0 : *self.tenant_conf.write().unwrap() = new_conf;
2399 0 : // Don't hold self.timelines.lock() during the notifies.
2400 0 : // There's no risk of deadlock right now, but there could be if we consolidate
2401 0 : // mutexes in struct Timeline in the future.
2402 0 : let timelines = self.list_timelines();
2403 0 : for timeline in timelines {
2404 0 : timeline.tenant_conf_updated();
2405 0 : }
2406 0 : }
2407 :
2408 : /// Helper function to create a new Timeline struct.
2409 : ///
2410 : /// The returned Timeline is in Loading state. The caller is responsible for
2411 : /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
2412 : /// map.
2413 : ///
2414 : /// `validate_ancestor == false` is used when a timeline is created for deletion
2415 : /// and we might not have the ancestor present anymore which is fine for to be
2416 : /// deleted timelines.
2417 CBC 1302 : fn create_timeline_struct(
2418 1302 : &self,
2419 1302 : new_timeline_id: TimelineId,
2420 1302 : new_metadata: &TimelineMetadata,
2421 1302 : ancestor: Option<Arc<Timeline>>,
2422 1302 : resources: TimelineResources,
2423 1302 : init_order: Option<&InitializationOrder>,
2424 1302 : cause: CreateTimelineCause,
2425 1302 : ) -> anyhow::Result<Arc<Timeline>> {
2426 1302 : let state = match cause {
2427 : CreateTimelineCause::Load => {
2428 1281 : let ancestor_id = new_metadata.ancestor_timeline();
2429 1281 : anyhow::ensure!(
2430 1281 : ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
2431 UBC 0 : "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
2432 : );
2433 CBC 1281 : TimelineState::Loading
2434 : }
2435 21 : CreateTimelineCause::Delete => TimelineState::Stopping,
2436 : };
2437 :
2438 1302 : let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
2439 1302 : let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
2440 1302 :
2441 1302 : let pg_version = new_metadata.pg_version();
2442 1302 :
2443 1302 : let timeline = Timeline::new(
2444 1302 : self.conf,
2445 1302 : Arc::clone(&self.tenant_conf),
2446 1302 : new_metadata,
2447 1302 : ancestor,
2448 1302 : new_timeline_id,
2449 1302 : self.tenant_id,
2450 1302 : self.generation,
2451 1302 : Arc::clone(&self.walredo_mgr),
2452 1302 : resources,
2453 1302 : pg_version,
2454 1302 : initial_logical_size_can_start.cloned(),
2455 1302 : initial_logical_size_attempt.cloned().flatten(),
2456 1302 : state,
2457 1302 : );
2458 1302 :
2459 1302 : Ok(timeline)
2460 1302 : }
2461 :
2462 : // Allow too_many_arguments because a constructor's argument list naturally grows with the
2463 : // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
2464 : #[allow(clippy::too_many_arguments)]
2465 748 : fn new(
2466 748 : state: TenantState,
2467 748 : conf: &'static PageServerConf,
2468 748 : attached_conf: AttachedTenantConf,
2469 748 : walredo_mgr: Arc<WalRedoManager>,
2470 748 : tenant_id: TenantId,
2471 748 : remote_storage: Option<GenericRemoteStorage>,
2472 748 : deletion_queue_client: DeletionQueueClient,
2473 748 : ) -> Tenant {
2474 748 : let (state, mut rx) = watch::channel(state);
2475 748 :
2476 748 : tokio::spawn(async move {
2477 748 : let tid = tenant_id.to_string();
2478 748 :
2479 2149 : fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
2480 2149 : ([state.into()], matches!(state, TenantState::Broken { .. }))
2481 2149 : }
2482 748 :
2483 748 : let mut tuple = inspect_state(&rx.borrow_and_update());
2484 748 :
2485 748 : let is_broken = tuple.1;
2486 748 : let mut counted_broken = if !is_broken {
2487 : // the tenant might be ignored and reloaded, so first remove any previous set
2488 : // element. it most likely has already been scraped, as these are manual operations
2489 : // right now. most likely we will add it back very soon.
2490 748 : drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
2491 748 : false
2492 : } else {
2493 : // add the id to the set right away, there should not be any updates on the channel
2494 : // after
2495 UBC 0 : crate::metrics::BROKEN_TENANTS_SET
2496 0 : .with_label_values(&[&tid])
2497 0 : .set(1);
2498 0 : true
2499 : };
2500 :
2501 CBC 2149 : loop {
2502 2149 : let labels = &tuple.0;
2503 2149 : let current = TENANT_STATE_METRIC.with_label_values(labels);
2504 2149 : current.inc();
2505 2149 :
2506 2149 : if rx.changed().await.is_err() {
2507 : // tenant has been dropped; decrement the counter because a tenant with that
2508 : // state is no longer in tenant map, but allow any broken set item to exist
2509 : // still.
2510 101 : current.dec();
2511 101 : break;
2512 1401 : }
2513 1401 :
2514 1401 : current.dec();
2515 1401 : tuple = inspect_state(&rx.borrow_and_update());
2516 1401 :
2517 1401 : let is_broken = tuple.1;
2518 1401 : if is_broken && !counted_broken {
2519 64 : counted_broken = true;
2520 64 : // insert the tenant_id (back) into the set
2521 64 : crate::metrics::BROKEN_TENANTS_SET
2522 64 : .with_label_values(&[&tid])
2523 64 : .inc();
2524 1337 : }
2525 : }
2526 748 : });
2527 748 :
2528 748 : Tenant {
2529 748 : tenant_id,
2530 748 : generation: attached_conf.location.generation,
2531 748 : conf,
2532 748 : // using now here is good enough approximation to catch tenants with really long
2533 748 : // activation times.
2534 748 : loading_started_at: Instant::now(),
2535 748 : tenant_conf: Arc::new(RwLock::new(attached_conf)),
2536 748 : timelines: Mutex::new(HashMap::new()),
2537 748 : gc_cs: tokio::sync::Mutex::new(()),
2538 748 : walredo_mgr,
2539 748 : remote_storage,
2540 748 : deletion_queue_client,
2541 748 : state,
2542 748 : cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
2543 748 : cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
2544 748 : eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
2545 748 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
2546 748 : }
2547 748 : }
2548 :
2549 : /// Locate and load config
2550 217 : pub(super) fn load_tenant_config(
2551 217 : conf: &'static PageServerConf,
2552 217 : tenant_id: &TenantId,
2553 217 : ) -> anyhow::Result<LocationConf> {
2554 217 : let legacy_config_path = conf.tenant_config_path(tenant_id);
2555 217 : let config_path = conf.tenant_location_config_path(tenant_id);
2556 217 :
2557 217 : if config_path.exists() {
2558 : // New-style config takes precedence
2559 14 : let deserialized = Self::read_config(&config_path)?;
2560 14 : Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
2561 203 : } else if legacy_config_path.exists() {
2562 : // Upgrade path: found an old-style configuration only
2563 199 : let deserialized = Self::read_config(&legacy_config_path)?;
2564 :
2565 199 : let mut tenant_conf = TenantConfOpt::default();
2566 199 : for (key, item) in deserialized.iter() {
2567 199 : match key {
2568 199 : "tenant_config" => {
2569 199 : tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
2570 UBC 0 : format!("Failed to parse config from file '{legacy_config_path}' as pageserver config")
2571 CBC 199 : })?;
2572 : }
2573 UBC 0 : _ => bail!(
2574 0 : "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
2575 0 : ),
2576 : }
2577 : }
2578 :
2579 : // Legacy configs are implicitly in attached state
2580 CBC 199 : Ok(LocationConf::attached_single(
2581 199 : tenant_conf,
2582 199 : Generation::none(),
2583 199 : ))
2584 : } else {
2585 : // FIXME If the config file is not found, assume that we're attaching
2586 : // a detached tenant and config is passed via attach command.
2587 : // https://github.com/neondatabase/neon/issues/1555
2588 : // OR: we're loading after incomplete deletion that managed to remove config.
2589 4 : info!(
2590 4 : "tenant config not found in {} or {}",
2591 4 : config_path, legacy_config_path
2592 4 : );
2593 4 : Ok(LocationConf::default())
2594 : }
2595 217 : }
2596 :
2597 213 : fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
2598 213 : info!("loading tenant configuration from {path}");
2599 :
2600 : // load and parse file
2601 213 : let config = fs::read_to_string(path)
2602 213 : .with_context(|| format!("Failed to load config from path '{path}'"))?;
2603 :
2604 213 : config
2605 213 : .parse::<toml_edit::Document>()
2606 213 : .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
2607 213 : }
2608 :
2609 968 : #[tracing::instrument(skip_all, fields(%tenant_id))]
2610 : pub(super) async fn persist_tenant_config(
2611 : conf: &'static PageServerConf,
2612 : tenant_id: &TenantId,
2613 : location_conf: &LocationConf,
2614 : ) -> anyhow::Result<()> {
2615 : let legacy_config_path = conf.tenant_config_path(tenant_id);
2616 : let config_path = conf.tenant_location_config_path(tenant_id);
2617 : Self::persist_tenant_config_at(tenant_id, &config_path, &legacy_config_path, location_conf)
2618 : .await
2619 : }
2620 :
2621 2229 : #[tracing::instrument(skip_all, fields(%tenant_id))]
2622 : pub(super) async fn persist_tenant_config_at(
2623 : tenant_id: &TenantId,
2624 : config_path: &Utf8Path,
2625 : legacy_config_path: &Utf8Path,
2626 : location_conf: &LocationConf,
2627 : ) -> anyhow::Result<()> {
2628 : // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
2629 : Self::persist_tenant_config_legacy(
2630 : tenant_id,
2631 : legacy_config_path,
2632 : &location_conf.tenant_conf,
2633 : )
2634 : .await?;
2635 :
2636 : if let LocationMode::Attached(attach_conf) = &location_conf.mode {
2637 : // Once we use LocationMode, generations are mandatory. If we aren't using generations,
2638 : // then drop out after writing legacy-style config.
2639 : if attach_conf.generation.is_none() {
2640 UBC 0 : tracing::debug!("Running without generations, not writing new-style LocationConf");
2641 : return Ok(());
2642 : }
2643 : }
2644 :
2645 CBC 27 : info!("persisting tenantconf to {config_path}");
2646 :
2647 : let mut conf_content = r#"# This file contains a specific per-tenant's config.
2648 : # It is read in case of pageserver restart.
2649 : "#
2650 : .to_string();
2651 :
2652 : // Convert the config to a toml file.
2653 : conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
2654 :
2655 : let conf_content = conf_content.as_bytes();
2656 :
2657 : let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
2658 : VirtualFile::crashsafe_overwrite(config_path, &temp_path, conf_content)
2659 : .await
2660 UBC 0 : .with_context(|| format!("write tenant {tenant_id} config to {config_path}"))?;
2661 : Ok(())
2662 : }
2663 :
2664 CBC 2202 : #[tracing::instrument(skip_all, fields(%tenant_id))]
2665 : async fn persist_tenant_config_legacy(
2666 : tenant_id: &TenantId,
2667 : target_config_path: &Utf8Path,
2668 : tenant_conf: &TenantConfOpt,
2669 : ) -> anyhow::Result<()> {
2670 734 : info!("persisting tenantconf to {target_config_path}");
2671 :
2672 : let mut conf_content = r#"# This file contains a specific per-tenant's config.
2673 : # It is read in case of pageserver restart.
2674 :
2675 : [tenant_config]
2676 : "#
2677 : .to_string();
2678 :
2679 : // Convert the config to a toml file.
2680 : conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
2681 :
2682 : let conf_content = conf_content.as_bytes();
2683 :
2684 : let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
2685 : VirtualFile::crashsafe_overwrite(target_config_path, &temp_path, conf_content)
2686 : .await
2687 UBC 0 : .with_context(|| format!("write tenant {tenant_id} config to {target_config_path}"))?;
2688 : Ok(())
2689 : }
2690 :
2691 : //
2692 : // How garbage collection works:
2693 : //
2694 : // +--bar------------->
2695 : // /
2696 : // +----+-----foo---------------->
2697 : // /
2698 : // ----main--+-------------------------->
2699 : // \
2700 : // +-----baz-------->
2701 : //
2702 : //
2703 : // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
2704 : // `gc_infos` are being refreshed
2705 : // 2. Scan collected timelines, and on each timeline, make note of the
2706 : // all the points where other timelines have been branched off.
2707 : // We will refrain from removing page versions at those LSNs.
2708 : // 3. For each timeline, scan all layer files on the timeline.
2709 : // Remove all files for which a newer file exists and which
2710 : // don't cover any branch point LSNs.
2711 : //
2712 : // TODO:
2713 : // - if a relation has a non-incremental persistent layer on a child branch, then we
2714 : // don't need to keep that in the parent anymore. But currently
2715 : // we do.
2716 CBC 406 : async fn gc_iteration_internal(
2717 406 : &self,
2718 406 : target_timeline_id: Option<TimelineId>,
2719 406 : horizon: u64,
2720 406 : pitr: Duration,
2721 406 : ctx: &RequestContext,
2722 406 : ) -> anyhow::Result<GcResult> {
2723 406 : let mut totals: GcResult = Default::default();
2724 406 : let now = Instant::now();
2725 :
2726 406 : let gc_timelines = self
2727 406 : .refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
2728 336850 : .await?;
2729 :
2730 3 : crate::failpoint_support::sleep_millis_async!(
2731 3 : "gc_iteration_internal_after_getting_gc_timelines"
2732 3 : );
2733 :
2734 : // If there is nothing to GC, we don't want any messages in the INFO log.
2735 405 : if !gc_timelines.is_empty() {
2736 402 : info!("{} timelines need GC", gc_timelines.len());
2737 : } else {
2738 UBC 0 : debug!("{} timelines need GC", gc_timelines.len());
2739 : }
2740 :
2741 : // Perform GC for each timeline.
2742 : //
2743 : // Note that we don't hold the GC lock here because we don't want
2744 : // to delay the branch creation task, which requires the GC lock.
2745 : // A timeline GC iteration can be slow because it may need to wait for
2746 : // compaction (both require `layer_removal_cs` lock),
2747 : // but the GC iteration can run concurrently with branch creation.
2748 : //
2749 : // See comments in [`Tenant::branch_timeline`] for more information
2750 : // about why branch creation task can run concurrently with timeline's GC iteration.
2751 CBC 945 : for timeline in gc_timelines {
2752 546 : if task_mgr::is_shutdown_requested() {
2753 : // We were requested to shut down. Stop and return with the progress we
2754 : // made.
2755 1 : break;
2756 545 : }
2757 545 : let result = timeline.gc().await?;
2758 540 : totals += result;
2759 : }
2760 :
2761 400 : totals.elapsed = now.elapsed();
2762 400 : Ok(totals)
2763 401 : }
2764 :
2765 : /// Refreshes the Timeline::gc_info for all timelines, returning the
2766 : /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
2767 : /// [`Tenant::get_gc_horizon`].
2768 : ///
2769 : /// This is usually executed as part of periodic gc, but can now be triggered more often.
2770 78 : pub async fn refresh_gc_info(
2771 78 : &self,
2772 78 : ctx: &RequestContext,
2773 78 : ) -> anyhow::Result<Vec<Arc<Timeline>>> {
2774 78 : // since this method can now be called at different rates than the configured gc loop, it
2775 78 : // might be that these configuration values get applied faster than what it was previously,
2776 78 : // since these were only read from the gc task.
2777 78 : let horizon = self.get_gc_horizon();
2778 78 : let pitr = self.get_pitr_interval();
2779 78 :
2780 78 : // refresh all timelines
2781 78 : let target_timeline_id = None;
2782 78 :
2783 78 : self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, ctx)
2784 20 : .await
2785 78 : }
2786 :
2787 484 : async fn refresh_gc_info_internal(
2788 484 : &self,
2789 484 : target_timeline_id: Option<TimelineId>,
2790 484 : horizon: u64,
2791 484 : pitr: Duration,
2792 484 : ctx: &RequestContext,
2793 484 : ) -> anyhow::Result<Vec<Arc<Timeline>>> {
2794 : // grab mutex to prevent new timelines from being created here.
2795 484 : let gc_cs = self.gc_cs.lock().await;
2796 :
2797 : // Scan all timelines. For each timeline, remember the timeline ID and
2798 : // the branch point where it was created.
2799 483 : let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
2800 484 : let timelines = self.timelines.lock().unwrap();
2801 484 : let mut all_branchpoints = BTreeSet::new();
2802 483 : let timeline_ids = {
2803 484 : if let Some(target_timeline_id) = target_timeline_id.as_ref() {
2804 374 : if timelines.get(target_timeline_id).is_none() {
2805 1 : bail!("gc target timeline does not exist")
2806 373 : }
2807 110 : };
2808 :
2809 483 : timelines
2810 483 : .iter()
2811 483 : .map(|(timeline_id, timeline_entry)| {
2812 537 : if let Some(ancestor_timeline_id) =
2813 1019 : &timeline_entry.get_ancestor_timeline_id()
2814 : {
2815 : // If target_timeline is specified, we only need to know branchpoints of its children
2816 537 : if let Some(timeline_id) = target_timeline_id {
2817 352 : if ancestor_timeline_id == &timeline_id {
2818 6 : all_branchpoints.insert((
2819 6 : *ancestor_timeline_id,
2820 6 : timeline_entry.get_ancestor_lsn(),
2821 6 : ));
2822 346 : }
2823 : }
2824 : // Collect branchpoints for all timelines
2825 185 : else {
2826 185 : all_branchpoints.insert((
2827 185 : *ancestor_timeline_id,
2828 185 : timeline_entry.get_ancestor_lsn(),
2829 185 : ));
2830 185 : }
2831 482 : }
2832 :
2833 1019 : *timeline_id
2834 1019 : })
2835 483 : .collect::<Vec<_>>()
2836 483 : };
2837 483 : (all_branchpoints, timeline_ids)
2838 483 : };
2839 483 :
2840 483 : // Ok, we now know all the branch points.
2841 483 : // Update the GC information for each timeline.
2842 483 : let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
2843 1502 : for timeline_id in timeline_ids {
2844 : // Timeline is known to be local and loaded.
2845 1019 : let timeline = self
2846 1019 : .get_timeline(timeline_id, false)
2847 1019 : .with_context(|| format!("Timeline {timeline_id} was not found"))?;
2848 :
2849 : // If target_timeline is specified, ignore all other timelines
2850 1019 : if let Some(target_timeline_id) = target_timeline_id {
2851 726 : if timeline_id != target_timeline_id {
2852 353 : continue;
2853 373 : }
2854 293 : }
2855 :
2856 666 : if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
2857 602 : let branchpoints: Vec<Lsn> = all_branchpoints
2858 602 : .range((
2859 602 : Included((timeline_id, Lsn(0))),
2860 602 : Included((timeline_id, Lsn(u64::MAX))),
2861 602 : ))
2862 602 : .map(|&x| x.1)
2863 602 : .collect();
2864 602 : timeline
2865 602 : .update_gc_info(branchpoints, cutoff, pitr, ctx)
2866 336870 : .await?;
2867 :
2868 602 : gc_timelines.push(timeline);
2869 64 : }
2870 : }
2871 483 : drop(gc_cs);
2872 483 : Ok(gc_timelines)
2873 484 : }
2874 :
2875 : /// A substitute for `branch_timeline` for use in unit tests.
2876 : /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
2877 : /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
2878 : /// timeline background tasks are launched, except the flush loop.
2879 : #[cfg(test)]
2880 107 : async fn branch_timeline_test(
2881 107 : &self,
2882 107 : src_timeline: &Arc<Timeline>,
2883 107 : dst_id: TimelineId,
2884 107 : start_lsn: Option<Lsn>,
2885 107 : ctx: &RequestContext,
2886 107 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
2887 107 : let tl = self
2888 107 : .branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
2889 2 : .await?;
2890 105 : tl.set_state(TimelineState::Active);
2891 105 : Ok(tl)
2892 107 : }
2893 :
2894 : /// Branch an existing timeline.
2895 : ///
2896 : /// The caller is responsible for activating the returned timeline.
2897 251 : async fn branch_timeline(
2898 251 : &self,
2899 251 : src_timeline: &Arc<Timeline>,
2900 251 : dst_id: TimelineId,
2901 251 : start_lsn: Option<Lsn>,
2902 251 : ctx: &RequestContext,
2903 251 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
2904 251 : self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
2905 4 : .await
2906 251 : }
2907 :
2908 358 : async fn branch_timeline_impl(
2909 358 : &self,
2910 358 : src_timeline: &Arc<Timeline>,
2911 358 : dst_id: TimelineId,
2912 358 : start_lsn: Option<Lsn>,
2913 358 : _ctx: &RequestContext,
2914 358 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
2915 358 : let src_id = src_timeline.timeline_id;
2916 :
2917 : // First acquire the GC lock so that another task cannot advance the GC
2918 : // cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
2919 : // creating the branch.
2920 358 : let _gc_cs = self.gc_cs.lock().await;
2921 :
2922 : // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
2923 358 : let start_lsn = start_lsn.unwrap_or_else(|| {
2924 218 : let lsn = src_timeline.get_last_record_lsn();
2925 218 : info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
2926 218 : lsn
2927 358 : });
2928 :
2929 : // Create a placeholder for the new branch. This will error
2930 : // out if the new timeline ID is already in use.
2931 358 : let timeline_uninit_mark = {
2932 358 : let timelines = self.timelines.lock().unwrap();
2933 358 : self.create_timeline_uninit_mark(dst_id, &timelines)?
2934 : };
2935 :
2936 : // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
2937 : // horizon on the source timeline
2938 : //
2939 : // We check it against both the planned GC cutoff stored in 'gc_info',
2940 : // and the 'latest_gc_cutoff' of the last GC that was performed. The
2941 : // planned GC cutoff in 'gc_info' is normally larger than
2942 : // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
2943 : // changed the GC settings for the tenant to make the PITR window
2944 : // larger, but some of the data was already removed by an earlier GC
2945 : // iteration.
2946 :
2947 : // check against last actual 'latest_gc_cutoff' first
2948 358 : let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
2949 358 : src_timeline
2950 358 : .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
2951 358 : .context(format!(
2952 358 : "invalid branch start lsn: less than latest GC cutoff {}",
2953 358 : *latest_gc_cutoff_lsn,
2954 358 : ))
2955 358 : .map_err(CreateTimelineError::AncestorLsn)?;
2956 :
2957 : // and then the planned GC cutoff
2958 : {
2959 350 : let gc_info = src_timeline.gc_info.read().unwrap();
2960 350 : let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
2961 350 : if start_lsn < cutoff {
2962 UBC 0 : return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
2963 0 : "invalid branch start lsn: less than planned GC cutoff {cutoff}"
2964 0 : )));
2965 CBC 350 : }
2966 350 : }
2967 350 :
2968 350 : //
2969 350 : // The branch point is valid, and we are still holding the 'gc_cs' lock
2970 350 : // so that GC cannot advance the GC cutoff until we are finished.
2971 350 : // Proceed with the branch creation.
2972 350 : //
2973 350 :
2974 350 : // Determine prev-LSN for the new timeline. We can only determine it if
2975 350 : // the timeline was branched at the current end of the source timeline.
2976 350 : let RecordLsn {
2977 350 : last: src_last,
2978 350 : prev: src_prev,
2979 350 : } = src_timeline.get_last_record_rlsn();
2980 350 : let dst_prev = if src_last == start_lsn {
2981 327 : Some(src_prev)
2982 : } else {
2983 23 : None
2984 : };
2985 :
2986 : // Create the metadata file, noting the ancestor of the new timeline.
2987 : // There is initially no data in it, but all the read-calls know to look
2988 : // into the ancestor.
2989 350 : let metadata = TimelineMetadata::new(
2990 350 : start_lsn,
2991 350 : dst_prev,
2992 350 : Some(src_id),
2993 350 : start_lsn,
2994 350 : *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
2995 350 : src_timeline.initdb_lsn,
2996 350 : src_timeline.pg_version,
2997 350 : );
2998 :
2999 350 : let uninitialized_timeline = self
3000 350 : .prepare_new_timeline(
3001 350 : dst_id,
3002 350 : &metadata,
3003 350 : timeline_uninit_mark,
3004 350 : start_lsn + 1,
3005 350 : Some(Arc::clone(src_timeline)),
3006 350 : )
3007 UBC 0 : .await?;
3008 :
3009 CBC 350 : let new_timeline = uninitialized_timeline.finish_creation()?;
3010 :
3011 : // Root timeline gets its layers during creation and uploads them along with the metadata.
3012 : // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
3013 : // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
3014 : // could get incorrect information and remove more layers, than needed.
3015 : // See also https://github.com/neondatabase/neon/issues/3865
3016 350 : if let Some(remote_client) = new_timeline.remote_client.as_ref() {
3017 350 : remote_client
3018 350 : .schedule_index_upload_for_metadata_update(&metadata)
3019 350 : .context("branch initial metadata upload")?;
3020 UBC 0 : }
3021 :
3022 CBC 350 : info!("branched timeline {dst_id} from {src_id} at {start_lsn}");
3023 :
3024 350 : Ok(new_timeline)
3025 358 : }
3026 :
3027 : /// - run initdb to init temporary instance and get bootstrap data
3028 : /// - after initialization complete, remove the temp dir.
3029 : ///
3030 : /// The caller is responsible for activating the returned timeline.
3031 575 : async fn bootstrap_timeline(
3032 575 : &self,
3033 575 : timeline_id: TimelineId,
3034 575 : pg_version: u32,
3035 575 : ctx: &RequestContext,
3036 575 : ) -> anyhow::Result<Arc<Timeline>> {
3037 575 : let timeline_uninit_mark = {
3038 575 : let timelines = self.timelines.lock().unwrap();
3039 575 : self.create_timeline_uninit_mark(timeline_id, &timelines)?
3040 : };
3041 : // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
3042 : // temporary directory for basebackup files for the given timeline.
3043 575 : let initdb_path = path_with_suffix_extension(
3044 575 : self.conf
3045 575 : .timelines_path(&self.tenant_id)
3046 575 : .join(format!("basebackup-{timeline_id}")),
3047 575 : TEMP_FILE_SUFFIX,
3048 575 : );
3049 575 :
3050 575 : // an uninit mark was placed before, nothing else can access this timeline files
3051 575 : // current initdb was not run yet, so remove whatever was left from the previous runs
3052 575 : if initdb_path.exists() {
3053 UBC 0 : fs::remove_dir_all(&initdb_path).with_context(|| {
3054 0 : format!("Failed to remove already existing initdb directory: {initdb_path}")
3055 0 : })?;
3056 CBC 575 : }
3057 : // Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
3058 575 : run_initdb(self.conf, &initdb_path, pg_version)?;
3059 : // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
3060 575 : scopeguard::defer! {
3061 575 : if let Err(e) = fs::remove_dir_all(&initdb_path) {
3062 : // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
3063 UBC 0 : error!("Failed to remove temporary initdb directory '{initdb_path}': {e}");
3064 CBC 575 : }
3065 : }
3066 575 : let pgdata_path = &initdb_path;
3067 575 : let pgdata_lsn = import_datadir::get_lsn_from_controlfile(pgdata_path)?.align();
3068 575 :
3069 575 : // Import the contents of the data directory at the initial checkpoint
3070 575 : // LSN, and any WAL after that.
3071 575 : // Initdb lsn will be equal to last_record_lsn which will be set after import.
3072 575 : // Because we know it upfront avoid having an option or dummy zero value by passing it to the metadata.
3073 575 : let new_metadata = TimelineMetadata::new(
3074 575 : Lsn(0),
3075 575 : None,
3076 575 : None,
3077 575 : Lsn(0),
3078 575 : pgdata_lsn,
3079 575 : pgdata_lsn,
3080 575 : pg_version,
3081 575 : );
3082 575 : let raw_timeline = self
3083 575 : .prepare_new_timeline(
3084 575 : timeline_id,
3085 575 : &new_metadata,
3086 575 : timeline_uninit_mark,
3087 575 : pgdata_lsn,
3088 575 : None,
3089 575 : )
3090 1 : .await?;
3091 :
3092 574 : let tenant_id = raw_timeline.owning_tenant.tenant_id;
3093 574 : let unfinished_timeline = raw_timeline.raw_timeline()?;
3094 :
3095 574 : import_datadir::import_timeline_from_postgres_datadir(
3096 574 : unfinished_timeline,
3097 574 : pgdata_path,
3098 574 : pgdata_lsn,
3099 574 : ctx,
3100 574 : )
3101 2889661 : .await
3102 574 : .with_context(|| {
3103 UBC 0 : format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
3104 CBC 574 : })?;
3105 :
3106 : // Flush the new layer files to disk, before we make the timeline as available to
3107 : // the outside world.
3108 : //
3109 : // Flush loop needs to be spawned in order to be able to flush.
3110 574 : unfinished_timeline.maybe_spawn_flush_loop();
3111 574 :
3112 574 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
3113 1 : anyhow::bail!("failpoint before-checkpoint-new-timeline");
3114 574 : });
3115 :
3116 573 : unfinished_timeline
3117 573 : .freeze_and_flush()
3118 573 : .await
3119 573 : .with_context(|| {
3120 UBC 0 : format!(
3121 0 : "Failed to flush after pgdatadir import for timeline {tenant_id}/{timeline_id}"
3122 0 : )
3123 CBC 573 : })?;
3124 :
3125 : // All done!
3126 573 : let timeline = raw_timeline.finish_creation()?;
3127 :
3128 573 : info!(
3129 573 : "created root timeline {} timeline.lsn {}",
3130 573 : timeline_id,
3131 573 : timeline.get_last_record_lsn()
3132 573 : );
3133 :
3134 573 : Ok(timeline)
3135 575 : }
3136 :
3137 : /// Call this before constructing a timeline, to build its required structures
3138 1249 : fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
3139 1249 : let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
3140 1249 : let remote_client = RemoteTimelineClient::new(
3141 1249 : remote_storage.clone(),
3142 1249 : self.deletion_queue_client.clone(),
3143 1249 : self.conf,
3144 1249 : self.tenant_id,
3145 1249 : timeline_id,
3146 1249 : self.generation,
3147 1249 : );
3148 1249 : Some(remote_client)
3149 : } else {
3150 UBC 0 : None
3151 : };
3152 :
3153 CBC 1249 : TimelineResources {
3154 1249 : remote_client,
3155 1249 : deletion_queue_client: self.deletion_queue_client.clone(),
3156 1249 : }
3157 1249 : }
3158 :
3159 : /// Creates intermediate timeline structure and its files.
3160 : ///
3161 : /// An empty layer map is initialized, and new data and WAL can be imported starting
3162 : /// at 'disk_consistent_lsn'. After any initial data has been imported, call
3163 : /// `finish_creation` to insert the Timeline into the timelines map and to remove the
3164 : /// uninit mark file.
3165 967 : async fn prepare_new_timeline(
3166 967 : &self,
3167 967 : new_timeline_id: TimelineId,
3168 967 : new_metadata: &TimelineMetadata,
3169 967 : uninit_mark: TimelineUninitMark,
3170 967 : start_lsn: Lsn,
3171 967 : ancestor: Option<Arc<Timeline>>,
3172 967 : ) -> anyhow::Result<UninitializedTimeline> {
3173 967 : let tenant_id = self.tenant_id;
3174 967 :
3175 967 : let resources = self.build_timeline_resources(new_timeline_id);
3176 967 : if let Some(remote_client) = &resources.remote_client {
3177 967 : remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
3178 UBC 0 : }
3179 :
3180 CBC 967 : let timeline_struct = self
3181 967 : .create_timeline_struct(
3182 967 : new_timeline_id,
3183 967 : new_metadata,
3184 967 : ancestor,
3185 967 : resources,
3186 967 : None,
3187 967 : CreateTimelineCause::Load,
3188 967 : )
3189 967 : .context("Failed to create timeline data structure")?;
3190 :
3191 967 : timeline_struct.init_empty_layer_map(start_lsn);
3192 :
3193 967 : if let Err(e) = self
3194 967 : .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
3195 UBC 0 : .await
3196 : {
3197 CBC 1 : error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
3198 1 : cleanup_timeline_directory(uninit_mark);
3199 1 : return Err(e);
3200 966 : }
3201 :
3202 UBC 0 : debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
3203 :
3204 CBC 966 : Ok(UninitializedTimeline::new(
3205 966 : self,
3206 966 : new_timeline_id,
3207 966 : Some((timeline_struct, uninit_mark)),
3208 966 : ))
3209 967 : }
3210 :
3211 967 : async fn create_timeline_files(
3212 967 : &self,
3213 967 : timeline_path: &Utf8Path,
3214 967 : new_timeline_id: &TimelineId,
3215 967 : new_metadata: &TimelineMetadata,
3216 967 : ) -> anyhow::Result<()> {
3217 967 : crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
3218 :
3219 967 : fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
3220 1 : anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
3221 967 : });
3222 :
3223 966 : save_metadata(self.conf, &self.tenant_id, new_timeline_id, new_metadata)
3224 UBC 0 : .await
3225 CBC 966 : .context("Failed to create timeline metadata")?;
3226 966 : Ok(())
3227 967 : }
3228 :
3229 : /// Attempts to create an uninit mark file for the timeline initialization.
3230 : /// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
3231 : ///
3232 : /// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
3233 976 : fn create_timeline_uninit_mark(
3234 976 : &self,
3235 976 : timeline_id: TimelineId,
3236 976 : timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
3237 976 : ) -> anyhow::Result<TimelineUninitMark> {
3238 976 : let tenant_id = self.tenant_id;
3239 976 :
3240 976 : anyhow::ensure!(
3241 976 : timelines.get(&timeline_id).is_none(),
3242 1 : "Timeline {tenant_id}/{timeline_id} already exists in pageserver's memory"
3243 : );
3244 975 : let timeline_path = self.conf.timeline_path(&tenant_id, &timeline_id);
3245 975 : anyhow::ensure!(
3246 975 : !timeline_path.exists(),
3247 UBC 0 : "Timeline {timeline_path} already exists, cannot create its uninit mark file",
3248 : );
3249 :
3250 CBC 975 : let uninit_mark_path = self
3251 975 : .conf
3252 975 : .timeline_uninit_mark_file_path(tenant_id, timeline_id);
3253 975 : fs::File::create(&uninit_mark_path)
3254 975 : .context("Failed to create uninit mark file")
3255 975 : .and_then(|_| {
3256 975 : crashsafe::fsync_file_and_parent(&uninit_mark_path)
3257 975 : .context("Failed to fsync uninit mark file")
3258 975 : })
3259 975 : .with_context(|| {
3260 UBC 0 : format!("Failed to crate uninit mark for timeline {tenant_id}/{timeline_id}")
3261 CBC 975 : })?;
3262 :
3263 975 : let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
3264 975 :
3265 975 : Ok(uninit_mark)
3266 976 : }
3267 :
3268 : /// Gathers inputs from all of the timelines to produce a sizing model input.
3269 : ///
3270 : /// Future is cancellation safe. Only one calculation can be running at once per tenant.
3271 288 : #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
3272 : pub async fn gather_size_inputs(
3273 : &self,
3274 : // `max_retention_period` overrides the cutoff that is used to calculate the size
3275 : // (only if it is shorter than the real cutoff).
3276 : max_retention_period: Option<u64>,
3277 : cause: LogicalSizeCalculationCause,
3278 : ctx: &RequestContext,
3279 : ) -> anyhow::Result<size::ModelInputs> {
3280 : let logical_sizes_at_once = self
3281 : .conf
3282 : .concurrent_tenant_size_logical_size_queries
3283 : .inner();
3284 :
3285 : // TODO: Having a single mutex block concurrent reads is not great for performance.
3286 : //
3287 : // But the only case where we need to run multiple of these at once is when we
3288 : // request a size for a tenant manually via API, while another background calculation
3289 : // is in progress (which is not a common case).
3290 : //
3291 : // See more for on the issue #2748 condenced out of the initial PR review.
3292 : let mut shared_cache = self.cached_logical_sizes.lock().await;
3293 :
3294 : size::gather_inputs(
3295 : self,
3296 : logical_sizes_at_once,
3297 : max_retention_period,
3298 : &mut shared_cache,
3299 : cause,
3300 : ctx,
3301 : )
3302 : .await
3303 : }
3304 :
3305 : /// Calculate synthetic tenant size and cache the result.
3306 : /// This is periodically called by background worker.
3307 : /// result is cached in tenant struct
3308 76 : #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
3309 : pub async fn calculate_synthetic_size(
3310 : &self,
3311 : cause: LogicalSizeCalculationCause,
3312 : ctx: &RequestContext,
3313 : ) -> anyhow::Result<u64> {
3314 : let inputs = self.gather_size_inputs(None, cause, ctx).await?;
3315 :
3316 : let size = inputs.calculate()?;
3317 :
3318 : self.set_cached_synthetic_size(size);
3319 :
3320 : Ok(size)
3321 : }
3322 :
3323 : /// Cache given synthetic size and update the metric value
3324 19 : pub fn set_cached_synthetic_size(&self, size: u64) {
3325 19 : self.cached_synthetic_tenant_size
3326 19 : .store(size, Ordering::Relaxed);
3327 19 :
3328 19 : TENANT_SYNTHETIC_SIZE_METRIC
3329 19 : .get_metric_with_label_values(&[&self.tenant_id.to_string()])
3330 19 : .unwrap()
3331 19 : .set(size);
3332 19 : }
3333 :
3334 15 : pub fn cached_synthetic_size(&self) -> u64 {
3335 15 : self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
3336 15 : }
3337 : }
3338 :
3339 : fn remove_timeline_and_uninit_mark(
3340 : timeline_dir: &Utf8Path,
3341 : uninit_mark: &Utf8Path,
3342 : ) -> anyhow::Result<()> {
3343 1 : fs::remove_dir_all(timeline_dir)
3344 1 : .or_else(|e| {
3345 UBC 0 : if e.kind() == std::io::ErrorKind::NotFound {
3346 : // we can leave the uninit mark without a timeline dir,
3347 : // just remove the mark then
3348 0 : Ok(())
3349 : } else {
3350 0 : Err(e)
3351 : }
3352 CBC 1 : })
3353 1 : .with_context(|| {
3354 UBC 0 : format!("Failed to remove unit marked timeline directory {timeline_dir}")
3355 CBC 1 : })?;
3356 1 : fs::remove_file(uninit_mark)
3357 1 : .with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
3358 :
3359 1 : Ok(())
3360 1 : }
3361 :
3362 : pub(crate) enum CreateTenantFilesMode {
3363 : Create,
3364 : Attach,
3365 : }
3366 :
3367 494 : pub(crate) async fn create_tenant_files(
3368 494 : conf: &'static PageServerConf,
3369 494 : location_conf: &LocationConf,
3370 494 : tenant_id: &TenantId,
3371 494 : mode: CreateTenantFilesMode,
3372 494 : ) -> anyhow::Result<Utf8PathBuf> {
3373 494 : let target_tenant_directory = conf.tenant_path(tenant_id);
3374 494 : anyhow::ensure!(
3375 494 : !target_tenant_directory
3376 494 : .try_exists()
3377 494 : .context("check existence of tenant directory")?,
3378 2 : "tenant directory already exists",
3379 : );
3380 :
3381 492 : let temporary_tenant_dir =
3382 492 : path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
3383 UBC 0 : debug!("Creating temporary directory structure in {temporary_tenant_dir}");
3384 :
3385 : // top-level dir may exist if we are creating it through CLI
3386 CBC 492 : crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
3387 UBC 0 : format!("could not create temporary tenant directory {temporary_tenant_dir}")
3388 CBC 492 : })?;
3389 :
3390 492 : let creation_result = try_create_target_tenant_dir(
3391 492 : conf,
3392 492 : location_conf,
3393 492 : tenant_id,
3394 492 : mode,
3395 492 : &temporary_tenant_dir,
3396 492 : &target_tenant_directory,
3397 492 : )
3398 UBC 0 : .await;
3399 :
3400 CBC 492 : if creation_result.is_err() {
3401 1 : error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
3402 1 : if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) {
3403 UBC 0 : error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
3404 CBC 1 : } else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
3405 1 : error!(
3406 1 : "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
3407 1 : )
3408 UBC 0 : }
3409 CBC 491 : }
3410 :
3411 492 : creation_result?;
3412 :
3413 491 : Ok(target_tenant_directory)
3414 494 : }
3415 :
3416 492 : async fn try_create_target_tenant_dir(
3417 492 : conf: &'static PageServerConf,
3418 492 : location_conf: &LocationConf,
3419 492 : tenant_id: &TenantId,
3420 492 : mode: CreateTenantFilesMode,
3421 492 : temporary_tenant_dir: &Utf8Path,
3422 492 : target_tenant_directory: &Utf8Path,
3423 492 : ) -> Result<(), anyhow::Error> {
3424 492 : match mode {
3425 450 : CreateTenantFilesMode::Create => {} // needs no attach marker, writing tenant conf + atomic rename of dir is good enough
3426 : CreateTenantFilesMode::Attach => {
3427 42 : let attach_marker_path = temporary_tenant_dir.join(TENANT_ATTACHING_MARKER_FILENAME);
3428 42 : let file = std::fs::OpenOptions::new()
3429 42 : .create_new(true)
3430 42 : .write(true)
3431 42 : .open(&attach_marker_path)
3432 42 : .with_context(|| {
3433 UBC 0 : format!("could not create attach marker file {attach_marker_path:?}")
3434 CBC 42 : })?;
3435 42 : file.sync_all().with_context(|| {
3436 UBC 0 : format!("could not sync attach marker file: {attach_marker_path:?}")
3437 CBC 42 : })?;
3438 : // fsync of the directory in which the file resides comes later in this function
3439 : }
3440 : }
3441 :
3442 492 : let temporary_tenant_timelines_dir = rebase_directory(
3443 492 : &conf.timelines_path(tenant_id),
3444 492 : target_tenant_directory,
3445 492 : temporary_tenant_dir,
3446 492 : )
3447 492 : .with_context(|| format!("resolve tenant {tenant_id} temporary timelines dir"))?;
3448 492 : let temporary_legacy_tenant_config_path = rebase_directory(
3449 492 : &conf.tenant_config_path(tenant_id),
3450 492 : target_tenant_directory,
3451 492 : temporary_tenant_dir,
3452 492 : )
3453 492 : .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
3454 492 : let temporary_tenant_config_path = rebase_directory(
3455 492 : &conf.tenant_location_config_path(tenant_id),
3456 492 : target_tenant_directory,
3457 492 : temporary_tenant_dir,
3458 492 : )
3459 492 : .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
3460 :
3461 492 : Tenant::persist_tenant_config_at(
3462 492 : tenant_id,
3463 492 : &temporary_tenant_config_path,
3464 492 : &temporary_legacy_tenant_config_path,
3465 492 : location_conf,
3466 492 : )
3467 UBC 0 : .await?;
3468 :
3469 CBC 492 : crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
3470 UBC 0 : format!(
3471 0 : "create tenant {} temporary timelines directory {}",
3472 0 : tenant_id, temporary_tenant_timelines_dir,
3473 0 : )
3474 CBC 492 : })?;
3475 492 : fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
3476 1 : anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
3477 492 : });
3478 :
3479 : // Make sure the current tenant directory entries are durable before renaming.
3480 : // Without this, a crash may reorder any of the directory entry creations above.
3481 491 : crashsafe::fsync(temporary_tenant_dir)
3482 491 : .with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?;
3483 :
3484 491 : fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
3485 UBC 0 : format!(
3486 0 : "move tenant {} temporary directory {} into the permanent one {}",
3487 0 : tenant_id, temporary_tenant_dir, target_tenant_directory
3488 0 : )
3489 CBC 491 : })?;
3490 491 : let target_dir_parent = target_tenant_directory.parent().with_context(|| {
3491 UBC 0 : format!(
3492 0 : "get tenant {} dir parent for {}",
3493 0 : tenant_id, target_tenant_directory,
3494 0 : )
3495 CBC 491 : })?;
3496 491 : crashsafe::fsync(target_dir_parent).with_context(|| {
3497 UBC 0 : format!(
3498 0 : "fsync renamed directory's parent {} for tenant {}",
3499 0 : target_dir_parent, tenant_id,
3500 0 : )
3501 CBC 491 : })?;
3502 :
3503 491 : Ok(())
3504 492 : }
3505 :
3506 1476 : fn rebase_directory(
3507 1476 : original_path: &Utf8Path,
3508 1476 : base: &Utf8Path,
3509 1476 : new_base: &Utf8Path,
3510 1476 : ) -> anyhow::Result<Utf8PathBuf> {
3511 1476 : let relative_path = original_path.strip_prefix(base).with_context(|| {
3512 UBC 0 : format!(
3513 0 : "Failed to strip base prefix '{}' off path '{}'",
3514 0 : base, original_path
3515 0 : )
3516 CBC 1476 : })?;
3517 1476 : Ok(new_base.join(relative_path))
3518 1476 : }
3519 :
3520 : /// Create the cluster temporarily in 'initdbpath' directory inside the repository
3521 : /// to get bootstrap data for timeline initialization.
3522 575 : fn run_initdb(
3523 575 : conf: &'static PageServerConf,
3524 575 : initdb_target_dir: &Utf8Path,
3525 575 : pg_version: u32,
3526 575 : ) -> anyhow::Result<()> {
3527 575 : let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
3528 575 : let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
3529 575 : info!(
3530 575 : "running {} in {}, libdir: {}",
3531 575 : initdb_bin_path, initdb_target_dir, initdb_lib_dir,
3532 575 : );
3533 :
3534 575 : let initdb_output = Command::new(&initdb_bin_path)
3535 575 : .args(["-D", initdb_target_dir.as_ref()])
3536 575 : .args(["-U", &conf.superuser])
3537 575 : .args(["-E", "utf8"])
3538 575 : .arg("--no-instructions")
3539 575 : // This is only used for a temporary installation that is deleted shortly after,
3540 575 : // so no need to fsync it
3541 575 : .arg("--no-sync")
3542 575 : .env_clear()
3543 575 : .env("LD_LIBRARY_PATH", &initdb_lib_dir)
3544 575 : .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
3545 575 : .stdout(Stdio::null())
3546 575 : .output()
3547 575 : .with_context(|| {
3548 UBC 0 : format!(
3549 0 : "failed to execute {} at target dir {}",
3550 0 : initdb_bin_path, initdb_target_dir,
3551 0 : )
3552 CBC 575 : })?;
3553 575 : if !initdb_output.status.success() {
3554 UBC 0 : bail!(
3555 0 : "initdb failed: '{}'",
3556 0 : String::from_utf8_lossy(&initdb_output.stderr)
3557 0 : );
3558 CBC 575 : }
3559 575 :
3560 575 : Ok(())
3561 575 : }
3562 :
3563 : impl Drop for Tenant {
3564 139 : fn drop(&mut self) {
3565 139 : remove_tenant_metrics(&self.tenant_id);
3566 139 : }
3567 : }
3568 : /// Dump contents of a layer file to stdout.
3569 UBC 0 : pub async fn dump_layerfile_from_path(
3570 0 : path: &Utf8Path,
3571 0 : verbose: bool,
3572 0 : ctx: &RequestContext,
3573 0 : ) -> anyhow::Result<()> {
3574 : use std::os::unix::fs::FileExt;
3575 :
3576 : // All layer files start with a two-byte "magic" value, to identify the kind of
3577 : // file.
3578 0 : let file = File::open(path)?;
3579 0 : let mut header_buf = [0u8; 2];
3580 0 : file.read_exact_at(&mut header_buf, 0)?;
3581 :
3582 0 : match u16::from_be_bytes(header_buf) {
3583 : crate::IMAGE_FILE_MAGIC => {
3584 0 : ImageLayer::new_for_path(path, file)?
3585 0 : .dump(verbose, ctx)
3586 0 : .await?
3587 : }
3588 : crate::DELTA_FILE_MAGIC => {
3589 0 : DeltaLayer::new_for_path(path, file)?
3590 0 : .dump(verbose, ctx)
3591 0 : .await?
3592 : }
3593 0 : magic => bail!("unrecognized magic identifier: {:?}", magic),
3594 : }
3595 :
3596 0 : Ok(())
3597 0 : }
3598 :
3599 : #[cfg(test)]
3600 : pub(crate) mod harness {
3601 : use bytes::{Bytes, BytesMut};
3602 : use once_cell::sync::OnceCell;
3603 : use std::fs;
3604 : use std::sync::Arc;
3605 : use utils::logging;
3606 : use utils::lsn::Lsn;
3607 :
3608 : use crate::deletion_queue::mock::MockDeletionQueue;
3609 : use crate::{
3610 : config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
3611 : };
3612 :
3613 : use super::*;
3614 : use crate::tenant::config::{TenantConf, TenantConfOpt};
3615 : use hex_literal::hex;
3616 : use utils::id::{TenantId, TimelineId};
3617 :
3618 : pub const TIMELINE_ID: TimelineId =
3619 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
3620 : pub const NEW_TIMELINE_ID: TimelineId =
3621 : TimelineId::from_array(hex!("AA223344556677881122334455667788"));
3622 :
3623 : /// Convenience function to create a page image with given string as the only content
3624 : #[allow(non_snake_case)]
3625 CBC 854140 : pub fn TEST_IMG(s: &str) -> Bytes {
3626 854140 : let mut buf = BytesMut::new();
3627 854140 : buf.extend_from_slice(s.as_bytes());
3628 854140 : buf.resize(64, 0);
3629 854140 :
3630 854140 : buf.freeze()
3631 854140 : }
3632 :
3633 : impl From<TenantConf> for TenantConfOpt {
3634 42 : fn from(tenant_conf: TenantConf) -> Self {
3635 42 : Self {
3636 42 : checkpoint_distance: Some(tenant_conf.checkpoint_distance),
3637 42 : checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
3638 42 : compaction_target_size: Some(tenant_conf.compaction_target_size),
3639 42 : compaction_period: Some(tenant_conf.compaction_period),
3640 42 : compaction_threshold: Some(tenant_conf.compaction_threshold),
3641 42 : gc_horizon: Some(tenant_conf.gc_horizon),
3642 42 : gc_period: Some(tenant_conf.gc_period),
3643 42 : image_creation_threshold: Some(tenant_conf.image_creation_threshold),
3644 42 : pitr_interval: Some(tenant_conf.pitr_interval),
3645 42 : walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
3646 42 : lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
3647 42 : max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
3648 42 : trace_read_requests: Some(tenant_conf.trace_read_requests),
3649 42 : eviction_policy: Some(tenant_conf.eviction_policy),
3650 42 : min_resident_size_override: tenant_conf.min_resident_size_override,
3651 42 : evictions_low_residence_duration_metric_threshold: Some(
3652 42 : tenant_conf.evictions_low_residence_duration_metric_threshold,
3653 42 : ),
3654 42 : gc_feedback: Some(tenant_conf.gc_feedback),
3655 42 : }
3656 42 : }
3657 : }
3658 :
3659 : pub struct TenantHarness {
3660 : pub conf: &'static PageServerConf,
3661 : pub tenant_conf: TenantConf,
3662 : pub tenant_id: TenantId,
3663 : pub generation: Generation,
3664 : pub remote_storage: GenericRemoteStorage,
3665 : pub remote_fs_dir: Utf8PathBuf,
3666 : pub deletion_queue: MockDeletionQueue,
3667 : }
3668 :
3669 : static LOG_HANDLE: OnceCell<()> = OnceCell::new();
3670 :
3671 : impl TenantHarness {
3672 41 : pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
3673 41 : LOG_HANDLE.get_or_init(|| {
3674 1 : logging::init(
3675 1 : logging::LogFormat::Test,
3676 1 : // enable it in case in case the tests exercise code paths that use
3677 1 : // debug_assert_current_span_has_tenant_and_timeline_id
3678 1 : logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
3679 1 : )
3680 1 : .expect("Failed to init test logging")
3681 41 : });
3682 41 :
3683 41 : let repo_dir = PageServerConf::test_repo_dir(test_name);
3684 41 : let _ = fs::remove_dir_all(&repo_dir);
3685 41 : fs::create_dir_all(&repo_dir)?;
3686 :
3687 41 : let conf = PageServerConf::dummy_conf(repo_dir);
3688 41 : // Make a static copy of the config. This can never be free'd, but that's
3689 41 : // OK in a test.
3690 41 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
3691 41 :
3692 41 : // Disable automatic GC and compaction to make the unit tests more deterministic.
3693 41 : // The tests perform them manually if needed.
3694 41 : let tenant_conf = TenantConf {
3695 41 : gc_period: Duration::ZERO,
3696 41 : compaction_period: Duration::ZERO,
3697 41 : ..TenantConf::default()
3698 41 : };
3699 41 :
3700 41 : let tenant_id = TenantId::generate();
3701 41 : fs::create_dir_all(conf.tenant_path(&tenant_id))?;
3702 41 : fs::create_dir_all(conf.timelines_path(&tenant_id))?;
3703 :
3704 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
3705 41 : let remote_fs_dir = conf.workdir.join("localfs");
3706 41 : std::fs::create_dir_all(&remote_fs_dir).unwrap();
3707 41 : let config = RemoteStorageConfig {
3708 41 : // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
3709 41 : max_concurrent_syncs: std::num::NonZeroUsize::new(2_000_000).unwrap(),
3710 41 : // TODO: why not remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
3711 41 : max_sync_errors: std::num::NonZeroU32::new(3_000_000).unwrap(),
3712 41 : storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
3713 41 : };
3714 41 : let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
3715 41 : let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
3716 41 :
3717 41 : Ok(Self {
3718 41 : conf,
3719 41 : tenant_conf,
3720 41 : tenant_id,
3721 41 : generation: Generation::new(0xdeadbeef),
3722 41 : remote_storage,
3723 41 : remote_fs_dir,
3724 41 : deletion_queue,
3725 41 : })
3726 41 : }
3727 :
3728 39 : pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
3729 39 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
3730 39 : (
3731 39 : self.try_load(&ctx)
3732 45 : .await
3733 39 : .expect("failed to load test tenant"),
3734 39 : ctx,
3735 39 : )
3736 39 : }
3737 :
3738 42 : pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
3739 42 : let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
3740 42 :
3741 42 : let tenant = Arc::new(Tenant::new(
3742 42 : TenantState::Loading,
3743 42 : self.conf,
3744 42 : AttachedTenantConf::try_from(LocationConf::attached_single(
3745 42 : TenantConfOpt::from(self.tenant_conf),
3746 42 : self.generation,
3747 42 : ))
3748 42 : .unwrap(),
3749 42 : walredo_mgr,
3750 42 : self.tenant_id,
3751 42 : Some(self.remote_storage.clone()),
3752 42 : self.deletion_queue.new_client(),
3753 42 : ));
3754 42 : tenant
3755 42 : .load(None, None, ctx)
3756 42 : .instrument(info_span!("try_load", tenant_id=%self.tenant_id))
3757 48 : .await?;
3758 :
3759 : // TODO reuse Tenant::activate (needs broker)
3760 41 : tenant.state.send_replace(TenantState::Active);
3761 41 : for timeline in tenant.timelines.lock().unwrap().values() {
3762 3 : timeline.set_state(TimelineState::Active);
3763 3 : }
3764 41 : Ok(tenant)
3765 42 : }
3766 :
3767 9 : pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
3768 9 : self.conf.timeline_path(&self.tenant_id, timeline_id)
3769 9 : }
3770 : }
3771 :
3772 : // Mock WAL redo manager that doesn't do much
3773 : pub(crate) struct TestRedoManager;
3774 :
3775 : impl TestRedoManager {
3776 UBC 0 : pub async fn request_redo(
3777 0 : &self,
3778 0 : key: Key,
3779 0 : lsn: Lsn,
3780 0 : base_img: Option<(Lsn, Bytes)>,
3781 0 : records: Vec<(Lsn, NeonWalRecord)>,
3782 0 : _pg_version: u32,
3783 0 : ) -> anyhow::Result<Bytes> {
3784 0 : let s = format!(
3785 0 : "redo for {} to get to {}, with {} and {} records",
3786 0 : key,
3787 0 : lsn,
3788 0 : if base_img.is_some() {
3789 0 : "base image"
3790 : } else {
3791 0 : "no base image"
3792 : },
3793 0 : records.len()
3794 0 : );
3795 0 : println!("{s}");
3796 0 :
3797 0 : Ok(TEST_IMG(&s))
3798 0 : }
3799 : }
3800 : }
3801 :
3802 : #[cfg(test)]
3803 : mod tests {
3804 : use super::*;
3805 : use crate::keyspace::KeySpaceAccum;
3806 : use crate::repository::{Key, Value};
3807 : use crate::tenant::harness::*;
3808 : use crate::DEFAULT_PG_VERSION;
3809 : use crate::METADATA_FILE_NAME;
3810 : use bytes::BytesMut;
3811 : use hex_literal::hex;
3812 : use once_cell::sync::Lazy;
3813 : use rand::{thread_rng, Rng};
3814 : use tokio_util::sync::CancellationToken;
3815 :
3816 : static TEST_KEY: Lazy<Key> =
3817 CBC 1 : Lazy::new(|| Key::from_slice(&hex!("112222222233333333444444445500000001")));
3818 :
3819 1 : #[tokio::test]
3820 1 : async fn test_basic() -> anyhow::Result<()> {
3821 1 : let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
3822 1 : let tline = tenant
3823 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
3824 2 : .await?;
3825 :
3826 1 : let writer = tline.writer().await;
3827 1 : writer
3828 1 : .put(
3829 1 : *TEST_KEY,
3830 1 : Lsn(0x10),
3831 1 : &Value::Image(TEST_IMG("foo at 0x10")),
3832 1 : &ctx,
3833 1 : )
3834 UBC 0 : .await?;
3835 CBC 1 : writer.finish_write(Lsn(0x10));
3836 1 : drop(writer);
3837 :
3838 1 : let writer = tline.writer().await;
3839 1 : writer
3840 1 : .put(
3841 1 : *TEST_KEY,
3842 1 : Lsn(0x20),
3843 1 : &Value::Image(TEST_IMG("foo at 0x20")),
3844 1 : &ctx,
3845 1 : )
3846 UBC 0 : .await?;
3847 CBC 1 : writer.finish_write(Lsn(0x20));
3848 1 : drop(writer);
3849 :
3850 1 : assert_eq!(
3851 1 : tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
3852 1 : TEST_IMG("foo at 0x10")
3853 : );
3854 1 : assert_eq!(
3855 1 : tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
3856 1 : TEST_IMG("foo at 0x10")
3857 : );
3858 1 : assert_eq!(
3859 1 : tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
3860 1 : TEST_IMG("foo at 0x20")
3861 : );
3862 :
3863 1 : Ok(())
3864 : }
3865 :
3866 1 : #[tokio::test]
3867 1 : async fn no_duplicate_timelines() -> anyhow::Result<()> {
3868 1 : let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
3869 1 : .load()
3870 1 : .await;
3871 1 : let _ = tenant
3872 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3873 2 : .await?;
3874 :
3875 1 : match tenant
3876 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3877 UBC 0 : .await
3878 : {
3879 0 : Ok(_) => panic!("duplicate timeline creation should fail"),
3880 CBC 1 : Err(e) => assert_eq!(
3881 1 : e.to_string(),
3882 1 : format!(
3883 1 : "Timeline {}/{} already exists in pageserver's memory",
3884 1 : tenant.tenant_id, TIMELINE_ID
3885 1 : )
3886 1 : ),
3887 : }
3888 :
3889 1 : Ok(())
3890 : }
3891 :
3892 : /// Convenience function to create a page image with given string as the only content
3893 5 : pub fn test_value(s: &str) -> Value {
3894 5 : let mut buf = BytesMut::new();
3895 5 : buf.extend_from_slice(s.as_bytes());
3896 5 : Value::Image(buf.freeze())
3897 5 : }
3898 :
3899 : ///
3900 : /// Test branch creation
3901 : ///
3902 1 : #[tokio::test]
3903 1 : async fn test_branch() -> anyhow::Result<()> {
3904 : use std::str::from_utf8;
3905 :
3906 1 : let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
3907 1 : let tline = tenant
3908 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
3909 2 : .await?;
3910 1 : let writer = tline.writer().await;
3911 :
3912 : #[allow(non_snake_case)]
3913 1 : let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
3914 1 : #[allow(non_snake_case)]
3915 1 : let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
3916 1 :
3917 1 : // Insert a value on the timeline
3918 1 : writer
3919 1 : .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
3920 UBC 0 : .await?;
3921 CBC 1 : writer
3922 1 : .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
3923 UBC 0 : .await?;
3924 CBC 1 : writer.finish_write(Lsn(0x20));
3925 1 :
3926 1 : writer
3927 1 : .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
3928 UBC 0 : .await?;
3929 CBC 1 : writer.finish_write(Lsn(0x30));
3930 1 : writer
3931 1 : .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
3932 UBC 0 : .await?;
3933 CBC 1 : writer.finish_write(Lsn(0x40));
3934 1 :
3935 1 : //assert_current_logical_size(&tline, Lsn(0x40));
3936 1 :
3937 1 : // Branch the history, modify relation differently on the new timeline
3938 1 : tenant
3939 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
3940 UBC 0 : .await?;
3941 CBC 1 : let newtline = tenant
3942 1 : .get_timeline(NEW_TIMELINE_ID, true)
3943 1 : .expect("Should have a local timeline");
3944 1 : let new_writer = newtline.writer().await;
3945 1 : new_writer
3946 1 : .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
3947 UBC 0 : .await?;
3948 CBC 1 : new_writer.finish_write(Lsn(0x40));
3949 :
3950 : // Check page contents on both branches
3951 1 : assert_eq!(
3952 1 : from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
3953 : "foo at 0x40"
3954 : );
3955 1 : assert_eq!(
3956 1 : from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
3957 : "bar at 0x40"
3958 : );
3959 1 : assert_eq!(
3960 1 : from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
3961 : "foobar at 0x20"
3962 : );
3963 :
3964 : //assert_current_logical_size(&tline, Lsn(0x40));
3965 :
3966 1 : Ok(())
3967 : }
3968 :
3969 10 : async fn make_some_layers(
3970 10 : tline: &Timeline,
3971 10 : start_lsn: Lsn,
3972 10 : ctx: &RequestContext,
3973 10 : ) -> anyhow::Result<()> {
3974 10 : let mut lsn = start_lsn;
3975 : #[allow(non_snake_case)]
3976 : {
3977 10 : let writer = tline.writer().await;
3978 : // Create a relation on the timeline
3979 10 : writer
3980 10 : .put(
3981 10 : *TEST_KEY,
3982 10 : lsn,
3983 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
3984 10 : ctx,
3985 10 : )
3986 UBC 0 : .await?;
3987 CBC 10 : writer.finish_write(lsn);
3988 10 : lsn += 0x10;
3989 10 : writer
3990 10 : .put(
3991 10 : *TEST_KEY,
3992 10 : lsn,
3993 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
3994 10 : ctx,
3995 10 : )
3996 UBC 0 : .await?;
3997 CBC 10 : writer.finish_write(lsn);
3998 10 : lsn += 0x10;
3999 10 : }
4000 10 : tline.freeze_and_flush().await?;
4001 : {
4002 10 : let writer = tline.writer().await;
4003 10 : writer
4004 10 : .put(
4005 10 : *TEST_KEY,
4006 10 : lsn,
4007 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4008 10 : ctx,
4009 10 : )
4010 UBC 0 : .await?;
4011 CBC 10 : writer.finish_write(lsn);
4012 10 : lsn += 0x10;
4013 10 : writer
4014 10 : .put(
4015 10 : *TEST_KEY,
4016 10 : lsn,
4017 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4018 10 : ctx,
4019 10 : )
4020 UBC 0 : .await?;
4021 CBC 10 : writer.finish_write(lsn);
4022 10 : }
4023 10 : tline.freeze_and_flush().await
4024 10 : }
4025 :
4026 1 : #[tokio::test]
4027 1 : async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
4028 1 : let (tenant, ctx) =
4029 1 : TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
4030 1 : .load()
4031 1 : .await;
4032 1 : let tline = tenant
4033 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4034 2 : .await?;
4035 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4036 :
4037 : // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
4038 : // FIXME: this doesn't actually remove any layer currently, given how the flushing
4039 : // and compaction works. But it does set the 'cutoff' point so that the cross check
4040 : // below should fail.
4041 1 : tenant
4042 1 : .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
4043 LBC (1) : .await?;
4044 :
4045 : // try to branch at lsn 25, should fail because we already garbage collected the data
4046 CBC 1 : match tenant
4047 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
4048 UBC 0 : .await
4049 : {
4050 0 : Ok(_) => panic!("branching should have failed"),
4051 CBC 1 : Err(err) => {
4052 1 : let CreateTimelineError::AncestorLsn(err) = err else {
4053 UBC 0 : panic!("wrong error type")
4054 : };
4055 CBC 1 : assert!(err.to_string().contains("invalid branch start lsn"));
4056 1 : assert!(err
4057 1 : .source()
4058 1 : .unwrap()
4059 1 : .to_string()
4060 1 : .contains("we might've already garbage collected needed data"))
4061 : }
4062 : }
4063 :
4064 1 : Ok(())
4065 : }
4066 :
4067 1 : #[tokio::test]
4068 1 : async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
4069 1 : let (tenant, ctx) =
4070 1 : TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
4071 1 : .load()
4072 1 : .await;
4073 :
4074 1 : let tline = tenant
4075 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
4076 2 : .await?;
4077 : // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
4078 1 : match tenant
4079 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
4080 UBC 0 : .await
4081 : {
4082 0 : Ok(_) => panic!("branching should have failed"),
4083 CBC 1 : Err(err) => {
4084 1 : let CreateTimelineError::AncestorLsn(err) = err else {
4085 UBC 0 : panic!("wrong error type");
4086 : };
4087 CBC 1 : assert!(&err.to_string().contains("invalid branch start lsn"));
4088 1 : assert!(&err
4089 1 : .source()
4090 1 : .unwrap()
4091 1 : .to_string()
4092 1 : .contains("is earlier than latest GC horizon"));
4093 : }
4094 : }
4095 :
4096 1 : Ok(())
4097 : }
4098 :
4099 : /*
4100 : // FIXME: This currently fails to error out. Calling GC doesn't currently
4101 : // remove the old value, we'd need to work a little harder
4102 : #[tokio::test]
4103 : async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
4104 : let repo =
4105 : RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
4106 : .load();
4107 :
4108 : let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
4109 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4110 :
4111 : repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
4112 : let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
4113 : assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
4114 : match tline.get(*TEST_KEY, Lsn(0x25)) {
4115 : Ok(_) => panic!("request for page should have failed"),
4116 : Err(err) => assert!(err.to_string().contains("not found at")),
4117 : }
4118 : Ok(())
4119 : }
4120 : */
4121 :
4122 1 : #[tokio::test]
4123 1 : async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
4124 1 : let (tenant, ctx) =
4125 1 : TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
4126 1 : .load()
4127 1 : .await;
4128 1 : let tline = tenant
4129 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4130 2 : .await?;
4131 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4132 :
4133 1 : tenant
4134 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4135 UBC 0 : .await?;
4136 CBC 1 : let newtline = tenant
4137 1 : .get_timeline(NEW_TIMELINE_ID, true)
4138 1 : .expect("Should have a local timeline");
4139 1 :
4140 2 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4141 :
4142 1 : tline.set_broken("test".to_owned());
4143 1 :
4144 1 : tenant
4145 1 : .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
4146 UBC 0 : .await?;
4147 :
4148 : // The branchpoints should contain all timelines, even ones marked
4149 : // as Broken.
4150 : {
4151 CBC 1 : let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
4152 1 : assert_eq!(branchpoints.len(), 1);
4153 1 : assert_eq!(branchpoints[0], Lsn(0x40));
4154 : }
4155 :
4156 : // You can read the key from the child branch even though the parent is
4157 : // Broken, as long as you don't need to access data from the parent.
4158 1 : assert_eq!(
4159 1 : newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
4160 1 : TEST_IMG(&format!("foo at {}", Lsn(0x70)))
4161 : );
4162 :
4163 : // This needs to traverse to the parent, and fails.
4164 1 : let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
4165 1 : assert!(err
4166 1 : .to_string()
4167 1 : .contains("will not become active. Current state: Broken"));
4168 :
4169 1 : Ok(())
4170 : }
4171 :
4172 1 : #[tokio::test]
4173 1 : async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
4174 1 : let (tenant, ctx) =
4175 1 : TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
4176 1 : .load()
4177 1 : .await;
4178 1 : let tline = tenant
4179 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4180 2 : .await?;
4181 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4182 :
4183 1 : tenant
4184 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4185 UBC 0 : .await?;
4186 CBC 1 : let newtline = tenant
4187 1 : .get_timeline(NEW_TIMELINE_ID, true)
4188 1 : .expect("Should have a local timeline");
4189 1 : // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
4190 1 : tenant
4191 1 : .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
4192 1 : .await?;
4193 1 : assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
4194 :
4195 1 : Ok(())
4196 : }
4197 1 : #[tokio::test]
4198 1 : async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
4199 1 : let (tenant, ctx) =
4200 1 : TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
4201 1 : .load()
4202 1 : .await;
4203 1 : let tline = tenant
4204 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4205 2 : .await?;
4206 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4207 :
4208 1 : tenant
4209 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4210 UBC 0 : .await?;
4211 CBC 1 : let newtline = tenant
4212 1 : .get_timeline(NEW_TIMELINE_ID, true)
4213 1 : .expect("Should have a local timeline");
4214 1 :
4215 2 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4216 :
4217 : // run gc on parent
4218 1 : tenant
4219 1 : .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
4220 UBC 0 : .await?;
4221 :
4222 : // Check that the data is still accessible on the branch.
4223 CBC 1 : assert_eq!(
4224 1 : newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
4225 1 : TEST_IMG(&format!("foo at {}", Lsn(0x40)))
4226 : );
4227 :
4228 1 : Ok(())
4229 : }
4230 :
4231 1 : #[tokio::test]
4232 1 : async fn timeline_load() -> anyhow::Result<()> {
4233 : const TEST_NAME: &str = "timeline_load";
4234 1 : let harness = TenantHarness::create(TEST_NAME)?;
4235 : {
4236 1 : let (tenant, ctx) = harness.load().await;
4237 1 : let tline = tenant
4238 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
4239 2 : .await?;
4240 2 : make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
4241 : // so that all uploads finish & we can call harness.load() below again
4242 1 : tenant
4243 1 : .shutdown(Default::default(), true)
4244 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
4245 2 : .await
4246 1 : .ok()
4247 1 : .unwrap();
4248 : }
4249 :
4250 3 : let (tenant, _ctx) = harness.load().await;
4251 1 : tenant
4252 1 : .get_timeline(TIMELINE_ID, true)
4253 1 : .expect("cannot load timeline");
4254 1 :
4255 1 : Ok(())
4256 : }
4257 :
4258 1 : #[tokio::test]
4259 1 : async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
4260 : const TEST_NAME: &str = "timeline_load_with_ancestor";
4261 1 : let harness = TenantHarness::create(TEST_NAME)?;
4262 : // create two timelines
4263 : {
4264 1 : let (tenant, ctx) = harness.load().await;
4265 1 : let tline = tenant
4266 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4267 2 : .await?;
4268 :
4269 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4270 :
4271 1 : let child_tline = tenant
4272 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4273 UBC 0 : .await?;
4274 CBC 1 : child_tline.set_state(TimelineState::Active);
4275 1 :
4276 1 : let newtline = tenant
4277 1 : .get_timeline(NEW_TIMELINE_ID, true)
4278 1 : .expect("Should have a local timeline");
4279 1 :
4280 2 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4281 :
4282 : // so that all uploads finish & we can call harness.load() below again
4283 1 : tenant
4284 1 : .shutdown(Default::default(), true)
4285 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
4286 3 : .await
4287 1 : .ok()
4288 1 : .unwrap();
4289 : }
4290 :
4291 : // check that both of them are initially unloaded
4292 5 : let (tenant, _ctx) = harness.load().await;
4293 :
4294 : // check that both, child and ancestor are loaded
4295 1 : let _child_tline = tenant
4296 1 : .get_timeline(NEW_TIMELINE_ID, true)
4297 1 : .expect("cannot get child timeline loaded");
4298 1 :
4299 1 : let _ancestor_tline = tenant
4300 1 : .get_timeline(TIMELINE_ID, true)
4301 1 : .expect("cannot get ancestor timeline loaded");
4302 1 :
4303 1 : Ok(())
4304 : }
4305 :
4306 1 : #[tokio::test]
4307 1 : async fn delta_layer_dumping() -> anyhow::Result<()> {
4308 1 : let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
4309 1 : let tline = tenant
4310 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4311 2 : .await?;
4312 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4313 :
4314 1 : let layer_map = tline.layers.read().await;
4315 1 : let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
4316 :
4317 1 : assert!(!level0_deltas.is_empty());
4318 :
4319 3 : for delta in level0_deltas {
4320 2 : let delta = layer_map.get_from_desc(&delta);
4321 2 : // Ensure we are dumping a delta layer here
4322 2 : let delta = delta.downcast_delta_layer().unwrap();
4323 2 :
4324 2 : delta.dump(false, &ctx).await.unwrap();
4325 2 : delta.dump(true, &ctx).await.unwrap();
4326 : }
4327 :
4328 1 : Ok(())
4329 : }
4330 :
4331 1 : #[tokio::test]
4332 1 : async fn corrupt_metadata() -> anyhow::Result<()> {
4333 : const TEST_NAME: &str = "corrupt_metadata";
4334 1 : let harness = TenantHarness::create(TEST_NAME)?;
4335 1 : let (tenant, ctx) = harness.load().await;
4336 :
4337 1 : let tline = tenant
4338 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4339 2 : .await?;
4340 1 : drop(tline);
4341 1 : // so that all uploads finish & we can call harness.try_load() below again
4342 1 : tenant
4343 1 : .shutdown(Default::default(), true)
4344 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
4345 2 : .await
4346 1 : .ok()
4347 1 : .unwrap();
4348 1 : drop(tenant);
4349 1 :
4350 1 : let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
4351 1 :
4352 1 : assert!(metadata_path.is_file());
4353 :
4354 1 : let mut metadata_bytes = std::fs::read(&metadata_path)?;
4355 1 : assert_eq!(metadata_bytes.len(), 512);
4356 1 : metadata_bytes[8] ^= 1;
4357 1 : std::fs::write(metadata_path, metadata_bytes)?;
4358 :
4359 1 : let err = harness.try_load(&ctx).await.err().expect("should fail");
4360 1 : // get all the stack with all .context, not only the last one
4361 1 : let message = format!("{err:#}");
4362 1 : let expected = "failed to load metadata";
4363 1 : assert!(
4364 1 : message.contains(expected),
4365 UBC 0 : "message '{message}' expected to contain {expected}"
4366 : );
4367 :
4368 CBC 1 : let mut found_error_message = false;
4369 1 : let mut err_source = err.source();
4370 1 : while let Some(source) = err_source {
4371 1 : if source.to_string().contains("metadata checksum mismatch") {
4372 1 : found_error_message = true;
4373 1 : break;
4374 UBC 0 : }
4375 0 : err_source = source.source();
4376 : }
4377 CBC 1 : assert!(
4378 1 : found_error_message,
4379 UBC 0 : "didn't find the corrupted metadata error in {}",
4380 : message
4381 : );
4382 :
4383 CBC 1 : Ok(())
4384 : }
4385 :
4386 1 : #[tokio::test]
4387 1 : async fn test_images() -> anyhow::Result<()> {
4388 1 : let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
4389 1 : let tline = tenant
4390 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4391 2 : .await?;
4392 :
4393 1 : let writer = tline.writer().await;
4394 1 : writer
4395 1 : .put(
4396 1 : *TEST_KEY,
4397 1 : Lsn(0x10),
4398 1 : &Value::Image(TEST_IMG("foo at 0x10")),
4399 1 : &ctx,
4400 1 : )
4401 UBC 0 : .await?;
4402 CBC 1 : writer.finish_write(Lsn(0x10));
4403 1 : drop(writer);
4404 1 :
4405 1 : tline.freeze_and_flush().await?;
4406 1 : tline.compact(&CancellationToken::new(), &ctx).await?;
4407 :
4408 1 : let writer = tline.writer().await;
4409 1 : writer
4410 1 : .put(
4411 1 : *TEST_KEY,
4412 1 : Lsn(0x20),
4413 1 : &Value::Image(TEST_IMG("foo at 0x20")),
4414 1 : &ctx,
4415 1 : )
4416 UBC 0 : .await?;
4417 CBC 1 : writer.finish_write(Lsn(0x20));
4418 1 : drop(writer);
4419 1 :
4420 1 : tline.freeze_and_flush().await?;
4421 1 : tline.compact(&CancellationToken::new(), &ctx).await?;
4422 :
4423 1 : let writer = tline.writer().await;
4424 1 : writer
4425 1 : .put(
4426 1 : *TEST_KEY,
4427 1 : Lsn(0x30),
4428 1 : &Value::Image(TEST_IMG("foo at 0x30")),
4429 1 : &ctx,
4430 1 : )
4431 UBC 0 : .await?;
4432 CBC 1 : writer.finish_write(Lsn(0x30));
4433 1 : drop(writer);
4434 1 :
4435 1 : tline.freeze_and_flush().await?;
4436 1 : tline.compact(&CancellationToken::new(), &ctx).await?;
4437 :
4438 1 : let writer = tline.writer().await;
4439 1 : writer
4440 1 : .put(
4441 1 : *TEST_KEY,
4442 1 : Lsn(0x40),
4443 1 : &Value::Image(TEST_IMG("foo at 0x40")),
4444 1 : &ctx,
4445 1 : )
4446 UBC 0 : .await?;
4447 CBC 1 : writer.finish_write(Lsn(0x40));
4448 1 : drop(writer);
4449 1 :
4450 1 : tline.freeze_and_flush().await?;
4451 1 : tline.compact(&CancellationToken::new(), &ctx).await?;
4452 :
4453 1 : assert_eq!(
4454 1 : tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
4455 1 : TEST_IMG("foo at 0x10")
4456 : );
4457 1 : assert_eq!(
4458 1 : tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
4459 1 : TEST_IMG("foo at 0x10")
4460 : );
4461 1 : assert_eq!(
4462 1 : tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
4463 1 : TEST_IMG("foo at 0x20")
4464 : );
4465 1 : assert_eq!(
4466 1 : tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
4467 1 : TEST_IMG("foo at 0x30")
4468 : );
4469 1 : assert_eq!(
4470 1 : tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
4471 1 : TEST_IMG("foo at 0x40")
4472 : );
4473 :
4474 1 : Ok(())
4475 : }
4476 :
4477 : //
4478 : // Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
4479 : // Repeat 50 times.
4480 : //
4481 1 : #[tokio::test]
4482 1 : async fn test_bulk_insert() -> anyhow::Result<()> {
4483 1 : let harness = TenantHarness::create("test_bulk_insert")?;
4484 1 : let (tenant, ctx) = harness.load().await;
4485 1 : let tline = tenant
4486 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4487 2 : .await?;
4488 :
4489 1 : let mut lsn = Lsn(0x10);
4490 1 :
4491 1 : let mut keyspace = KeySpaceAccum::new();
4492 1 :
4493 1 : let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
4494 1 : let mut blknum = 0;
4495 51 : for _ in 0..50 {
4496 500050 : for _ in 0..10000 {
4497 500000 : test_key.field6 = blknum;
4498 500000 : let writer = tline.writer().await;
4499 500000 : writer
4500 500000 : .put(
4501 500000 : test_key,
4502 500000 : lsn,
4503 500000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4504 500000 : &ctx,
4505 500000 : )
4506 7804 : .await?;
4507 500000 : writer.finish_write(lsn);
4508 500000 : drop(writer);
4509 500000 :
4510 500000 : keyspace.add_key(test_key);
4511 500000 :
4512 500000 : lsn = Lsn(lsn.0 + 0x10);
4513 500000 : blknum += 1;
4514 : }
4515 :
4516 50 : let cutoff = tline.get_last_record_lsn();
4517 50 :
4518 50 : tline
4519 50 : .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
4520 UBC 0 : .await?;
4521 CBC 50 : tline.freeze_and_flush().await?;
4522 7920 : tline.compact(&CancellationToken::new(), &ctx).await?;
4523 50 : tline.gc().await?;
4524 : }
4525 :
4526 1 : Ok(())
4527 : }
4528 :
4529 1 : #[tokio::test]
4530 1 : async fn test_random_updates() -> anyhow::Result<()> {
4531 1 : let harness = TenantHarness::create("test_random_updates")?;
4532 1 : let (tenant, ctx) = harness.load().await;
4533 1 : let tline = tenant
4534 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4535 2 : .await?;
4536 :
4537 : const NUM_KEYS: usize = 1000;
4538 :
4539 1 : let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
4540 1 :
4541 1 : let mut keyspace = KeySpaceAccum::new();
4542 1 :
4543 1 : // Track when each page was last modified. Used to assert that
4544 1 : // a read sees the latest page version.
4545 1 : let mut updated = [Lsn(0); NUM_KEYS];
4546 1 :
4547 1 : let mut lsn = Lsn(0x10);
4548 : #[allow(clippy::needless_range_loop)]
4549 1001 : for blknum in 0..NUM_KEYS {
4550 1000 : lsn = Lsn(lsn.0 + 0x10);
4551 1000 : test_key.field6 = blknum as u32;
4552 1000 : let writer = tline.writer().await;
4553 1000 : writer
4554 1000 : .put(
4555 1000 : test_key,
4556 1000 : lsn,
4557 1000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4558 1000 : &ctx,
4559 1000 : )
4560 17 : .await?;
4561 1000 : writer.finish_write(lsn);
4562 1000 : updated[blknum] = lsn;
4563 1000 : drop(writer);
4564 1000 :
4565 1000 : keyspace.add_key(test_key);
4566 : }
4567 :
4568 51 : for _ in 0..50 {
4569 50050 : for _ in 0..NUM_KEYS {
4570 50000 : lsn = Lsn(lsn.0 + 0x10);
4571 50000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
4572 50000 : test_key.field6 = blknum as u32;
4573 50000 : let writer = tline.writer().await;
4574 50000 : writer
4575 50000 : .put(
4576 50000 : test_key,
4577 50000 : lsn,
4578 50000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4579 50000 : &ctx,
4580 50000 : )
4581 704 : .await?;
4582 50000 : writer.finish_write(lsn);
4583 50000 : drop(writer);
4584 50000 : updated[blknum] = lsn;
4585 : }
4586 :
4587 : // Read all the blocks
4588 50000 : for (blknum, last_lsn) in updated.iter().enumerate() {
4589 50000 : test_key.field6 = blknum as u32;
4590 50000 : assert_eq!(
4591 50000 : tline.get(test_key, lsn, &ctx).await?,
4592 50000 : TEST_IMG(&format!("{} at {}", blknum, last_lsn))
4593 : );
4594 : }
4595 :
4596 : // Perform a cycle of flush, compact, and GC
4597 50 : let cutoff = tline.get_last_record_lsn();
4598 50 : tline
4599 50 : .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
4600 UBC 0 : .await?;
4601 CBC 51 : tline.freeze_and_flush().await?;
4602 851 : tline.compact(&CancellationToken::new(), &ctx).await?;
4603 50 : tline.gc().await?;
4604 : }
4605 :
4606 1 : Ok(())
4607 : }
4608 :
4609 1 : #[tokio::test]
4610 1 : async fn test_traverse_branches() -> anyhow::Result<()> {
4611 1 : let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
4612 1 : .load()
4613 1 : .await;
4614 1 : let mut tline = tenant
4615 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4616 2 : .await?;
4617 :
4618 : const NUM_KEYS: usize = 1000;
4619 :
4620 1 : let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
4621 1 :
4622 1 : let mut keyspace = KeySpaceAccum::new();
4623 1 :
4624 1 : // Track when each page was last modified. Used to assert that
4625 1 : // a read sees the latest page version.
4626 1 : let mut updated = [Lsn(0); NUM_KEYS];
4627 1 :
4628 1 : let mut lsn = Lsn(0x10);
4629 : #[allow(clippy::needless_range_loop)]
4630 1001 : for blknum in 0..NUM_KEYS {
4631 1000 : lsn = Lsn(lsn.0 + 0x10);
4632 1000 : test_key.field6 = blknum as u32;
4633 1000 : let writer = tline.writer().await;
4634 1000 : writer
4635 1000 : .put(
4636 1000 : test_key,
4637 1000 : lsn,
4638 1000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4639 1000 : &ctx,
4640 1000 : )
4641 17 : .await?;
4642 1000 : writer.finish_write(lsn);
4643 1000 : updated[blknum] = lsn;
4644 1000 : drop(writer);
4645 1000 :
4646 1000 : keyspace.add_key(test_key);
4647 : }
4648 :
4649 51 : for _ in 0..50 {
4650 50 : let new_tline_id = TimelineId::generate();
4651 50 : tenant
4652 50 : .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
4653 UBC 0 : .await?;
4654 CBC 50 : tline = tenant
4655 50 : .get_timeline(new_tline_id, true)
4656 50 : .expect("Should have the branched timeline");
4657 :
4658 50050 : for _ in 0..NUM_KEYS {
4659 50000 : lsn = Lsn(lsn.0 + 0x10);
4660 50000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
4661 50000 : test_key.field6 = blknum as u32;
4662 50000 : let writer = tline.writer().await;
4663 50000 : writer
4664 50000 : .put(
4665 50000 : test_key,
4666 50000 : lsn,
4667 50000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4668 50000 : &ctx,
4669 50000 : )
4670 799 : .await?;
4671 50000 : println!("updating {} at {}", blknum, lsn);
4672 50000 : writer.finish_write(lsn);
4673 50000 : drop(writer);
4674 50000 : updated[blknum] = lsn;
4675 : }
4676 :
4677 : // Read all the blocks
4678 50000 : for (blknum, last_lsn) in updated.iter().enumerate() {
4679 50000 : test_key.field6 = blknum as u32;
4680 50000 : assert_eq!(
4681 50000 : tline.get(test_key, lsn, &ctx).await?,
4682 50000 : TEST_IMG(&format!("{} at {}", blknum, last_lsn))
4683 : );
4684 : }
4685 :
4686 : // Perform a cycle of flush, compact, and GC
4687 50 : let cutoff = tline.get_last_record_lsn();
4688 50 : tline
4689 50 : .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
4690 UBC 0 : .await?;
4691 CBC 51 : tline.freeze_and_flush().await?;
4692 168 : tline.compact(&CancellationToken::new(), &ctx).await?;
4693 50 : tline.gc().await?;
4694 : }
4695 :
4696 1 : Ok(())
4697 : }
4698 :
4699 1 : #[tokio::test]
4700 1 : async fn test_traverse_ancestors() -> anyhow::Result<()> {
4701 1 : let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
4702 1 : .load()
4703 1 : .await;
4704 1 : let mut tline = tenant
4705 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4706 2 : .await?;
4707 :
4708 : const NUM_KEYS: usize = 100;
4709 : const NUM_TLINES: usize = 50;
4710 :
4711 1 : let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
4712 1 : // Track page mutation lsns across different timelines.
4713 1 : let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
4714 1 :
4715 1 : let mut lsn = Lsn(0x10);
4716 :
4717 : #[allow(clippy::needless_range_loop)]
4718 51 : for idx in 0..NUM_TLINES {
4719 50 : let new_tline_id = TimelineId::generate();
4720 50 : tenant
4721 50 : .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
4722 UBC 0 : .await?;
4723 CBC 50 : tline = tenant
4724 50 : .get_timeline(new_tline_id, true)
4725 50 : .expect("Should have the branched timeline");
4726 :
4727 5050 : for _ in 0..NUM_KEYS {
4728 5000 : lsn = Lsn(lsn.0 + 0x10);
4729 5000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
4730 5000 : test_key.field6 = blknum as u32;
4731 5000 : let writer = tline.writer().await;
4732 5000 : writer
4733 5000 : .put(
4734 5000 : test_key,
4735 5000 : lsn,
4736 5000 : &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
4737 5000 : &ctx,
4738 5000 : )
4739 78 : .await?;
4740 5000 : println!("updating [{}][{}] at {}", idx, blknum, lsn);
4741 5000 : writer.finish_write(lsn);
4742 5000 : drop(writer);
4743 5000 : updated[idx][blknum] = lsn;
4744 : }
4745 : }
4746 :
4747 : // Read pages from leaf timeline across all ancestors.
4748 50 : for (idx, lsns) in updated.iter().enumerate() {
4749 5000 : for (blknum, lsn) in lsns.iter().enumerate() {
4750 : // Skip empty mutations.
4751 5000 : if lsn.0 == 0 {
4752 1810 : continue;
4753 3190 : }
4754 3190 : println!("checking [{idx}][{blknum}] at {lsn}");
4755 3190 : test_key.field6 = blknum as u32;
4756 3190 : assert_eq!(
4757 3190 : tline.get(test_key, *lsn, &ctx).await?,
4758 3190 : TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
4759 : );
4760 : }
4761 : }
4762 1 : Ok(())
4763 : }
4764 :
4765 1 : #[tokio::test]
4766 1 : async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
4767 1 : let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
4768 1 : .load()
4769 1 : .await;
4770 :
4771 1 : let initdb_lsn = Lsn(0x20);
4772 1 : let utline = tenant
4773 1 : .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
4774 UBC 0 : .await?;
4775 CBC 1 : let tline = utline.raw_timeline().unwrap();
4776 1 :
4777 1 : // Spawn flush loop now so that we can set the `expect_initdb_optimization`
4778 1 : tline.maybe_spawn_flush_loop();
4779 1 :
4780 1 : // Make sure the timeline has the minimum set of required keys for operation.
4781 1 : // The only operation you can always do on an empty timeline is to `put` new data.
4782 1 : // Except if you `put` at `initdb_lsn`.
4783 1 : // In that case, there's an optimization to directly create image layers instead of delta layers.
4784 1 : // It uses `repartition()`, which assumes some keys to be present.
4785 1 : // Let's make sure the test timeline can handle that case.
4786 1 : {
4787 1 : let mut state = tline.flush_loop_state.lock().unwrap();
4788 1 : assert_eq!(
4789 1 : timeline::FlushLoopState::Running {
4790 1 : expect_initdb_optimization: false,
4791 1 : initdb_optimization_count: 0,
4792 1 : },
4793 1 : *state
4794 1 : );
4795 1 : *state = timeline::FlushLoopState::Running {
4796 1 : expect_initdb_optimization: true,
4797 1 : initdb_optimization_count: 0,
4798 1 : };
4799 1 : }
4800 1 :
4801 1 : // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
4802 1 : // As explained above, the optimization requires some keys to be present.
4803 1 : // As per `create_empty_timeline` documentation, use init_empty to set them.
4804 1 : // This is what `create_test_timeline` does, by the way.
4805 1 : let mut modification = tline.begin_modification(initdb_lsn);
4806 1 : modification
4807 1 : .init_empty_test_timeline()
4808 1 : .context("init_empty_test_timeline")?;
4809 1 : modification
4810 1 : .commit(&ctx)
4811 UBC 0 : .await
4812 CBC 1 : .context("commit init_empty_test_timeline modification")?;
4813 :
4814 : // Do the flush. The flush code will check the expectations that we set above.
4815 1 : tline.freeze_and_flush().await?;
4816 :
4817 : // assert freeze_and_flush exercised the initdb optimization
4818 : {
4819 1 : let state = tline.flush_loop_state.lock().unwrap();
4820 : let timeline::FlushLoopState::Running {
4821 1 : expect_initdb_optimization,
4822 1 : initdb_optimization_count,
4823 1 : } = *state
4824 : else {
4825 UBC 0 : panic!("unexpected state: {:?}", *state);
4826 : };
4827 CBC 1 : assert!(expect_initdb_optimization);
4828 1 : assert!(initdb_optimization_count > 0);
4829 : }
4830 1 : Ok(())
4831 : }
4832 :
4833 1 : #[tokio::test]
4834 1 : async fn test_uninit_mark_crash() -> anyhow::Result<()> {
4835 1 : let name = "test_uninit_mark_crash";
4836 1 : let harness = TenantHarness::create(name)?;
4837 : {
4838 1 : let (tenant, ctx) = harness.load().await;
4839 1 : let tline = tenant
4840 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
4841 UBC 0 : .await?;
4842 : // Keeps uninit mark in place
4843 CBC 1 : let raw_tline = tline.raw_timeline().unwrap();
4844 1 : raw_tline
4845 1 : .shutdown(false)
4846 1 : .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
4847 UBC 0 : .await;
4848 CBC 1 : std::mem::forget(tline);
4849 : }
4850 :
4851 1 : let (tenant, _) = harness.load().await;
4852 1 : match tenant.get_timeline(TIMELINE_ID, false) {
4853 UBC 0 : Ok(_) => panic!("timeline should've been removed during load"),
4854 CBC 1 : Err(e) => {
4855 1 : assert_eq!(
4856 1 : e,
4857 1 : GetTimelineError::NotFound {
4858 1 : tenant_id: tenant.tenant_id,
4859 1 : timeline_id: TIMELINE_ID,
4860 1 : }
4861 1 : )
4862 : }
4863 : }
4864 :
4865 1 : assert!(!harness
4866 1 : .conf
4867 1 : .timeline_path(&tenant.tenant_id, &TIMELINE_ID)
4868 1 : .exists());
4869 :
4870 1 : assert!(!harness
4871 1 : .conf
4872 1 : .timeline_uninit_mark_file_path(tenant.tenant_id, TIMELINE_ID)
4873 1 : .exists());
4874 :
4875 1 : Ok(())
4876 : }
4877 : }
|