TLA Line data Source code
1 : //!
2 : //! Timeline repository implementation that keeps old data in files on disk, and
3 : //! the recent changes in memory. See tenant/*_layer.rs files.
4 : //! The functions here are responsible for locating the correct layer for the
5 : //! get/put call, walking back the timeline branching history as needed.
6 : //!
7 : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
8 : //! directory. See docs/pageserver-storage.md for how the files are managed.
9 : //! In addition to the layer files, there is a metadata file in the same
10 : //! directory that contains information about the timeline, in particular its
11 : //! parent timeline, and the last LSN that has been written to disk.
12 : //!
13 :
14 : use anyhow::{bail, Context};
15 : use camino::{Utf8Path, Utf8PathBuf};
16 : use enumset::EnumSet;
17 : use futures::stream::FuturesUnordered;
18 : use futures::FutureExt;
19 : use futures::StreamExt;
20 : use pageserver_api::models::TimelineState;
21 : use pageserver_api::shard::ShardIdentity;
22 : use pageserver_api::shard::TenantShardId;
23 : use remote_storage::DownloadError;
24 : use remote_storage::GenericRemoteStorage;
25 : use std::fmt;
26 : use storage_broker::BrokerClientChannel;
27 : use tokio::io::BufReader;
28 : use tokio::runtime::Handle;
29 : use tokio::sync::watch;
30 : use tokio::task::JoinSet;
31 : use tokio_util::sync::CancellationToken;
32 : use tracing::*;
33 : use utils::backoff;
34 : use utils::completion;
35 : use utils::crashsafe::path_with_suffix_extension;
36 : use utils::failpoint_support;
37 : use utils::fs_ext;
38 : use utils::sync::gate::Gate;
39 : use utils::sync::gate::GateGuard;
40 : use utils::timeout::timeout_cancellable;
41 : use utils::timeout::TimeoutCancellableError;
42 :
43 : use self::config::AttachedLocationConfig;
44 : use self::config::AttachmentMode;
45 : use self::config::LocationConf;
46 : use self::config::TenantConf;
47 : use self::delete::DeleteTenantFlow;
48 : use self::metadata::LoadMetadataError;
49 : use self::metadata::TimelineMetadata;
50 : use self::mgr::GetActiveTenantError;
51 : use self::mgr::GetTenantError;
52 : use self::mgr::TenantsMap;
53 : use self::remote_timeline_client::RemoteTimelineClient;
54 : use self::timeline::uninit::TimelineExclusionError;
55 : use self::timeline::uninit::TimelineUninitMark;
56 : use self::timeline::uninit::UninitializedTimeline;
57 : use self::timeline::EvictionTaskTenantState;
58 : use self::timeline::TimelineResources;
59 : use self::timeline::WaitLsnError;
60 : use crate::config::PageServerConf;
61 : use crate::context::{DownloadBehavior, RequestContext};
62 : use crate::deletion_queue::DeletionQueueClient;
63 : use crate::deletion_queue::DeletionQueueError;
64 : use crate::import_datadir;
65 : use crate::is_uninit_mark;
66 : use crate::metrics::TENANT;
67 : use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
68 : use crate::repository::GcResult;
69 : use crate::task_mgr;
70 : use crate::task_mgr::TaskKind;
71 : use crate::tenant::config::LocationMode;
72 : use crate::tenant::config::TenantConfOpt;
73 : use crate::tenant::metadata::load_metadata;
74 : pub use crate::tenant::remote_timeline_client::index::IndexPart;
75 : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
76 : use crate::tenant::remote_timeline_client::INITDB_PATH;
77 : use crate::tenant::storage_layer::DeltaLayer;
78 : use crate::tenant::storage_layer::ImageLayer;
79 : use crate::InitializationOrder;
80 : use std::cmp::min;
81 : use std::collections::hash_map::Entry;
82 : use std::collections::BTreeSet;
83 : use std::collections::HashMap;
84 : use std::collections::HashSet;
85 : use std::fmt::Debug;
86 : use std::fmt::Display;
87 : use std::fs;
88 : use std::fs::File;
89 : use std::io;
90 : use std::ops::Bound::Included;
91 : use std::process::Stdio;
92 : use std::sync::atomic::AtomicU64;
93 : use std::sync::atomic::Ordering;
94 : use std::sync::Arc;
95 : use std::sync::{Mutex, RwLock};
96 : use std::time::{Duration, Instant};
97 :
98 : use crate::tenant::timeline::delete::DeleteTimelineFlow;
99 : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
100 : use crate::virtual_file::VirtualFile;
101 : use crate::walredo::PostgresRedoManager;
102 : use crate::TEMP_FILE_SUFFIX;
103 : use once_cell::sync::Lazy;
104 : pub use pageserver_api::models::TenantState;
105 : use tokio::sync::Semaphore;
106 :
107 CBC 337 : static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
108 : use toml_edit;
109 : use utils::{
110 : crashsafe,
111 : generation::Generation,
112 : id::{TenantId, TimelineId},
113 : lsn::{Lsn, RecordLsn},
114 : };
115 :
116 : /// Declare a failpoint that can use the `pause` failpoint action.
117 : /// We don't want to block the executor thread, hence, spawn_blocking + await.
118 : macro_rules! pausable_failpoint {
119 : ($name:literal) => {
120 : if cfg!(feature = "testing") {
121 : tokio::task::spawn_blocking({
122 : let current = tracing::Span::current();
123 : move || {
124 : let _entered = current.entered();
125 : tracing::info!("at failpoint {}", $name);
126 : fail::fail_point!($name);
127 : }
128 : })
129 : .await
130 : .expect("spawn_blocking");
131 : }
132 : };
133 : }
134 :
135 : pub mod blob_io;
136 : pub mod block_io;
137 :
138 : pub mod disk_btree;
139 : pub(crate) mod ephemeral_file;
140 : pub mod layer_map;
141 : mod span;
142 :
143 : pub mod metadata;
144 : mod par_fsync;
145 : pub mod remote_timeline_client;
146 : pub mod storage_layer;
147 :
148 : pub mod config;
149 : pub mod delete;
150 : pub mod mgr;
151 : pub mod secondary;
152 : pub mod tasks;
153 : pub mod upload_queue;
154 :
155 : pub(crate) mod timeline;
156 :
157 : pub mod size;
158 :
159 : pub(crate) use timeline::span::debug_assert_current_span_has_tenant_and_timeline_id;
160 : pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
161 :
162 : // re-export for use in remote_timeline_client.rs
163 : pub use crate::tenant::metadata::save_metadata;
164 :
165 : // re-export for use in walreceiver
166 : pub use crate::tenant::timeline::WalReceiverInfo;
167 :
168 : /// The "tenants" part of `tenants/<tenant>/timelines...`
169 : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
170 :
171 : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
172 : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
173 :
174 : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
175 :
176 : /// References to shared objects that are passed into each tenant, such
177 : /// as the shared remote storage client and process initialization state.
178 273 : #[derive(Clone)]
179 : pub struct TenantSharedResources {
180 : pub broker_client: storage_broker::BrokerClientChannel,
181 : pub remote_storage: Option<GenericRemoteStorage>,
182 : pub deletion_queue_client: DeletionQueueClient,
183 : }
184 :
185 : /// A [`Tenant`] is really an _attached_ tenant. The configuration
186 : /// for an attached tenant is a subset of the [`LocationConf`], represented
187 : /// in this struct.
188 : pub(super) struct AttachedTenantConf {
189 : tenant_conf: TenantConfOpt,
190 : location: AttachedLocationConfig,
191 : }
192 :
193 : impl AttachedTenantConf {
194 798 : fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
195 798 : match &location_conf.mode {
196 798 : LocationMode::Attached(attach_conf) => Ok(Self {
197 798 : tenant_conf: location_conf.tenant_conf,
198 798 : location: attach_conf.clone(),
199 798 : }),
200 : LocationMode::Secondary(_) => {
201 UBC 0 : anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
202 : }
203 : }
204 CBC 798 : }
205 : }
206 : struct TimelinePreload {
207 : timeline_id: TimelineId,
208 : client: RemoteTimelineClient,
209 : index_part: Result<MaybeDeletedIndexPart, DownloadError>,
210 : }
211 :
212 : pub(crate) struct TenantPreload {
213 : deleting: bool,
214 : timelines: HashMap<TimelineId, TimelinePreload>,
215 : }
216 :
217 : /// When we spawn a tenant, there is a special mode for tenant creation that
218 : /// avoids trying to read anything from remote storage.
219 : pub(crate) enum SpawnMode {
220 : Normal,
221 : Create,
222 : }
223 :
224 : ///
225 : /// Tenant consists of multiple timelines. Keep them in a hash table.
226 : ///
227 : pub struct Tenant {
228 : // Global pageserver config parameters
229 : pub conf: &'static PageServerConf,
230 :
231 : /// The value creation timestamp, used to measure activation delay, see:
232 : /// <https://github.com/neondatabase/neon/issues/4025>
233 : constructed_at: Instant,
234 :
235 : state: watch::Sender<TenantState>,
236 :
237 : // Overridden tenant-specific config parameters.
238 : // We keep TenantConfOpt sturct here to preserve the information
239 : // about parameters that are not set.
240 : // This is necessary to allow global config updates.
241 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
242 :
243 : tenant_shard_id: TenantShardId,
244 :
245 : // The detailed sharding information, beyond the number/count in tenant_shard_id
246 : shard_identity: ShardIdentity,
247 :
248 : /// The remote storage generation, used to protect S3 objects from split-brain.
249 : /// Does not change over the lifetime of the [`Tenant`] object.
250 : ///
251 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
252 : /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
253 : generation: Generation,
254 :
255 : timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
256 :
257 : /// During timeline creation, we first insert the TimelineId to the
258 : /// creating map, then `timelines`, then remove it from the creating map.
259 : /// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
260 : timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
261 :
262 : // This mutex prevents creation of new timelines during GC.
263 : // Adding yet another mutex (in addition to `timelines`) is needed because holding
264 : // `timelines` mutex during all GC iteration
265 : // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
266 : // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
267 : // timeout...
268 : gc_cs: tokio::sync::Mutex<()>,
269 : walredo_mgr: Arc<WalRedoManager>,
270 :
271 : // provides access to timeline data sitting in the remote storage
272 : pub(crate) remote_storage: Option<GenericRemoteStorage>,
273 :
274 : // Access to global deletion queue for when this tenant wants to schedule a deletion
275 : deletion_queue_client: DeletionQueueClient,
276 :
277 : /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
278 : cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
279 : cached_synthetic_tenant_size: Arc<AtomicU64>,
280 :
281 : eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
282 :
283 : /// If the tenant is in Activating state, notify this to encourage it
284 : /// to proceed to Active as soon as possible, rather than waiting for lazy
285 : /// background warmup.
286 : pub(crate) activate_now_sem: tokio::sync::Semaphore,
287 :
288 : pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
289 :
290 : // Cancellation token fires when we have entered shutdown(). This is a parent of
291 : // Timelines' cancellation token.
292 : pub(crate) cancel: CancellationToken,
293 :
294 : // Users of the Tenant such as the page service must take this Gate to avoid
295 : // trying to use a Tenant which is shutting down.
296 : pub(crate) gate: Gate,
297 : }
298 :
299 : impl std::fmt::Debug for Tenant {
300 1 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
301 1 : write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
302 1 : }
303 : }
304 :
305 : pub(crate) enum WalRedoManager {
306 : Prod(PostgresRedoManager),
307 : #[cfg(test)]
308 : Test(harness::TestRedoManager),
309 : }
310 :
311 : impl From<PostgresRedoManager> for WalRedoManager {
312 736 : fn from(mgr: PostgresRedoManager) -> Self {
313 736 : Self::Prod(mgr)
314 736 : }
315 : }
316 :
317 : #[cfg(test)]
318 : impl From<harness::TestRedoManager> for WalRedoManager {
319 42 : fn from(mgr: harness::TestRedoManager) -> Self {
320 42 : Self::Test(mgr)
321 42 : }
322 : }
323 :
324 : impl WalRedoManager {
325 UBC 0 : pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
326 0 : match self {
327 CBC 727 : Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
328 727 : #[cfg(test)]
329 727 : Self::Test(_) => {
330 UBC 0 : // Not applicable to test redo manager
331 0 : }
332 CBC 727 : }
333 727 : }
334 :
335 : /// # Cancel-Safety
336 : ///
337 : /// This method is cancellation-safe.
338 2060218 : pub async fn request_redo(
339 2060218 : &self,
340 2060218 : key: crate::repository::Key,
341 2060218 : lsn: Lsn,
342 2060218 : base_img: Option<(Lsn, bytes::Bytes)>,
343 2060218 : records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
344 2060218 : pg_version: u32,
345 2060218 : ) -> anyhow::Result<bytes::Bytes> {
346 UBC 0 : match self {
347 CBC 2060218 : Self::Prod(mgr) => {
348 UBC 0 : mgr.request_redo(key, lsn, base_img, records, pg_version)
349 0 : .await
350 : }
351 : #[cfg(test)]
352 0 : Self::Test(mgr) => {
353 0 : mgr.request_redo(key, lsn, base_img, records, pg_version)
354 0 : .await
355 : }
356 : }
357 CBC 2060218 : }
358 : }
359 :
360 100 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
361 : pub enum GetTimelineError {
362 : #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
363 : NotActive {
364 : tenant_id: TenantId,
365 : timeline_id: TimelineId,
366 : state: TimelineState,
367 : },
368 : #[error("Timeline {tenant_id}/{timeline_id} was not found")]
369 : NotFound {
370 : tenant_id: TenantId,
371 : timeline_id: TimelineId,
372 : },
373 : }
374 :
375 UBC 0 : #[derive(Debug, thiserror::Error)]
376 : pub enum LoadLocalTimelineError {
377 : #[error("FailedToLoad")]
378 : Load(#[source] anyhow::Error),
379 : #[error("FailedToResumeDeletion")]
380 : ResumeDeletion(#[source] anyhow::Error),
381 : }
382 :
383 CBC 139 : #[derive(thiserror::Error)]
384 : pub enum DeleteTimelineError {
385 : #[error("NotFound")]
386 : NotFound,
387 :
388 : #[error("HasChildren")]
389 : HasChildren(Vec<TimelineId>),
390 :
391 : #[error("Timeline deletion is already in progress")]
392 : AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
393 :
394 : #[error(transparent)]
395 : Other(#[from] anyhow::Error),
396 : }
397 :
398 : impl Debug for DeleteTimelineError {
399 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 0 : match self {
401 0 : Self::NotFound => write!(f, "NotFound"),
402 0 : Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
403 0 : Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
404 0 : Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
405 : }
406 0 : }
407 : }
408 :
409 : pub enum SetStoppingError {
410 : AlreadyStopping(completion::Barrier),
411 : Broken,
412 : }
413 :
414 : impl Debug for SetStoppingError {
415 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416 0 : match self {
417 0 : Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
418 0 : Self::Broken => write!(f, "Broken"),
419 : }
420 0 : }
421 : }
422 :
423 CBC 6 : #[derive(thiserror::Error, Debug)]
424 : pub enum CreateTimelineError {
425 : #[error("creation of timeline with the given ID is in progress")]
426 : AlreadyCreating,
427 : #[error("timeline already exists with different parameters")]
428 : Conflict,
429 : #[error(transparent)]
430 : AncestorLsn(anyhow::Error),
431 : #[error("ancestor timeline is not active")]
432 : AncestorNotActive,
433 : #[error("tenant shutting down")]
434 : ShuttingDown,
435 : #[error(transparent)]
436 : Other(#[from] anyhow::Error),
437 : }
438 :
439 UBC 0 : #[derive(thiserror::Error, Debug)]
440 : enum InitdbError {
441 : Other(anyhow::Error),
442 : Cancelled,
443 : Spawn(std::io::Result<()>),
444 : Failed(std::process::ExitStatus, Vec<u8>),
445 : }
446 :
447 : impl fmt::Display for InitdbError {
448 0 : fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
449 0 : match self {
450 0 : InitdbError::Cancelled => write!(f, "Operation was cancelled"),
451 0 : InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
452 0 : InitdbError::Failed(status, stderr) => write!(
453 0 : f,
454 0 : "Command failed with status {:?}: {}",
455 0 : status,
456 0 : String::from_utf8_lossy(stderr)
457 0 : ),
458 0 : InitdbError::Other(e) => write!(f, "Error: {:?}", e),
459 : }
460 0 : }
461 : }
462 :
463 : impl From<std::io::Error> for InitdbError {
464 0 : fn from(error: std::io::Error) -> Self {
465 0 : InitdbError::Spawn(Err(error))
466 0 : }
467 : }
468 :
469 : struct TenantDirectoryScan {
470 : sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
471 : timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
472 : }
473 :
474 : enum CreateTimelineCause {
475 : Load,
476 : Delete,
477 : }
478 :
479 : impl Tenant {
480 : /// Yet another helper for timeline initialization.
481 : ///
482 : /// - Initializes the Timeline struct and inserts it into the tenant's hash map
483 : /// - Scans the local timeline directory for layer files and builds the layer map
484 : /// - Downloads remote index file and adds remote files to the layer map
485 : /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
486 : ///
487 : /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
488 : /// it is marked as Active.
489 : #[allow(clippy::too_many_arguments)]
490 CBC 357 : async fn timeline_init_and_sync(
491 357 : &self,
492 357 : timeline_id: TimelineId,
493 357 : resources: TimelineResources,
494 357 : index_part: Option<IndexPart>,
495 357 : metadata: TimelineMetadata,
496 357 : ancestor: Option<Arc<Timeline>>,
497 357 : _ctx: &RequestContext,
498 357 : ) -> anyhow::Result<()> {
499 357 : let tenant_id = self.tenant_shard_id;
500 :
501 357 : let timeline = self.create_timeline_struct(
502 357 : timeline_id,
503 357 : &metadata,
504 357 : ancestor.clone(),
505 357 : resources,
506 357 : CreateTimelineCause::Load,
507 357 : )?;
508 357 : let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
509 357 : anyhow::ensure!(
510 357 : disk_consistent_lsn.is_valid(),
511 UBC 0 : "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
512 : );
513 CBC 357 : assert_eq!(
514 357 : disk_consistent_lsn,
515 357 : metadata.disk_consistent_lsn(),
516 UBC 0 : "these are used interchangeably"
517 : );
518 :
519 CBC 357 : if let Some(index_part) = index_part.as_ref() {
520 357 : timeline
521 357 : .remote_client
522 357 : .as_ref()
523 357 : .unwrap()
524 357 : .init_upload_queue(index_part)?;
525 UBC 0 : } else if self.remote_storage.is_some() {
526 : // No data on the remote storage, but we have local metadata file. We can end up
527 : // here with timeline_create being interrupted before finishing index part upload.
528 : // By doing what we do here, the index part upload is retried.
529 : // If control plane retries timeline creation in the meantime, the mgmt API handler
530 : // for timeline creation will coalesce on the upload we queue here.
531 0 : let rtc = timeline.remote_client.as_ref().unwrap();
532 0 : rtc.init_upload_queue_for_empty_remote(&metadata)?;
533 0 : rtc.schedule_index_upload_for_metadata_update(&metadata)?;
534 0 : }
535 :
536 CBC 357 : timeline
537 357 : .load_layer_map(disk_consistent_lsn, index_part)
538 351 : .await
539 357 : .with_context(|| {
540 UBC 0 : format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
541 CBC 357 : })?;
542 :
543 : {
544 : // avoiding holding it across awaits
545 357 : let mut timelines_accessor = self.timelines.lock().unwrap();
546 357 : match timelines_accessor.entry(timeline_id) {
547 : Entry::Occupied(_) => {
548 : // The uninit mark file acts as a lock that prevents another task from
549 : // initializing the timeline at the same time.
550 UBC 0 : unreachable!(
551 0 : "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
552 0 : );
553 : }
554 CBC 357 : Entry::Vacant(v) => {
555 357 : v.insert(Arc::clone(&timeline));
556 357 : timeline.maybe_spawn_flush_loop();
557 357 : }
558 357 : }
559 357 : };
560 357 :
561 357 : // Sanity check: a timeline should have some content.
562 357 : anyhow::ensure!(
563 357 : ancestor.is_some()
564 307 : || timeline
565 307 : .layers
566 307 : .read()
567 UBC 0 : .await
568 CBC 307 : .layer_map()
569 307 : .iter_historic_layers()
570 307 : .next()
571 307 : .is_some(),
572 UBC 0 : "Timeline has no ancestor and no layer files"
573 : );
574 :
575 CBC 357 : Ok(())
576 357 : }
577 :
578 : /// Attach a tenant that's available in cloud storage.
579 : ///
580 : /// This returns quickly, after just creating the in-memory object
581 : /// Tenant struct and launching a background task to download
582 : /// the remote index files. On return, the tenant is most likely still in
583 : /// Attaching state, and it will become Active once the background task
584 : /// finishes. You can use wait_until_active() to wait for the task to
585 : /// complete.
586 : ///
587 : #[allow(clippy::too_many_arguments)]
588 736 : pub(crate) fn spawn(
589 736 : conf: &'static PageServerConf,
590 736 : tenant_shard_id: TenantShardId,
591 736 : resources: TenantSharedResources,
592 736 : attached_conf: AttachedTenantConf,
593 736 : shard_identity: ShardIdentity,
594 736 : init_order: Option<InitializationOrder>,
595 736 : tenants: &'static std::sync::RwLock<TenantsMap>,
596 736 : mode: SpawnMode,
597 736 : ctx: &RequestContext,
598 736 : ) -> anyhow::Result<Arc<Tenant>> {
599 736 : let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
600 736 : conf,
601 736 : tenant_shard_id,
602 736 : )));
603 736 :
604 736 : let TenantSharedResources {
605 736 : broker_client,
606 736 : remote_storage,
607 736 : deletion_queue_client,
608 736 : } = resources;
609 736 :
610 736 : let tenant = Arc::new(Tenant::new(
611 736 : TenantState::Attaching,
612 736 : conf,
613 736 : attached_conf,
614 736 : shard_identity,
615 736 : wal_redo_manager,
616 736 : tenant_shard_id,
617 736 : remote_storage.clone(),
618 736 : deletion_queue_client,
619 736 : ));
620 736 :
621 736 : // Do all the hard work in the background
622 736 : let tenant_clone = Arc::clone(&tenant);
623 736 :
624 736 : let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
625 736 : task_mgr::spawn(
626 736 : &tokio::runtime::Handle::current(),
627 736 : TaskKind::Attach,
628 736 : Some(tenant_shard_id),
629 736 : None,
630 736 : "attach tenant",
631 : false,
632 736 : async move {
633 736 : // Is this tenant being spawned as part of process startup?
634 736 : let starting_up = init_order.is_some();
635 728 : scopeguard::defer! {
636 728 : if starting_up {
637 194 : TENANT.startup_complete.inc();
638 534 : }
639 : }
640 :
641 : // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
642 736 : let make_broken =
643 6 : |t: &Tenant, err: anyhow::Error| {
644 6 : error!("attach failed, setting tenant state to Broken: {err:?}");
645 6 : t.state.send_modify(|state| {
646 : // The Stopping case is for when we have passed control on to DeleteTenantFlow:
647 : // if it errors, we will call make_broken when tenant is already in Stopping.
648 6 : assert!(
649 6 : matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
650 UBC 0 : "the attach task owns the tenant state until activation is complete"
651 : );
652 :
653 CBC 6 : *state = TenantState::broken_from_reason(err.to_string());
654 6 : });
655 6 : };
656 :
657 736 : let mut init_order = init_order;
658 736 : // take the completion because initial tenant loading will complete when all of
659 736 : // these tasks complete.
660 736 : let _completion = init_order
661 736 : .as_mut()
662 736 : .and_then(|x| x.initial_tenant_load.take());
663 736 : let remote_load_completion = init_order
664 736 : .as_mut()
665 736 : .and_then(|x| x.initial_tenant_load_remote.take());
666 :
667 : enum AttachType<'a> {
668 : // During pageserver startup, we are attaching this tenant lazily in the background
669 : Warmup(tokio::sync::SemaphorePermit<'a>),
670 : // During pageserver startup, we are attaching this tenant as soon as we can,
671 : // because a client tried to access it.
672 : OnDemand,
673 : // During normal operations after startup, we are attaching a tenant.
674 : Normal,
675 : }
676 :
677 : // Before doing any I/O, wait for either or:
678 : // - A client to attempt to access to this tenant (on-demand loading)
679 : // - A permit to become available in the warmup semaphore (background warmup)
680 : //
681 : // Some-ness of init_order is how we know if we're attaching during startup or later
682 : // in process lifetime.
683 736 : let attach_type = if init_order.is_some() {
684 202 : tokio::select!(
685 : _ = tenant_clone.activate_now_sem.acquire() => {
686 3 : tracing::info!("Activating tenant (on-demand)");
687 : AttachType::OnDemand
688 : },
689 199 : permit_result = conf.concurrent_tenant_warmup.inner().acquire() => {
690 : match permit_result {
691 : Ok(p) => {
692 199 : tracing::info!("Activating tenant (warmup)");
693 : AttachType::Warmup(p)
694 : }
695 : Err(_) => {
696 : // This is unexpected: the warmup semaphore should stay alive
697 : // for the lifetime of init_order. Log a warning and proceed.
698 UBC 0 : tracing::warn!("warmup_limit semaphore unexpectedly closed");
699 : AttachType::Normal
700 : }
701 : }
702 :
703 : }
704 : _ = tenant_clone.cancel.cancelled() => {
705 : // This is safe, but should be pretty rare: it is interesting if a tenant
706 : // stayed in Activating for such a long time that shutdown found it in
707 : // that state.
708 0 : tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
709 : return Ok(());
710 : },
711 : )
712 : } else {
713 CBC 534 : AttachType::Normal
714 : };
715 :
716 736 : let preload_timer = TENANT.preload.start_timer();
717 736 : let preload = match mode {
718 : SpawnMode::Create => {
719 : // Don't count the skipped preload into the histogram of preload durations
720 430 : preload_timer.stop_and_discard();
721 430 : None
722 : },
723 : SpawnMode::Normal => {
724 306 : match &remote_storage {
725 306 : Some(remote_storage) => Some(
726 306 : match tenant_clone
727 306 : .preload(remote_storage, task_mgr::shutdown_token())
728 306 : .instrument(
729 306 : tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()),
730 : )
731 1410 : .await {
732 300 : Ok(p) => {
733 300 : preload_timer.observe_duration();
734 300 : p
735 : }
736 : ,
737 6 : Err(e) => {
738 6 : make_broken(&tenant_clone, anyhow::anyhow!(e));
739 6 : return Ok(());
740 : }
741 : },
742 : ),
743 UBC 0 : None => None,
744 : }
745 : }
746 : };
747 :
748 : // Remote preload is complete.
749 CBC 730 : drop(remote_load_completion);
750 :
751 730 : let pending_deletion = {
752 730 : match DeleteTenantFlow::should_resume_deletion(
753 730 : conf,
754 730 : preload.as_ref().map(|p| p.deleting).unwrap_or(false),
755 730 : &tenant_clone,
756 730 : )
757 UBC 0 : .await
758 : {
759 CBC 730 : Ok(should_resume_deletion) => should_resume_deletion,
760 UBC 0 : Err(err) => {
761 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
762 0 : return Ok(());
763 : }
764 : }
765 : };
766 :
767 CBC 730 : info!("pending_deletion {}", pending_deletion.is_some());
768 :
769 730 : if let Some(deletion) = pending_deletion {
770 : // as we are no longer loading, signal completion by dropping
771 : // the completion while we resume deletion
772 21 : drop(_completion);
773 21 : let background_jobs_can_start =
774 21 : init_order.as_ref().map(|x| &x.background_jobs_can_start);
775 21 : if let Some(background) = background_jobs_can_start {
776 20 : info!("waiting for backgound jobs barrier");
777 20 : background.clone().wait().await;
778 20 : info!("ready for backgound jobs barrier");
779 1 : }
780 :
781 21 : match DeleteTenantFlow::resume_from_attach(
782 21 : deletion,
783 21 : &tenant_clone,
784 21 : preload,
785 21 : tenants,
786 21 : &ctx,
787 21 : )
788 763 : .await
789 : {
790 UBC 0 : Err(err) => {
791 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
792 0 : return Ok(());
793 : }
794 CBC 21 : Ok(()) => return Ok(()),
795 : }
796 709 : }
797 :
798 : // We will time the duration of the attach phase unless this is a creation (attach will do no work)
799 709 : let attach_timer = match mode {
800 430 : SpawnMode::Create => None,
801 279 : SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
802 : };
803 1085 : match tenant_clone.attach(preload, &ctx).await {
804 : Ok(()) => {
805 709 : info!("attach finished, activating");
806 709 : if let Some(t)= attach_timer {t.observe_duration();}
807 709 : tenant_clone.activate(broker_client, None, &ctx);
808 : }
809 UBC 0 : Err(e) => {
810 0 : if let Some(t)= attach_timer {t.observe_duration();}
811 0 : make_broken(&tenant_clone, anyhow::anyhow!(e));
812 : }
813 : }
814 :
815 : // If we are doing an opportunistic warmup attachment at startup, initialize
816 : // logical size at the same time. This is better than starting a bunch of idle tenants
817 : // with cold caches and then coming back later to initialize their logical sizes.
818 : //
819 : // It also prevents the warmup proccess competing with the concurrency limit on
820 : // logical size calculations: if logical size calculation semaphore is saturated,
821 : // then warmup will wait for that before proceeding to the next tenant.
822 CBC 709 : if let AttachType::Warmup(_permit) = attach_type {
823 179 : let mut futs = FuturesUnordered::new();
824 179 : let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect();
825 410 : for t in timelines {
826 231 : futs.push(t.await_initial_logical_size())
827 : }
828 179 : tracing::info!("Waiting for initial logical sizes while warming up...");
829 402 : while futs.next().await.is_some() {
830 223 :
831 223 : }
832 171 : tracing::info!("Warm-up complete");
833 530 : }
834 :
835 701 : Ok(())
836 728 : }
837 736 : .instrument({
838 736 : let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
839 736 : span.follows_from(Span::current());
840 736 : span
841 736 : }),
842 736 : );
843 736 : Ok(tenant)
844 736 : }
845 :
846 308 : pub(crate) async fn preload(
847 308 : self: &Arc<Tenant>,
848 308 : remote_storage: &GenericRemoteStorage,
849 308 : cancel: CancellationToken,
850 308 : ) -> anyhow::Result<TenantPreload> {
851 : // Get list of remote timelines
852 : // download index files for every tenant timeline
853 308 : info!("listing remote timelines");
854 308 : let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
855 308 : remote_storage,
856 308 : self.tenant_shard_id,
857 308 : cancel.clone(),
858 308 : )
859 1055 : .await?;
860 :
861 302 : let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
862 302 : info!(
863 302 : "found {} timelines, deleting={}",
864 302 : remote_timeline_ids.len(),
865 302 : deleting
866 302 : );
867 :
868 317 : for k in other_keys {
869 15 : if k != TENANT_DELETED_MARKER_FILE_NAME {
870 UBC 0 : warn!("Unexpected non timeline key {k}");
871 CBC 15 : }
872 : }
873 :
874 : Ok(TenantPreload {
875 302 : deleting,
876 302 : timelines: self
877 302 : .load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
878 362 : .await?,
879 : })
880 308 : }
881 :
882 : ///
883 : /// Background task that downloads all data for a tenant and brings it to Active state.
884 : ///
885 : /// No background tasks are started as part of this routine.
886 : ///
887 732 : async fn attach(
888 732 : self: &Arc<Tenant>,
889 732 : preload: Option<TenantPreload>,
890 732 : ctx: &RequestContext,
891 732 : ) -> anyhow::Result<()> {
892 732 : span::debug_assert_current_span_has_tenant_id();
893 :
894 3 : failpoint_support::sleep_millis_async!("before-attaching-tenant");
895 :
896 732 : let preload = match preload {
897 302 : Some(p) => p,
898 : None => {
899 : // Deprecated dev mode: load from local disk state instead of remote storage
900 : // https://github.com/neondatabase/neon/issues/5624
901 430 : return self.load_local(ctx).await;
902 : }
903 : };
904 :
905 302 : let mut timelines_to_resume_deletions = vec![];
906 302 :
907 302 : let mut remote_index_and_client = HashMap::new();
908 302 : let mut timeline_ancestors = HashMap::new();
909 302 : let mut existent_timelines = HashSet::new();
910 696 : for (timeline_id, preload) in preload.timelines {
911 369 : let index_part = match preload.index_part {
912 369 : Ok(i) => {
913 UBC 0 : debug!("remote index part exists for timeline {timeline_id}");
914 : // We found index_part on the remote, this is the standard case.
915 CBC 369 : existent_timelines.insert(timeline_id);
916 369 : i
917 : }
918 : Err(DownloadError::NotFound) => {
919 : // There is no index_part on the remote. We only get here
920 : // if there is some prefix for the timeline in the remote storage.
921 : // This can e.g. be the initdb.tar.zst archive, maybe a
922 : // remnant from a prior incomplete creation or deletion attempt.
923 : // Delete the local directory as the deciding criterion for a
924 : // timeline's existence is presence of index_part.
925 25 : info!(%timeline_id, "index_part not found on remote");
926 25 : continue;
927 : }
928 UBC 0 : Err(e) => {
929 : // Some (possibly ephemeral) error happened during index_part download.
930 : // Pretend the timeline exists to not delete the timeline directory,
931 : // as it might be a temporary issue and we don't want to re-download
932 : // everything after it resolves.
933 0 : warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
934 :
935 0 : existent_timelines.insert(timeline_id);
936 0 : continue;
937 : }
938 : };
939 CBC 369 : match index_part {
940 357 : MaybeDeletedIndexPart::IndexPart(index_part) => {
941 357 : timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
942 357 : remote_index_and_client.insert(timeline_id, (index_part, preload.client));
943 357 : }
944 12 : MaybeDeletedIndexPart::Deleted(index_part) => {
945 12 : info!(
946 12 : "timeline {} is deleted, picking to resume deletion",
947 12 : timeline_id
948 12 : );
949 12 : timelines_to_resume_deletions.push((timeline_id, index_part, preload.client));
950 : }
951 : }
952 : }
953 :
954 : // For every timeline, download the metadata file, scan the local directory,
955 : // and build a layer map that contains an entry for each remote and local
956 : // layer file.
957 357 : let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
958 659 : for (timeline_id, remote_metadata) in sorted_timelines {
959 357 : let (index_part, remote_client) = remote_index_and_client
960 357 : .remove(&timeline_id)
961 357 : .expect("just put it in above");
962 357 :
963 357 : // TODO again handle early failure
964 357 : self.load_remote_timeline(
965 357 : timeline_id,
966 357 : index_part,
967 357 : remote_metadata,
968 357 : TimelineResources {
969 357 : remote_client: Some(remote_client),
970 357 : deletion_queue_client: self.deletion_queue_client.clone(),
971 357 : },
972 357 : ctx,
973 357 : )
974 696 : .await
975 357 : .with_context(|| {
976 UBC 0 : format!(
977 0 : "failed to load remote timeline {} for tenant {}",
978 0 : timeline_id, self.tenant_shard_id
979 0 : )
980 CBC 357 : })?;
981 : }
982 :
983 : // Walk through deleted timelines, resume deletion
984 314 : for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
985 12 : remote_timeline_client
986 12 : .init_upload_queue_stopped_to_continue_deletion(&index_part)
987 12 : .context("init queue stopped")
988 12 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
989 :
990 12 : DeleteTimelineFlow::resume_deletion(
991 12 : Arc::clone(self),
992 12 : timeline_id,
993 12 : &index_part.metadata,
994 12 : Some(remote_timeline_client),
995 12 : self.deletion_queue_client.clone(),
996 12 : )
997 UBC 0 : .await
998 CBC 12 : .context("resume_deletion")
999 12 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1000 : }
1001 :
1002 : // The local filesystem contents are a cache of what's in the remote IndexPart;
1003 : // IndexPart is the source of truth.
1004 302 : self.clean_up_timelines(&existent_timelines)?;
1005 :
1006 9 : failpoint_support::sleep_millis_async!("attach-before-activate");
1007 :
1008 302 : info!("Done");
1009 :
1010 302 : Ok(())
1011 732 : }
1012 :
1013 : /// Check for any local timeline directories that are temporary, or do not correspond to a
1014 : /// timeline that still exists: this can happen if we crashed during a deletion/creation, or
1015 : /// if a timeline was deleted while the tenant was attached to a different pageserver.
1016 302 : fn clean_up_timelines(&self, existent_timelines: &HashSet<TimelineId>) -> anyhow::Result<()> {
1017 302 : let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
1018 :
1019 302 : let entries = match timelines_dir.read_dir_utf8() {
1020 300 : Ok(d) => d,
1021 2 : Err(e) => {
1022 2 : if e.kind() == std::io::ErrorKind::NotFound {
1023 2 : return Ok(());
1024 : } else {
1025 UBC 0 : return Err(e).context("list timelines directory for tenant");
1026 : }
1027 : }
1028 : };
1029 :
1030 CBC 676 : for entry in entries {
1031 376 : let entry = entry.context("read timeline dir entry")?;
1032 376 : let entry_path = entry.path();
1033 :
1034 376 : let purge = if crate::is_temporary(entry_path)
1035 : // TODO: uninit_mark isn't needed any more, since uninitialized timelines are already
1036 : // covered by the check that the timeline must exist in remote storage.
1037 374 : || is_uninit_mark(entry_path)
1038 372 : || crate::is_delete_mark(entry_path)
1039 : {
1040 4 : true
1041 : } else {
1042 372 : match TimelineId::try_from(entry_path.file_name()) {
1043 372 : Ok(i) => {
1044 372 : // Purge if the timeline ID does not exist in remote storage: remote storage is the authority.
1045 372 : !existent_timelines.contains(&i)
1046 : }
1047 UBC 0 : Err(e) => {
1048 0 : tracing::warn!(
1049 0 : "Unparseable directory in timelines directory: {entry_path}, ignoring ({e})"
1050 0 : );
1051 : // Do not purge junk: if we don't recognize it, be cautious and leave it for a human.
1052 0 : false
1053 : }
1054 : }
1055 : };
1056 :
1057 CBC 376 : if purge {
1058 9 : tracing::info!("Purging stale timeline dentry {entry_path}");
1059 9 : if let Err(e) = match entry.file_type() {
1060 9 : Ok(t) => if t.is_dir() {
1061 7 : std::fs::remove_dir_all(entry_path)
1062 : } else {
1063 2 : std::fs::remove_file(entry_path)
1064 : }
1065 9 : .or_else(fs_ext::ignore_not_found),
1066 UBC 0 : Err(e) => Err(e),
1067 : } {
1068 LBC (1) : tracing::warn!("Failed to purge stale timeline dentry {entry_path}: {e}");
1069 CBC 9 : }
1070 367 : }
1071 : }
1072 :
1073 300 : Ok(())
1074 302 : }
1075 :
1076 : /// Get sum of all remote timelines sizes
1077 : ///
1078 : /// This function relies on the index_part instead of listing the remote storage
1079 27 : pub fn remote_size(&self) -> u64 {
1080 27 : let mut size = 0;
1081 :
1082 27 : for timeline in self.list_timelines() {
1083 18 : if let Some(remote_client) = &timeline.remote_client {
1084 18 : size += remote_client.get_remote_physical_size();
1085 18 : }
1086 : }
1087 :
1088 27 : size
1089 27 : }
1090 :
1091 357 : #[instrument(skip_all, fields(timeline_id=%timeline_id))]
1092 : async fn load_remote_timeline(
1093 : &self,
1094 : timeline_id: TimelineId,
1095 : index_part: IndexPart,
1096 : remote_metadata: TimelineMetadata,
1097 : resources: TimelineResources,
1098 : ctx: &RequestContext,
1099 : ) -> anyhow::Result<()> {
1100 : span::debug_assert_current_span_has_tenant_id();
1101 :
1102 357 : info!("downloading index file for timeline {}", timeline_id);
1103 : tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_shard_id, &timeline_id))
1104 : .await
1105 : .context("Failed to create new timeline directory")?;
1106 :
1107 : let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
1108 : let timelines = self.timelines.lock().unwrap();
1109 : Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
1110 UBC 0 : || {
1111 0 : anyhow::anyhow!(
1112 0 : "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
1113 0 : )
1114 0 : },
1115 : )?))
1116 : } else {
1117 : None
1118 : };
1119 :
1120 : // timeline loading after attach expects to find metadata file for each metadata
1121 : save_metadata(
1122 : self.conf,
1123 : &self.tenant_shard_id,
1124 : &timeline_id,
1125 : &remote_metadata,
1126 : )
1127 : .await
1128 : .context("save_metadata")
1129 : .map_err(LoadLocalTimelineError::Load)?;
1130 :
1131 : self.timeline_init_and_sync(
1132 : timeline_id,
1133 : resources,
1134 : Some(index_part),
1135 : remote_metadata,
1136 : ancestor,
1137 : ctx,
1138 : )
1139 : .await
1140 : }
1141 :
1142 : /// Create a placeholder Tenant object for a broken tenant
1143 0 : pub fn create_broken_tenant(
1144 0 : conf: &'static PageServerConf,
1145 0 : tenant_shard_id: TenantShardId,
1146 0 : reason: String,
1147 0 : ) -> Arc<Tenant> {
1148 0 : let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
1149 0 : conf,
1150 0 : tenant_shard_id,
1151 0 : )));
1152 0 : Arc::new(Tenant::new(
1153 0 : TenantState::Broken {
1154 0 : reason,
1155 0 : backtrace: String::new(),
1156 0 : },
1157 0 : conf,
1158 0 : AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
1159 0 : // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
1160 0 : // to occupy the slot for this TenantShardId.
1161 0 : ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
1162 0 : wal_redo_manager,
1163 0 : tenant_shard_id,
1164 0 : None,
1165 0 : DeletionQueueClient::broken(),
1166 0 : ))
1167 0 : }
1168 :
1169 CBC 470 : fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
1170 470 : let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
1171 470 : // Note timelines_to_resume_deletion needs to be separate because it can be not sortable
1172 470 : // from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
1173 470 : // completed in non topological order (for example because parent has smaller number of layer files in it)
1174 470 : let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
1175 470 :
1176 470 : let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
1177 :
1178 470 : for entry in timelines_dir
1179 470 : .read_dir_utf8()
1180 470 : .context("list timelines directory for tenant")?
1181 : {
1182 3 : let entry = entry.context("read timeline dir entry")?;
1183 3 : let timeline_dir = entry.path();
1184 3 :
1185 3 : if crate::is_temporary(timeline_dir) {
1186 UBC 0 : info!("Found temporary timeline directory, removing: {timeline_dir}");
1187 0 : if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
1188 0 : error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
1189 0 : }
1190 CBC 3 : } else if is_uninit_mark(timeline_dir) {
1191 1 : if !timeline_dir.exists() {
1192 GBC 1 : warn!("Timeline dir entry become invalid: {timeline_dir}");
1193 1 : continue;
1194 LBC (1) : }
1195 (1) :
1196 (1) : let timeline_uninit_mark_file = &timeline_dir;
1197 (1) : info!(
1198 (1) : "Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
1199 (1) : );
1200 (1) : let timeline_id =
1201 (1) : TimelineId::try_from(timeline_uninit_mark_file.file_stem())
1202 (1) : .with_context(|| {
1203 UBC 0 : format!(
1204 0 : "Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
1205 0 : )
1206 LBC (1) : })?;
1207 (1) : let timeline_dir = self.conf.timeline_path(&self.tenant_shard_id, &timeline_id);
1208 UBC 0 : if let Err(e) =
1209 LBC (1) : remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
1210 : {
1211 UBC 0 : error!("Failed to clean up uninit marked timeline: {e:?}");
1212 LBC (1) : }
1213 CBC 2 : } else if crate::is_delete_mark(timeline_dir) {
1214 : // If metadata exists, load as usual, continue deletion
1215 UBC 0 : let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
1216 0 : .with_context(|| {
1217 0 : format!(
1218 0 : "Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
1219 0 : )
1220 0 : })?;
1221 :
1222 0 : info!("Found deletion mark for timeline {}", timeline_id);
1223 :
1224 0 : match load_metadata(self.conf, &self.tenant_shard_id, &timeline_id) {
1225 0 : Ok(metadata) => {
1226 0 : timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
1227 : }
1228 0 : Err(e) => match &e {
1229 0 : LoadMetadataError::Read(r) => {
1230 0 : if r.kind() != io::ErrorKind::NotFound {
1231 0 : return Err(anyhow::anyhow!(e)).with_context(|| {
1232 0 : format!("Failed to load metadata for timeline_id {timeline_id}")
1233 0 : });
1234 0 : }
1235 0 :
1236 0 : // If metadata doesnt exist it means that we've crashed without
1237 0 : // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
1238 0 : // So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
1239 0 : // We cant do it here because the method is async so we'd need block_on
1240 0 : // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
1241 0 : // so that basically results in a cycle:
1242 0 : // spawn_blocking
1243 0 : // - block_on
1244 0 : // - spawn_blocking
1245 0 : // which can lead to running out of threads in blocing pool.
1246 0 : timelines_to_resume_deletion.push((timeline_id, None));
1247 : }
1248 : _ => {
1249 0 : return Err(anyhow::anyhow!(e)).with_context(|| {
1250 0 : format!("Failed to load metadata for timeline_id {timeline_id}")
1251 0 : })
1252 : }
1253 : },
1254 : }
1255 : } else {
1256 CBC 2 : if !timeline_dir.exists() {
1257 LBC (1) : warn!("Timeline dir entry become invalid: {timeline_dir}");
1258 (1) : continue;
1259 CBC 2 : }
1260 2 : let timeline_id = TimelineId::try_from(timeline_dir.file_name())
1261 2 : .with_context(|| {
1262 UBC 0 : format!(
1263 0 : "Could not parse timeline id out of the timeline dir name {timeline_dir}",
1264 0 : )
1265 CBC 2 : })?;
1266 2 : let timeline_uninit_mark_file = self
1267 2 : .conf
1268 2 : .timeline_uninit_mark_file_path(self.tenant_shard_id, timeline_id);
1269 2 : if timeline_uninit_mark_file.exists() {
1270 GBC 1 : info!(
1271 1 : %timeline_id,
1272 1 : "Found an uninit mark file, removing the timeline and its uninit mark",
1273 1 : );
1274 UBC 0 : if let Err(e) =
1275 GBC 1 : remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
1276 : {
1277 UBC 0 : error!("Failed to clean up uninit marked timeline: {e:?}");
1278 GBC 1 : }
1279 1 : continue;
1280 CBC 1 : }
1281 1 :
1282 1 : let timeline_delete_mark_file = self
1283 1 : .conf
1284 1 : .timeline_delete_mark_file_path(self.tenant_shard_id, timeline_id);
1285 1 : if timeline_delete_mark_file.exists() {
1286 : // Cleanup should be done in `is_delete_mark` branch above
1287 UBC 0 : continue;
1288 CBC 1 : }
1289 1 :
1290 1 : let file_name = entry.file_name();
1291 1 : if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
1292 1 : let metadata = load_metadata(self.conf, &self.tenant_shard_id, &timeline_id)
1293 1 : .context("failed to load metadata")?;
1294 UBC 0 : timelines_to_load.insert(timeline_id, metadata);
1295 : } else {
1296 : // A file or directory that doesn't look like a timeline ID
1297 0 : warn!("unexpected file or directory in timelines directory: {file_name}");
1298 : }
1299 : }
1300 : }
1301 :
1302 : // Sort the array of timeline IDs into tree-order, so that parent comes before
1303 : // all its children.
1304 CBC 469 : tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
1305 469 : TenantDirectoryScan {
1306 469 : sorted_timelines_to_load: sorted_timelines,
1307 469 : timelines_to_resume_deletion,
1308 469 : }
1309 469 : })
1310 470 : }
1311 :
1312 302 : async fn load_timeline_metadata(
1313 302 : self: &Arc<Tenant>,
1314 302 : timeline_ids: HashSet<TimelineId>,
1315 302 : remote_storage: &GenericRemoteStorage,
1316 302 : cancel: CancellationToken,
1317 302 : ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
1318 302 : let mut part_downloads = JoinSet::new();
1319 696 : for timeline_id in timeline_ids {
1320 394 : let client = RemoteTimelineClient::new(
1321 394 : remote_storage.clone(),
1322 394 : self.deletion_queue_client.clone(),
1323 394 : self.conf,
1324 394 : self.tenant_shard_id,
1325 394 : timeline_id,
1326 394 : self.generation,
1327 394 : );
1328 394 : let cancel_clone = cancel.clone();
1329 394 : part_downloads.spawn(
1330 394 : async move {
1331 394 : debug!("starting index part download");
1332 :
1333 2975 : let index_part = client.download_index_file(cancel_clone).await;
1334 :
1335 394 : debug!("finished index part download");
1336 :
1337 394 : Result::<_, anyhow::Error>::Ok(TimelinePreload {
1338 394 : client,
1339 394 : timeline_id,
1340 394 : index_part,
1341 394 : })
1342 394 : }
1343 394 : .map(move |res| {
1344 394 : res.with_context(|| format!("download index part for timeline {timeline_id}"))
1345 394 : })
1346 394 : .instrument(info_span!("download_index_part", %timeline_id)),
1347 : );
1348 : }
1349 :
1350 302 : let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
1351 :
1352 696 : loop {
1353 1058 : tokio::select!(
1354 696 : next = part_downloads.join_next() => {
1355 : match next {
1356 : Some(result) => {
1357 : let preload_result = result.context("join preload task")?;
1358 : let preload = preload_result?;
1359 : timeline_preloads.insert(preload.timeline_id, preload);
1360 : },
1361 : None => {
1362 : break;
1363 : }
1364 : }
1365 : },
1366 : _ = cancel.cancelled() => {
1367 : anyhow::bail!("Cancelled while waiting for remote index download")
1368 : }
1369 696 : )
1370 696 : }
1371 :
1372 302 : Ok(timeline_preloads)
1373 302 : }
1374 :
1375 : ///
1376 : /// Background task to load in-memory data structures for this tenant, from
1377 : /// files on disk. Used at pageserver startup.
1378 : ///
1379 : /// No background tasks are started as part of this routine.
1380 470 : async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
1381 470 : span::debug_assert_current_span_has_tenant_id();
1382 :
1383 UBC 0 : debug!("loading tenant task");
1384 :
1385 : // Load in-memory state to reflect the local files on disk
1386 : //
1387 : // Scan the directory, peek into the metadata file of each timeline, and
1388 : // collect a list of timelines and their ancestors.
1389 CBC 470 : let span = info_span!("blocking");
1390 470 : let cloned = Arc::clone(self);
1391 :
1392 470 : let scan = tokio::task::spawn_blocking(move || {
1393 470 : let _g = span.entered();
1394 470 : cloned.scan_and_sort_timelines_dir()
1395 470 : })
1396 470 : .await
1397 470 : .context("load spawn_blocking")
1398 470 : .and_then(|res| res)?;
1399 :
1400 : // FIXME original collect_timeline_files contained one more check:
1401 : // 1. "Timeline has no ancestor and no layer files"
1402 :
1403 : // Process loadable timelines first
1404 469 : for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
1405 UBC 0 : if let Err(e) = self
1406 0 : .load_local_timeline(timeline_id, local_metadata, ctx, false)
1407 0 : .await
1408 : {
1409 0 : match e {
1410 0 : LoadLocalTimelineError::Load(source) => {
1411 0 : return Err(anyhow::anyhow!(source)).with_context(|| {
1412 0 : format!("Failed to load local timeline: {timeline_id}")
1413 0 : })
1414 : }
1415 0 : LoadLocalTimelineError::ResumeDeletion(source) => {
1416 : // Make sure resumed deletion wont fail loading for entire tenant.
1417 0 : error!("Failed to resume timeline deletion: {source:#}")
1418 : }
1419 : }
1420 0 : }
1421 : }
1422 :
1423 : // Resume deletion ones with deleted_mark
1424 CBC 469 : for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
1425 UBC 0 : match maybe_local_metadata {
1426 : None => {
1427 : // See comment in `scan_and_sort_timelines_dir`.
1428 0 : if let Err(e) =
1429 0 : DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
1430 0 : .await
1431 : {
1432 0 : warn!(
1433 0 : "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
1434 0 : timeline_id, e
1435 0 : );
1436 0 : }
1437 : }
1438 0 : Some(local_metadata) => {
1439 0 : if let Err(e) = self
1440 0 : .load_local_timeline(timeline_id, local_metadata, ctx, true)
1441 0 : .await
1442 : {
1443 0 : match e {
1444 0 : LoadLocalTimelineError::Load(source) => {
1445 0 : // We tried to load deleted timeline, this is a bug.
1446 0 : return Err(anyhow::anyhow!(source).context(
1447 0 : format!("This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}")
1448 0 : ));
1449 : }
1450 0 : LoadLocalTimelineError::ResumeDeletion(source) => {
1451 : // Make sure resumed deletion wont fail loading for entire tenant.
1452 0 : error!("Failed to resume timeline deletion: {source:#}")
1453 : }
1454 : }
1455 0 : }
1456 : }
1457 : }
1458 : }
1459 :
1460 0 : trace!("Done");
1461 :
1462 CBC 469 : Ok(())
1463 470 : }
1464 :
1465 : /// Subroutine of `load_tenant`, to load an individual timeline
1466 : ///
1467 : /// NB: The parent is assumed to be already loaded!
1468 UBC 0 : #[instrument(skip(self, local_metadata, ctx))]
1469 : async fn load_local_timeline(
1470 : self: &Arc<Self>,
1471 : timeline_id: TimelineId,
1472 : local_metadata: TimelineMetadata,
1473 : ctx: &RequestContext,
1474 : found_delete_mark: bool,
1475 : ) -> Result<(), LoadLocalTimelineError> {
1476 : span::debug_assert_current_span_has_tenant_id();
1477 :
1478 : let resources = self.build_timeline_resources(timeline_id);
1479 :
1480 : if found_delete_mark {
1481 : // There is no remote client, we found local metadata.
1482 : // Continue cleaning up local disk.
1483 : DeleteTimelineFlow::resume_deletion(
1484 : Arc::clone(self),
1485 : timeline_id,
1486 : &local_metadata,
1487 : None,
1488 : self.deletion_queue_client.clone(),
1489 : )
1490 : .await
1491 : .context("resume deletion")
1492 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1493 : return Ok(());
1494 : }
1495 :
1496 : let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
1497 : let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
1498 0 : .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
1499 : .map_err(LoadLocalTimelineError::Load)?;
1500 : Some(ancestor_timeline)
1501 : } else {
1502 : None
1503 : };
1504 :
1505 : self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
1506 : .await
1507 : .map_err(LoadLocalTimelineError::Load)
1508 : }
1509 :
1510 CBC 891 : pub(crate) fn tenant_id(&self) -> TenantId {
1511 891 : self.tenant_shard_id.tenant_id
1512 891 : }
1513 :
1514 6 : pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
1515 6 : self.tenant_shard_id
1516 6 : }
1517 :
1518 : /// Get Timeline handle for given Neon timeline ID.
1519 : /// This function is idempotent. It doesn't change internal state in any way.
1520 14488 : pub fn get_timeline(
1521 14488 : &self,
1522 14488 : timeline_id: TimelineId,
1523 14488 : active_only: bool,
1524 14488 : ) -> Result<Arc<Timeline>, GetTimelineError> {
1525 14488 : let timelines_accessor = self.timelines.lock().unwrap();
1526 14488 : let timeline = timelines_accessor
1527 14488 : .get(&timeline_id)
1528 14488 : .ok_or(GetTimelineError::NotFound {
1529 14488 : tenant_id: self.tenant_shard_id.tenant_id,
1530 14488 : timeline_id,
1531 14488 : })?;
1532 :
1533 14437 : if active_only && !timeline.is_active() {
1534 1 : Err(GetTimelineError::NotActive {
1535 1 : tenant_id: self.tenant_shard_id.tenant_id,
1536 1 : timeline_id,
1537 1 : state: timeline.current_state(),
1538 1 : })
1539 : } else {
1540 14436 : Ok(Arc::clone(timeline))
1541 : }
1542 14488 : }
1543 :
1544 : /// Lists timelines the tenant contains.
1545 : /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
1546 730 : pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
1547 730 : self.timelines
1548 730 : .lock()
1549 730 : .unwrap()
1550 730 : .values()
1551 730 : .map(Arc::clone)
1552 730 : .collect()
1553 730 : }
1554 :
1555 466 : pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
1556 466 : self.timelines.lock().unwrap().keys().cloned().collect()
1557 466 : }
1558 :
1559 : /// This is used to create the initial 'main' timeline during bootstrapping,
1560 : /// or when importing a new base backup. The caller is expected to load an
1561 : /// initial image of the datadir to the new timeline after this.
1562 : ///
1563 : /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
1564 : /// and the timeline will fail to load at a restart.
1565 : ///
1566 : /// That's why we add an uninit mark file, and wrap it together witht the Timeline
1567 : /// in-memory object into UninitializedTimeline.
1568 : /// Once the caller is done setting up the timeline, they should call
1569 : /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
1570 : ///
1571 : /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
1572 : /// minimum amount of keys required to get a writable timeline.
1573 : /// (Without it, `put` might fail due to `repartition` failing.)
1574 48 : pub(crate) async fn create_empty_timeline(
1575 48 : &self,
1576 48 : new_timeline_id: TimelineId,
1577 48 : initdb_lsn: Lsn,
1578 48 : pg_version: u32,
1579 48 : _ctx: &RequestContext,
1580 48 : ) -> anyhow::Result<UninitializedTimeline> {
1581 48 : anyhow::ensure!(
1582 48 : self.is_active(),
1583 UBC 0 : "Cannot create empty timelines on inactive tenant"
1584 : );
1585 :
1586 CBC 48 : let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
1587 47 : let new_metadata = TimelineMetadata::new(
1588 47 : // Initialize disk_consistent LSN to 0, The caller must import some data to
1589 47 : // make it valid, before calling finish_creation()
1590 47 : Lsn(0),
1591 47 : None,
1592 47 : None,
1593 47 : Lsn(0),
1594 47 : initdb_lsn,
1595 47 : initdb_lsn,
1596 47 : pg_version,
1597 47 : );
1598 47 : self.prepare_new_timeline(
1599 47 : new_timeline_id,
1600 47 : &new_metadata,
1601 47 : timeline_uninit_mark,
1602 47 : initdb_lsn,
1603 47 : None,
1604 47 : )
1605 UBC 0 : .await
1606 CBC 48 : }
1607 :
1608 : /// Helper for unit tests to create an empty timeline.
1609 : ///
1610 : /// The timeline is has state value `Active` but its background loops are not running.
1611 : // This makes the various functions which anyhow::ensure! for Active state work in tests.
1612 : // Our current tests don't need the background loops.
1613 : #[cfg(test)]
1614 34 : pub async fn create_test_timeline(
1615 34 : &self,
1616 34 : new_timeline_id: TimelineId,
1617 34 : initdb_lsn: Lsn,
1618 34 : pg_version: u32,
1619 34 : ctx: &RequestContext,
1620 34 : ) -> anyhow::Result<Arc<Timeline>> {
1621 34 : let uninit_tl = self
1622 34 : .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
1623 UBC 0 : .await?;
1624 CBC 34 : let tline = uninit_tl.raw_timeline().expect("we just created it");
1625 34 : assert_eq!(tline.get_last_record_lsn(), Lsn(0));
1626 :
1627 : // Setup minimum keys required for the timeline to be usable.
1628 34 : let mut modification = tline.begin_modification(initdb_lsn);
1629 34 : modification
1630 34 : .init_empty_test_timeline()
1631 34 : .context("init_empty_test_timeline")?;
1632 34 : modification
1633 34 : .commit(ctx)
1634 UBC 0 : .await
1635 CBC 34 : .context("commit init_empty_test_timeline modification")?;
1636 :
1637 : // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
1638 34 : tline.maybe_spawn_flush_loop();
1639 34 : tline.freeze_and_flush().await.context("freeze_and_flush")?;
1640 :
1641 : // Make sure the freeze_and_flush reaches remote storage.
1642 34 : tline
1643 34 : .remote_client
1644 34 : .as_ref()
1645 34 : .unwrap()
1646 34 : .wait_completion()
1647 9 : .await
1648 34 : .unwrap();
1649 :
1650 34 : let tl = uninit_tl.finish_creation()?;
1651 : // The non-test code would call tl.activate() here.
1652 34 : tl.set_state(TimelineState::Active);
1653 34 : Ok(tl)
1654 34 : }
1655 :
1656 : /// Create a new timeline.
1657 : ///
1658 : /// Returns the new timeline ID and reference to its Timeline object.
1659 : ///
1660 : /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
1661 : /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
1662 : #[allow(clippy::too_many_arguments)]
1663 783 : pub(crate) async fn create_timeline(
1664 783 : &self,
1665 783 : new_timeline_id: TimelineId,
1666 783 : ancestor_timeline_id: Option<TimelineId>,
1667 783 : mut ancestor_start_lsn: Option<Lsn>,
1668 783 : pg_version: u32,
1669 783 : load_existing_initdb: Option<TimelineId>,
1670 783 : broker_client: storage_broker::BrokerClientChannel,
1671 783 : ctx: &RequestContext,
1672 783 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
1673 783 : if !self.is_active() {
1674 UBC 0 : return Err(CreateTimelineError::Other(anyhow::anyhow!(
1675 0 : "Cannot create timelines on inactive tenant"
1676 0 : )));
1677 CBC 783 : }
1678 :
1679 783 : let _gate = self
1680 783 : .gate
1681 783 : .enter()
1682 783 : .map_err(|_| CreateTimelineError::ShuttingDown)?;
1683 :
1684 : // Get exclusive access to the timeline ID: this ensures that it does not already exist,
1685 : // and that no other creation attempts will be allowed in while we are working. The
1686 : // uninit_mark is a guard.
1687 783 : let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
1688 783 : Ok(m) => m,
1689 : Err(TimelineExclusionError::AlreadyCreating) => {
1690 : // Creation is in progress, we cannot create it again, and we cannot
1691 : // check if this request matches the existing one, so caller must try
1692 : // again later.
1693 UBC 0 : return Err(CreateTimelineError::AlreadyCreating);
1694 : }
1695 0 : Err(TimelineExclusionError::Other(e)) => {
1696 0 : return Err(CreateTimelineError::Other(e));
1697 : }
1698 0 : Err(TimelineExclusionError::AlreadyExists(existing)) => {
1699 0 : debug!("timeline {new_timeline_id} already exists");
1700 :
1701 : // Idempotency: creating the same timeline twice is not an error, unless
1702 : // the second creation has different parameters.
1703 0 : if existing.get_ancestor_timeline_id() != ancestor_timeline_id
1704 0 : || existing.pg_version != pg_version
1705 0 : || (ancestor_start_lsn.is_some()
1706 0 : && ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
1707 : {
1708 0 : return Err(CreateTimelineError::Conflict);
1709 0 : }
1710 :
1711 0 : if let Some(remote_client) = existing.remote_client.as_ref() {
1712 : // Wait for uploads to complete, so that when we return Ok, the timeline
1713 : // is known to be durable on remote storage. Just like we do at the end of
1714 : // this function, after we have created the timeline ourselves.
1715 : //
1716 : // We only really care that the initial version of `index_part.json` has
1717 : // been uploaded. That's enough to remember that the timeline
1718 : // exists. However, there is no function to wait specifically for that so
1719 : // we just wait for all in-progress uploads to finish.
1720 0 : remote_client
1721 0 : .wait_completion()
1722 0 : .await
1723 0 : .context("wait for timeline uploads to complete")?;
1724 0 : }
1725 :
1726 0 : return Ok(existing);
1727 : }
1728 : };
1729 :
1730 CBC 783 : let loaded_timeline = match ancestor_timeline_id {
1731 258 : Some(ancestor_timeline_id) => {
1732 258 : let ancestor_timeline = self
1733 258 : .get_timeline(ancestor_timeline_id, false)
1734 258 : .context("Cannot branch off the timeline that's not present in pageserver")?;
1735 :
1736 : // instead of waiting around, just deny the request because ancestor is not yet
1737 : // ready for other purposes either.
1738 258 : if !ancestor_timeline.is_active() {
1739 6 : return Err(CreateTimelineError::AncestorNotActive);
1740 252 : }
1741 :
1742 252 : if let Some(lsn) = ancestor_start_lsn.as_mut() {
1743 30 : *lsn = lsn.align();
1744 30 :
1745 30 : let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
1746 30 : if ancestor_ancestor_lsn > *lsn {
1747 : // can we safely just branch from the ancestor instead?
1748 2 : return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
1749 2 : "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
1750 2 : lsn,
1751 2 : ancestor_timeline_id,
1752 2 : ancestor_ancestor_lsn,
1753 2 : )));
1754 28 : }
1755 28 :
1756 28 : // Wait for the WAL to arrive and be processed on the parent branch up
1757 28 : // to the requested branch point. The repository code itself doesn't
1758 28 : // require it, but if we start to receive WAL on the new timeline,
1759 28 : // decoding the new WAL might need to look up previous pages, relation
1760 28 : // sizes etc. and that would get confused if the previous page versions
1761 28 : // are not in the repository yet.
1762 28 : ancestor_timeline
1763 28 : .wait_lsn(*lsn, ctx)
1764 10 : .await
1765 28 : .map_err(|e| match e {
1766 UBC 0 : e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
1767 0 : CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
1768 : }
1769 0 : WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
1770 CBC 28 : })?;
1771 222 : }
1772 :
1773 250 : self.branch_timeline(
1774 250 : &ancestor_timeline,
1775 250 : new_timeline_id,
1776 250 : ancestor_start_lsn,
1777 250 : uninit_mark,
1778 250 : ctx,
1779 250 : )
1780 8 : .await?
1781 : }
1782 : None => {
1783 525 : self.bootstrap_timeline(
1784 525 : new_timeline_id,
1785 525 : pg_version,
1786 525 : load_existing_initdb,
1787 525 : uninit_mark,
1788 525 : ctx,
1789 525 : )
1790 6921368 : .await?
1791 : }
1792 : };
1793 :
1794 : // At this point we have dropped our guard on [`Self::timelines_creating`], and
1795 : // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
1796 : // not send a success to the caller until it is. The same applies to handling retries,
1797 : // see the handling of [`TimelineExclusionError::AlreadyExists`] above.
1798 763 : if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
1799 763 : let kind = ancestor_timeline_id
1800 763 : .map(|_| "branched")
1801 763 : .unwrap_or("bootstrapped");
1802 763 : remote_client.wait_completion().await.with_context(|| {
1803 UBC 0 : format!("wait for {} timeline initial uploads to complete", kind)
1804 CBC 759 : })?;
1805 UBC 0 : }
1806 :
1807 CBC 759 : loaded_timeline.activate(broker_client, None, ctx);
1808 759 :
1809 759 : Ok(loaded_timeline)
1810 777 : }
1811 :
1812 60 : pub(crate) async fn delete_timeline(
1813 60 : self: Arc<Self>,
1814 60 : timeline_id: TimelineId,
1815 60 : ) -> Result<(), DeleteTimelineError> {
1816 346 : DeleteTimelineFlow::run(&self, timeline_id, false).await?;
1817 :
1818 51 : Ok(())
1819 60 : }
1820 :
1821 : /// perform one garbage collection iteration, removing old data files from disk.
1822 : /// this function is periodically called by gc task.
1823 : /// also it can be explicitly requested through page server api 'do_gc' command.
1824 : ///
1825 : /// `target_timeline_id` specifies the timeline to GC, or None for all.
1826 : ///
1827 : /// The `horizon` an `pitr` parameters determine how much WAL history needs to be retained.
1828 : /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
1829 : /// the amount of history, as LSN difference from current latest LSN on each timeline.
1830 : /// `pitr` specifies the same as a time difference from the current time. The effective
1831 : /// GC cutoff point is determined conservatively by either `horizon` and `pitr`, whichever
1832 : /// requires more history to be retained.
1833 : //
1834 366 : pub async fn gc_iteration(
1835 366 : &self,
1836 366 : target_timeline_id: Option<TimelineId>,
1837 366 : horizon: u64,
1838 366 : pitr: Duration,
1839 366 : cancel: &CancellationToken,
1840 366 : ctx: &RequestContext,
1841 366 : ) -> anyhow::Result<GcResult> {
1842 366 : // Don't start doing work during shutdown
1843 366 : if let TenantState::Stopping { .. } = self.current_state() {
1844 UBC 0 : return Ok(GcResult::default());
1845 CBC 366 : }
1846 366 :
1847 366 : // there is a global allowed_error for this
1848 366 : anyhow::ensure!(
1849 366 : self.is_active(),
1850 UBC 0 : "Cannot run GC iteration on inactive tenant"
1851 : );
1852 :
1853 : {
1854 CBC 366 : let conf = self.tenant_conf.read().unwrap();
1855 366 :
1856 366 : if !conf.location.may_delete_layers_hint() {
1857 GBC 1 : info!("Skipping GC in location state {:?}", conf.location);
1858 1 : return Ok(GcResult::default());
1859 CBC 365 : }
1860 365 : }
1861 365 :
1862 365 : self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
1863 236069 : .await
1864 360 : }
1865 :
1866 : /// Perform one compaction iteration.
1867 : /// This function is periodically called by compactor task.
1868 : /// Also it can be explicitly requested per timeline through page server
1869 : /// api's 'compact' command.
1870 334 : async fn compaction_iteration(
1871 334 : &self,
1872 334 : cancel: &CancellationToken,
1873 334 : ctx: &RequestContext,
1874 334 : ) -> anyhow::Result<(), timeline::CompactionError> {
1875 334 : // Don't start doing work during shutdown, or when broken, we do not need those in the logs
1876 334 : if !self.is_active() {
1877 2 : return Ok(());
1878 332 : }
1879 332 :
1880 332 : {
1881 332 : let conf = self.tenant_conf.read().unwrap();
1882 332 : if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
1883 9 : info!("Skipping compaction in location state {:?}", conf.location);
1884 9 : return Ok(());
1885 323 : }
1886 323 : }
1887 323 :
1888 323 : // Scan through the hashmap and collect a list of all the timelines,
1889 323 : // while holding the lock. Then drop the lock and actually perform the
1890 323 : // compactions. We don't want to block everything else while the
1891 323 : // compaction runs.
1892 323 : let timelines_to_compact = {
1893 323 : let timelines = self.timelines.lock().unwrap();
1894 323 : let timelines_to_compact = timelines
1895 323 : .iter()
1896 530 : .filter_map(|(timeline_id, timeline)| {
1897 530 : if timeline.is_active() {
1898 528 : Some((*timeline_id, timeline.clone()))
1899 : } else {
1900 2 : None
1901 : }
1902 530 : })
1903 323 : .collect::<Vec<_>>();
1904 323 : drop(timelines);
1905 323 : timelines_to_compact
1906 : };
1907 :
1908 835 : for (timeline_id, timeline) in &timelines_to_compact {
1909 521 : timeline
1910 521 : .compact(cancel, EnumSet::empty(), ctx)
1911 521 : .instrument(info_span!("compact_timeline", %timeline_id))
1912 90272 : .await?;
1913 : }
1914 :
1915 314 : Ok(())
1916 325 : }
1917 :
1918 20287 : pub fn current_state(&self) -> TenantState {
1919 20287 : self.state.borrow().clone()
1920 20287 : }
1921 :
1922 2416 : pub fn is_active(&self) -> bool {
1923 2416 : self.current_state() == TenantState::Active
1924 2416 : }
1925 :
1926 : /// Changes tenant status to active, unless shutdown was already requested.
1927 : ///
1928 : /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
1929 : /// to delay background jobs. Background jobs can be started right away when None is given.
1930 709 : fn activate(
1931 709 : self: &Arc<Self>,
1932 709 : broker_client: BrokerClientChannel,
1933 709 : background_jobs_can_start: Option<&completion::Barrier>,
1934 709 : ctx: &RequestContext,
1935 709 : ) {
1936 709 : span::debug_assert_current_span_has_tenant_id();
1937 709 :
1938 709 : let mut activating = false;
1939 709 : self.state.send_modify(|current_state| {
1940 709 : use pageserver_api::models::ActivatingFrom;
1941 709 : match &*current_state {
1942 : TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1943 UBC 0 : panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
1944 : }
1945 0 : TenantState::Loading => {
1946 0 : *current_state = TenantState::Activating(ActivatingFrom::Loading);
1947 0 : }
1948 CBC 709 : TenantState::Attaching => {
1949 709 : *current_state = TenantState::Activating(ActivatingFrom::Attaching);
1950 709 : }
1951 : }
1952 709 : debug!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), "Activating tenant");
1953 709 : activating = true;
1954 709 : // Continue outside the closure. We need to grab timelines.lock()
1955 709 : // and we plan to turn it into a tokio::sync::Mutex in a future patch.
1956 709 : });
1957 709 :
1958 709 : if activating {
1959 709 : let timelines_accessor = self.timelines.lock().unwrap();
1960 709 : let timelines_to_activate = timelines_accessor
1961 709 : .values()
1962 709 : .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
1963 709 :
1964 709 : // Spawn gc and compaction loops. The loops will shut themselves
1965 709 : // down when they notice that the tenant is inactive.
1966 709 : tasks::start_background_loops(self, background_jobs_can_start);
1967 709 :
1968 709 : let mut activated_timelines = 0;
1969 :
1970 1043 : for timeline in timelines_to_activate {
1971 334 : timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
1972 334 : activated_timelines += 1;
1973 334 : }
1974 :
1975 709 : self.state.send_modify(move |current_state| {
1976 709 : assert!(
1977 709 : matches!(current_state, TenantState::Activating(_)),
1978 UBC 0 : "set_stopping and set_broken wait for us to leave Activating state",
1979 : );
1980 CBC 709 : *current_state = TenantState::Active;
1981 709 :
1982 709 : let elapsed = self.constructed_at.elapsed();
1983 709 : let total_timelines = timelines_accessor.len();
1984 709 :
1985 709 : // log a lot of stuff, because some tenants sometimes suffer from user-visible
1986 709 : // times to activate. see https://github.com/neondatabase/neon/issues/4025
1987 709 : info!(
1988 709 : since_creation_millis = elapsed.as_millis(),
1989 709 : tenant_id = %self.tenant_shard_id.tenant_id,
1990 709 : shard_id = %self.tenant_shard_id.shard_slug(),
1991 709 : activated_timelines,
1992 709 : total_timelines,
1993 709 : post_state = <&'static str>::from(&*current_state),
1994 709 : "activation attempt finished"
1995 709 : );
1996 :
1997 709 : TENANT.activation.observe(elapsed.as_secs_f64());
1998 709 : });
1999 UBC 0 : }
2000 CBC 709 : }
2001 :
2002 : /// Shutdown the tenant and join all of the spawned tasks.
2003 : ///
2004 : /// The method caters for all use-cases:
2005 : /// - pageserver shutdown (freeze_and_flush == true)
2006 : /// - detach + ignore (freeze_and_flush == false)
2007 : ///
2008 : /// This will attempt to shutdown even if tenant is broken.
2009 : ///
2010 : /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
2011 : /// If the tenant is already shutting down, we return a clone of the first shutdown call's
2012 : /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
2013 : /// the ongoing shutdown.
2014 368 : async fn shutdown(
2015 368 : &self,
2016 368 : shutdown_progress: completion::Barrier,
2017 368 : freeze_and_flush: bool,
2018 368 : ) -> Result<(), completion::Barrier> {
2019 368 : span::debug_assert_current_span_has_tenant_id();
2020 368 :
2021 368 : // Set tenant (and its timlines) to Stoppping state.
2022 368 : //
2023 368 : // Since we can only transition into Stopping state after activation is complete,
2024 368 : // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
2025 368 : //
2026 368 : // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
2027 368 : // 1. Lock out any new requests to the tenants.
2028 368 : // 2. Signal cancellation to WAL receivers (we wait on it below).
2029 368 : // 3. Signal cancellation for other tenant background loops.
2030 368 : // 4. ???
2031 368 : //
2032 368 : // The waiting for the cancellation is not done uniformly.
2033 368 : // We certainly wait for WAL receivers to shut down.
2034 368 : // That is necessary so that no new data comes in before the freeze_and_flush.
2035 368 : // But the tenant background loops are joined-on in our caller.
2036 368 : // It's mesed up.
2037 368 : // we just ignore the failure to stop
2038 368 :
2039 368 : match self.set_stopping(shutdown_progress, false, false).await {
2040 312 : Ok(()) => {}
2041 56 : Err(SetStoppingError::Broken) => {
2042 56 : // assume that this is acceptable
2043 56 : }
2044 UBC 0 : Err(SetStoppingError::AlreadyStopping(other)) => {
2045 : // give caller the option to wait for this this shutdown
2046 0 : info!("Tenant::shutdown: AlreadyStopping");
2047 0 : return Err(other);
2048 : }
2049 : };
2050 :
2051 CBC 368 : let mut js = tokio::task::JoinSet::new();
2052 368 : {
2053 368 : let timelines = self.timelines.lock().unwrap();
2054 499 : timelines.values().for_each(|timeline| {
2055 499 : let timeline = Arc::clone(timeline);
2056 499 : let span = Span::current();
2057 499 : js.spawn(async move {
2058 499 : if freeze_and_flush {
2059 835 : timeline.flush_and_shutdown().instrument(span).await
2060 : } else {
2061 623 : timeline.shutdown().instrument(span).await
2062 : }
2063 499 : });
2064 499 : })
2065 : };
2066 : // test_long_timeline_create_then_tenant_delete is leaning on this message
2067 368 : tracing::info!("Waiting for timelines...");
2068 867 : while let Some(res) = js.join_next().await {
2069 UBC 0 : match res {
2070 CBC 499 : Ok(()) => {}
2071 UBC 0 : Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
2072 0 : Err(je) if je.is_panic() => { /* logged already */ }
2073 0 : Err(je) => warn!("unexpected JoinError: {je:?}"),
2074 : }
2075 : }
2076 :
2077 : // We cancel the Tenant's cancellation token _after_ the timelines have all shut down. This permits
2078 : // them to continue to do work during their shutdown methods, e.g. flushing data.
2079 0 : tracing::debug!("Cancelling CancellationToken");
2080 CBC 368 : self.cancel.cancel();
2081 :
2082 : // shutdown all tenant and timeline tasks: gc, compaction, page service
2083 : // No new tasks will be started for this tenant because it's in `Stopping` state.
2084 : //
2085 : // this will additionally shutdown and await all timeline tasks.
2086 UBC 0 : tracing::debug!("Waiting for tasks...");
2087 CBC 368 : task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
2088 :
2089 : // Wait for any in-flight operations to complete
2090 368 : self.gate.close().await;
2091 :
2092 368 : Ok(())
2093 368 : }
2094 :
2095 : /// Change tenant status to Stopping, to mark that it is being shut down.
2096 : ///
2097 : /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
2098 : ///
2099 : /// This function is not cancel-safe!
2100 : ///
2101 : /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
2102 : /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
2103 389 : async fn set_stopping(
2104 389 : &self,
2105 389 : progress: completion::Barrier,
2106 389 : allow_transition_from_loading: bool,
2107 389 : allow_transition_from_attaching: bool,
2108 389 : ) -> Result<(), SetStoppingError> {
2109 389 : let mut rx = self.state.subscribe();
2110 389 :
2111 389 : // cannot stop before we're done activating, so wait out until we're done activating
2112 392 : rx.wait_for(|state| match state {
2113 21 : TenantState::Attaching if allow_transition_from_attaching => true,
2114 : TenantState::Activating(_) | TenantState::Attaching => {
2115 3 : info!(
2116 3 : "waiting for {} to turn Active|Broken|Stopping",
2117 3 : <&'static str>::from(state)
2118 3 : );
2119 3 : false
2120 : }
2121 UBC 0 : TenantState::Loading => allow_transition_from_loading,
2122 CBC 368 : TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
2123 392 : })
2124 3 : .await
2125 389 : .expect("cannot drop self.state while on a &self method");
2126 389 :
2127 389 : // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
2128 389 : let mut err = None;
2129 389 : let stopping = self.state.send_if_modified(|current_state| match current_state {
2130 : TenantState::Activating(_) => {
2131 UBC 0 : unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
2132 : }
2133 : TenantState::Attaching => {
2134 CBC 21 : if !allow_transition_from_attaching {
2135 UBC 0 : unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
2136 CBC 21 : };
2137 21 : *current_state = TenantState::Stopping { progress };
2138 21 : true
2139 : }
2140 : TenantState::Loading => {
2141 UBC 0 : if !allow_transition_from_loading {
2142 0 : unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
2143 0 : };
2144 0 : *current_state = TenantState::Stopping { progress };
2145 0 : true
2146 : }
2147 : TenantState::Active => {
2148 : // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
2149 : // are created after the transition to Stopping. That's harmless, as the Timelines
2150 : // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
2151 CBC 312 : *current_state = TenantState::Stopping { progress };
2152 312 : // Continue stopping outside the closure. We need to grab timelines.lock()
2153 312 : // and we plan to turn it into a tokio::sync::Mutex in a future patch.
2154 312 : true
2155 : }
2156 56 : TenantState::Broken { reason, .. } => {
2157 56 : info!(
2158 56 : "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
2159 56 : );
2160 56 : err = Some(SetStoppingError::Broken);
2161 56 : false
2162 : }
2163 UBC 0 : TenantState::Stopping { progress } => {
2164 0 : info!("Tenant is already in Stopping state");
2165 0 : err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
2166 0 : false
2167 : }
2168 CBC 389 : });
2169 389 : match (stopping, err) {
2170 333 : (true, None) => {} // continue
2171 56 : (false, Some(err)) => return Err(err),
2172 UBC 0 : (true, Some(_)) => unreachable!(
2173 0 : "send_if_modified closure must error out if not transitioning to Stopping"
2174 0 : ),
2175 0 : (false, None) => unreachable!(
2176 0 : "send_if_modified closure must return true if transitioning to Stopping"
2177 0 : ),
2178 : }
2179 :
2180 CBC 333 : let timelines_accessor = self.timelines.lock().unwrap();
2181 333 : let not_broken_timelines = timelines_accessor
2182 333 : .values()
2183 439 : .filter(|timeline| !timeline.is_broken());
2184 763 : for timeline in not_broken_timelines {
2185 430 : timeline.set_state(TimelineState::Stopping);
2186 430 : }
2187 333 : Ok(())
2188 389 : }
2189 :
2190 : /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
2191 : /// `remove_tenant_from_memory`
2192 : ///
2193 : /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
2194 : ///
2195 : /// In tests, we also use this to set tenants to Broken state on purpose.
2196 51 : pub(crate) async fn set_broken(&self, reason: String) {
2197 51 : let mut rx = self.state.subscribe();
2198 51 :
2199 51 : // The load & attach routines own the tenant state until it has reached `Active`.
2200 51 : // So, wait until it's done.
2201 51 : rx.wait_for(|state| match state {
2202 : TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
2203 UBC 0 : info!(
2204 0 : "waiting for {} to turn Active|Broken|Stopping",
2205 0 : <&'static str>::from(state)
2206 0 : );
2207 0 : false
2208 : }
2209 CBC 51 : TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
2210 51 : })
2211 UBC 0 : .await
2212 CBC 51 : .expect("cannot drop self.state while on a &self method");
2213 51 :
2214 51 : // we now know we're done activating, let's see whether this task is the winner to transition into Broken
2215 51 : self.set_broken_no_wait(reason)
2216 51 : }
2217 :
2218 51 : pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
2219 51 : let reason = reason.to_string();
2220 51 : self.state.send_modify(|current_state| {
2221 51 : match *current_state {
2222 : TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
2223 UBC 0 : unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
2224 : }
2225 : TenantState::Active => {
2226 CBC 2 : if cfg!(feature = "testing") {
2227 2 : warn!("Changing Active tenant to Broken state, reason: {}", reason);
2228 2 : *current_state = TenantState::broken_from_reason(reason);
2229 : } else {
2230 UBC 0 : unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
2231 : }
2232 : }
2233 : TenantState::Broken { .. } => {
2234 0 : warn!("Tenant is already in Broken state");
2235 : }
2236 : // This is the only "expected" path, any other path is a bug.
2237 : TenantState::Stopping { .. } => {
2238 CBC 49 : warn!(
2239 49 : "Marking Stopping tenant as Broken state, reason: {}",
2240 49 : reason
2241 49 : );
2242 49 : *current_state = TenantState::broken_from_reason(reason);
2243 : }
2244 : }
2245 51 : });
2246 51 : }
2247 :
2248 352 : pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
2249 352 : self.state.subscribe()
2250 352 : }
2251 :
2252 : /// The activate_now semaphore is initialized with zero units. As soon as
2253 : /// we add a unit, waiters will be able to acquire a unit and proceed.
2254 1026 : pub(crate) fn activate_now(&self) {
2255 1026 : self.activate_now_sem.add_permits(1);
2256 1026 : }
2257 :
2258 1478 : pub(crate) async fn wait_to_become_active(
2259 1478 : &self,
2260 1478 : timeout: Duration,
2261 1478 : ) -> Result<(), GetActiveTenantError> {
2262 1478 : let mut receiver = self.state.subscribe();
2263 2363 : loop {
2264 2363 : let current_state = receiver.borrow_and_update().clone();
2265 2363 : match current_state {
2266 : TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
2267 : // in these states, there's a chance that we can reach ::Active
2268 885 : self.activate_now();
2269 1031 : match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
2270 885 : Ok(r) => {
2271 885 : r.map_err(
2272 885 : |_e: tokio::sync::watch::error::RecvError|
2273 : // Tenant existed but was dropped: report it as non-existent
2274 885 : GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
2275 885 : )?
2276 : }
2277 : Err(TimeoutCancellableError::Cancelled) => {
2278 UBC 0 : return Err(GetActiveTenantError::Cancelled);
2279 : }
2280 : Err(TimeoutCancellableError::Timeout) => {
2281 0 : return Err(GetActiveTenantError::WaitForActiveTimeout {
2282 0 : latest_state: Some(self.current_state()),
2283 0 : wait_time: timeout,
2284 0 : });
2285 : }
2286 : }
2287 : }
2288 : TenantState::Active { .. } => {
2289 CBC 1478 : return Ok(());
2290 : }
2291 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
2292 : // There's no chance the tenant can transition back into ::Active
2293 UBC 0 : return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
2294 : }
2295 : }
2296 : }
2297 CBC 1478 : }
2298 :
2299 34 : pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
2300 34 : self.tenant_conf
2301 34 : .read()
2302 34 : .unwrap()
2303 34 : .location
2304 34 : .attach_mode
2305 34 : .clone()
2306 34 : }
2307 :
2308 36 : pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
2309 36 : &self.tenant_shard_id
2310 36 : }
2311 :
2312 6 : pub(crate) fn get_generation(&self) -> Generation {
2313 6 : self.generation
2314 6 : }
2315 : }
2316 :
2317 : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
2318 : /// perform a topological sort, so that the parent of each timeline comes
2319 : /// before the children.
2320 : /// E extracts the ancestor from T
2321 : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
2322 864 : fn tree_sort_timelines<T, E>(
2323 864 : timelines: HashMap<TimelineId, T>,
2324 864 : extractor: E,
2325 864 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
2326 864 : where
2327 864 : E: Fn(&T) -> Option<TimelineId>,
2328 864 : {
2329 864 : let mut result = Vec::with_capacity(timelines.len());
2330 864 :
2331 864 : let mut now = Vec::with_capacity(timelines.len());
2332 864 : // (ancestor, children)
2333 864 : let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
2334 864 : HashMap::with_capacity(timelines.len());
2335 :
2336 1368 : for (timeline_id, value) in timelines {
2337 504 : if let Some(ancestor_id) = extractor(&value) {
2338 52 : let children = later.entry(ancestor_id).or_default();
2339 52 : children.push((timeline_id, value));
2340 452 : } else {
2341 452 : now.push((timeline_id, value));
2342 452 : }
2343 : }
2344 :
2345 1368 : while let Some((timeline_id, metadata)) = now.pop() {
2346 504 : result.push((timeline_id, metadata));
2347 : // All children of this can be loaded now
2348 504 : if let Some(mut children) = later.remove(&timeline_id) {
2349 41 : now.append(&mut children);
2350 463 : }
2351 : }
2352 :
2353 : // All timelines should be visited now. Unless there were timelines with missing ancestors.
2354 864 : if !later.is_empty() {
2355 UBC 0 : for (missing_id, orphan_ids) in later {
2356 0 : for (orphan_id, _) in orphan_ids {
2357 0 : error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
2358 : }
2359 : }
2360 0 : bail!("could not load tenant because some timelines are missing ancestors");
2361 CBC 864 : }
2362 864 :
2363 864 : Ok(result)
2364 864 : }
2365 :
2366 : impl Tenant {
2367 90 : pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
2368 90 : self.tenant_conf.read().unwrap().tenant_conf
2369 90 : }
2370 :
2371 45 : pub fn effective_config(&self) -> TenantConf {
2372 45 : self.tenant_specific_overrides()
2373 45 : .merge(self.conf.default_tenant_conf)
2374 45 : }
2375 :
2376 5 : pub fn get_checkpoint_distance(&self) -> u64 {
2377 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2378 5 : tenant_conf
2379 5 : .checkpoint_distance
2380 5 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
2381 5 : }
2382 :
2383 5 : pub fn get_checkpoint_timeout(&self) -> Duration {
2384 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2385 5 : tenant_conf
2386 5 : .checkpoint_timeout
2387 5 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
2388 5 : }
2389 :
2390 5 : pub fn get_compaction_target_size(&self) -> u64 {
2391 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2392 5 : tenant_conf
2393 5 : .compaction_target_size
2394 5 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
2395 5 : }
2396 :
2397 1083 : pub fn get_compaction_period(&self) -> Duration {
2398 1083 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2399 1083 : tenant_conf
2400 1083 : .compaction_period
2401 1083 : .unwrap_or(self.conf.default_tenant_conf.compaction_period)
2402 1083 : }
2403 :
2404 5 : pub fn get_compaction_threshold(&self) -> usize {
2405 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2406 5 : tenant_conf
2407 5 : .compaction_threshold
2408 5 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
2409 5 : }
2410 :
2411 544 : pub fn get_gc_horizon(&self) -> u64 {
2412 544 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2413 544 : tenant_conf
2414 544 : .gc_horizon
2415 544 : .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
2416 544 : }
2417 :
2418 984 : pub fn get_gc_period(&self) -> Duration {
2419 984 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2420 984 : tenant_conf
2421 984 : .gc_period
2422 984 : .unwrap_or(self.conf.default_tenant_conf.gc_period)
2423 984 : }
2424 :
2425 5 : pub fn get_image_creation_threshold(&self) -> usize {
2426 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2427 5 : tenant_conf
2428 5 : .image_creation_threshold
2429 5 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
2430 5 : }
2431 :
2432 459 : pub fn get_pitr_interval(&self) -> Duration {
2433 459 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2434 459 : tenant_conf
2435 459 : .pitr_interval
2436 459 : .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
2437 459 : }
2438 :
2439 6961 : pub fn get_trace_read_requests(&self) -> bool {
2440 6961 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2441 6961 : tenant_conf
2442 6961 : .trace_read_requests
2443 6961 : .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
2444 6961 : }
2445 :
2446 23 : pub fn get_min_resident_size_override(&self) -> Option<u64> {
2447 23 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2448 23 : tenant_conf
2449 23 : .min_resident_size_override
2450 23 : .or(self.conf.default_tenant_conf.min_resident_size_override)
2451 23 : }
2452 :
2453 842 : pub fn get_heatmap_period(&self) -> Option<Duration> {
2454 842 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2455 842 : let heatmap_period = tenant_conf
2456 842 : .heatmap_period
2457 842 : .unwrap_or(self.conf.default_tenant_conf.heatmap_period);
2458 842 : if heatmap_period.is_zero() {
2459 842 : None
2460 : } else {
2461 UBC 0 : Some(heatmap_period)
2462 : }
2463 CBC 842 : }
2464 :
2465 30 : pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
2466 30 : self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
2467 30 : // Don't hold self.timelines.lock() during the notifies.
2468 30 : // There's no risk of deadlock right now, but there could be if we consolidate
2469 30 : // mutexes in struct Timeline in the future.
2470 30 : let timelines = self.list_timelines();
2471 62 : for timeline in timelines {
2472 32 : timeline.tenant_conf_updated();
2473 32 : }
2474 30 : }
2475 :
2476 20 : pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
2477 20 : *self.tenant_conf.write().unwrap() = new_conf;
2478 20 : // Don't hold self.timelines.lock() during the notifies.
2479 20 : // There's no risk of deadlock right now, but there could be if we consolidate
2480 20 : // mutexes in struct Timeline in the future.
2481 20 : let timelines = self.list_timelines();
2482 40 : for timeline in timelines {
2483 20 : timeline.tenant_conf_updated();
2484 20 : }
2485 20 : }
2486 :
2487 : /// Helper function to create a new Timeline struct.
2488 : ///
2489 : /// The returned Timeline is in Loading state. The caller is responsible for
2490 : /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
2491 : /// map.
2492 : ///
2493 : /// `validate_ancestor == false` is used when a timeline is created for deletion
2494 : /// and we might not have the ancestor present anymore which is fine for to be
2495 : /// deleted timelines.
2496 1290 : fn create_timeline_struct(
2497 1290 : &self,
2498 1290 : new_timeline_id: TimelineId,
2499 1290 : new_metadata: &TimelineMetadata,
2500 1290 : ancestor: Option<Arc<Timeline>>,
2501 1290 : resources: TimelineResources,
2502 1290 : cause: CreateTimelineCause,
2503 1290 : ) -> anyhow::Result<Arc<Timeline>> {
2504 1290 : let state = match cause {
2505 : CreateTimelineCause::Load => {
2506 1278 : let ancestor_id = new_metadata.ancestor_timeline();
2507 1278 : anyhow::ensure!(
2508 1278 : ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
2509 UBC 0 : "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
2510 : );
2511 CBC 1278 : TimelineState::Loading
2512 : }
2513 12 : CreateTimelineCause::Delete => TimelineState::Stopping,
2514 : };
2515 :
2516 1290 : let pg_version = new_metadata.pg_version();
2517 1290 :
2518 1290 : let timeline = Timeline::new(
2519 1290 : self.conf,
2520 1290 : Arc::clone(&self.tenant_conf),
2521 1290 : new_metadata,
2522 1290 : ancestor,
2523 1290 : new_timeline_id,
2524 1290 : self.tenant_shard_id,
2525 1290 : self.generation,
2526 1290 : self.shard_identity,
2527 1290 : Arc::clone(&self.walredo_mgr),
2528 1290 : resources,
2529 1290 : pg_version,
2530 1290 : state,
2531 1290 : self.cancel.child_token(),
2532 1290 : );
2533 1290 :
2534 1290 : Ok(timeline)
2535 1290 : }
2536 :
2537 : // Allow too_many_arguments because a constructor's argument list naturally grows with the
2538 : // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
2539 : #[allow(clippy::too_many_arguments)]
2540 778 : fn new(
2541 778 : state: TenantState,
2542 778 : conf: &'static PageServerConf,
2543 778 : attached_conf: AttachedTenantConf,
2544 778 : shard_identity: ShardIdentity,
2545 778 : walredo_mgr: Arc<WalRedoManager>,
2546 778 : tenant_shard_id: TenantShardId,
2547 778 : remote_storage: Option<GenericRemoteStorage>,
2548 778 : deletion_queue_client: DeletionQueueClient,
2549 778 : ) -> Tenant {
2550 778 : let (state, mut rx) = watch::channel(state);
2551 778 :
2552 778 : tokio::spawn(async move {
2553 778 : let tid = tenant_shard_id.to_string();
2554 778 :
2555 2278 : fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
2556 2278 : ([state.into()], matches!(state, TenantState::Broken { .. }))
2557 2278 : }
2558 778 :
2559 778 : let mut tuple = inspect_state(&rx.borrow_and_update());
2560 778 :
2561 778 : let is_broken = tuple.1;
2562 778 : let mut counted_broken = if !is_broken {
2563 : // the tenant might be ignored and reloaded, so first remove any previous set
2564 : // element. it most likely has already been scraped, as these are manual operations
2565 : // right now. most likely we will add it back very soon.
2566 778 : drop(crate::metrics::BROKEN_TENANTS_SET.remove_label_values(&[&tid]));
2567 778 : false
2568 : } else {
2569 : // add the id to the set right away, there should not be any updates on the channel
2570 : // after
2571 UBC 0 : crate::metrics::BROKEN_TENANTS_SET
2572 0 : .with_label_values(&[&tid])
2573 0 : .set(1);
2574 0 : true
2575 : };
2576 :
2577 CBC 2278 : loop {
2578 2278 : let labels = &tuple.0;
2579 2278 : let current = TENANT_STATE_METRIC.with_label_values(labels);
2580 2278 : current.inc();
2581 2278 :
2582 2278 : if rx.changed().await.is_err() {
2583 : // tenant has been dropped; decrement the counter because a tenant with that
2584 : // state is no longer in tenant map, but allow any broken set item to exist
2585 : // still.
2586 161 : current.dec();
2587 161 : break;
2588 1500 : }
2589 1500 :
2590 1500 : current.dec();
2591 1500 : tuple = inspect_state(&rx.borrow_and_update());
2592 1500 :
2593 1500 : let is_broken = tuple.1;
2594 1500 : if is_broken && !counted_broken {
2595 57 : counted_broken = true;
2596 57 : // insert the tenant_id (back) into the set
2597 57 : crate::metrics::BROKEN_TENANTS_SET
2598 57 : .with_label_values(&[&tid])
2599 57 : .inc();
2600 1443 : }
2601 : }
2602 778 : });
2603 778 :
2604 778 : Tenant {
2605 778 : tenant_shard_id,
2606 778 : shard_identity,
2607 778 : generation: attached_conf.location.generation,
2608 778 : conf,
2609 778 : // using now here is good enough approximation to catch tenants with really long
2610 778 : // activation times.
2611 778 : constructed_at: Instant::now(),
2612 778 : tenant_conf: Arc::new(RwLock::new(attached_conf)),
2613 778 : timelines: Mutex::new(HashMap::new()),
2614 778 : timelines_creating: Mutex::new(HashSet::new()),
2615 778 : gc_cs: tokio::sync::Mutex::new(()),
2616 778 : walredo_mgr,
2617 778 : remote_storage,
2618 778 : deletion_queue_client,
2619 778 : state,
2620 778 : cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
2621 778 : cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
2622 778 : eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
2623 778 : activate_now_sem: tokio::sync::Semaphore::new(0),
2624 778 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
2625 778 : cancel: CancellationToken::default(),
2626 778 : gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
2627 778 : }
2628 778 : }
2629 :
2630 : /// Locate and load config
2631 224 : pub(super) fn load_tenant_config(
2632 224 : conf: &'static PageServerConf,
2633 224 : tenant_shard_id: &TenantShardId,
2634 224 : ) -> anyhow::Result<LocationConf> {
2635 224 : let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
2636 224 : let config_path = conf.tenant_location_config_path(tenant_shard_id);
2637 224 :
2638 224 : if config_path.exists() {
2639 : // New-style config takes precedence
2640 219 : let deserialized = Self::read_config(&config_path)?;
2641 219 : Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
2642 5 : } else if legacy_config_path.exists() {
2643 : // Upgrade path: found an old-style configuration only
2644 1 : let deserialized = Self::read_config(&legacy_config_path)?;
2645 :
2646 1 : let mut tenant_conf = TenantConfOpt::default();
2647 1 : for (key, item) in deserialized.iter() {
2648 1 : match key {
2649 1 : "tenant_config" => {
2650 1 : tenant_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("Failed to parse config from file '{legacy_config_path}' as pageserver config"))?;
2651 : }
2652 UBC 0 : _ => bail!(
2653 0 : "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
2654 0 : ),
2655 : }
2656 : }
2657 :
2658 : // Legacy configs are implicitly in attached state
2659 CBC 1 : Ok(LocationConf::attached_single(
2660 1 : tenant_conf,
2661 1 : Generation::none(),
2662 1 : ))
2663 : } else {
2664 : // FIXME If the config file is not found, assume that we're attaching
2665 : // a detached tenant and config is passed via attach command.
2666 : // https://github.com/neondatabase/neon/issues/1555
2667 : // OR: we're loading after incomplete deletion that managed to remove config.
2668 4 : info!(
2669 4 : "tenant config not found in {} or {}",
2670 4 : config_path, legacy_config_path
2671 4 : );
2672 4 : Ok(LocationConf::default())
2673 : }
2674 224 : }
2675 :
2676 220 : fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
2677 220 : info!("loading tenant configuration from {path}");
2678 :
2679 : // load and parse file
2680 220 : let config = fs::read_to_string(path)
2681 220 : .with_context(|| format!("Failed to load config from path '{path}'"))?;
2682 :
2683 220 : config
2684 220 : .parse::<toml_edit::Document>()
2685 220 : .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
2686 220 : }
2687 :
2688 353 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
2689 : pub(super) async fn persist_tenant_config(
2690 : conf: &'static PageServerConf,
2691 : tenant_shard_id: &TenantShardId,
2692 : location_conf: &LocationConf,
2693 : ) -> anyhow::Result<()> {
2694 : let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
2695 : let config_path = conf.tenant_location_config_path(tenant_shard_id);
2696 :
2697 : Self::persist_tenant_config_at(
2698 : tenant_shard_id,
2699 : &config_path,
2700 : &legacy_config_path,
2701 : location_conf,
2702 : )
2703 : .await
2704 : }
2705 :
2706 UBC 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
2707 : pub(super) async fn persist_tenant_config_at(
2708 : tenant_shard_id: &TenantShardId,
2709 : config_path: &Utf8Path,
2710 : legacy_config_path: &Utf8Path,
2711 : location_conf: &LocationConf,
2712 : ) -> anyhow::Result<()> {
2713 : // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
2714 : Self::persist_tenant_config_legacy(
2715 : tenant_shard_id,
2716 : legacy_config_path,
2717 : &location_conf.tenant_conf,
2718 : )
2719 : .await?;
2720 :
2721 : if let LocationMode::Attached(attach_conf) = &location_conf.mode {
2722 : // Once we use LocationMode, generations are mandatory. If we aren't using generations,
2723 : // then drop out after writing legacy-style config.
2724 : if attach_conf.generation.is_none() {
2725 0 : tracing::debug!("Running without generations, not writing new-style LocationConf");
2726 : return Ok(());
2727 : }
2728 : }
2729 :
2730 0 : debug!("persisting tenantconf to {config_path}");
2731 :
2732 : let mut conf_content = r#"# This file contains a specific per-tenant's config.
2733 : # It is read in case of pageserver restart.
2734 : "#
2735 : .to_string();
2736 :
2737 : // Convert the config to a toml file.
2738 : conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
2739 :
2740 : let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
2741 :
2742 : let tenant_shard_id = *tenant_shard_id;
2743 : let config_path = config_path.to_owned();
2744 CBC 812 : tokio::task::spawn_blocking(move || {
2745 812 : Handle::current().block_on(async move {
2746 812 : let conf_content = conf_content.as_bytes();
2747 812 : VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
2748 UBC 0 : .await
2749 CBC 812 : .with_context(|| {
2750 UBC 0 : format!("write tenant {tenant_shard_id} config to {config_path}")
2751 CBC 812 : })
2752 812 : })
2753 812 : })
2754 : .await??;
2755 :
2756 : Ok(())
2757 : }
2758 :
2759 813 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
2760 : async fn persist_tenant_config_legacy(
2761 : tenant_shard_id: &TenantShardId,
2762 : target_config_path: &Utf8Path,
2763 : tenant_conf: &TenantConfOpt,
2764 : ) -> anyhow::Result<()> {
2765 UBC 0 : debug!("persisting tenantconf to {target_config_path}");
2766 :
2767 : let mut conf_content = r#"# This file contains a specific per-tenant's config.
2768 : # It is read in case of pageserver restart.
2769 :
2770 : [tenant_config]
2771 : "#
2772 : .to_string();
2773 :
2774 : // Convert the config to a toml file.
2775 : conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
2776 :
2777 : let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
2778 :
2779 : let tenant_shard_id = *tenant_shard_id;
2780 : let target_config_path = target_config_path.to_owned();
2781 CBC 813 : tokio::task::spawn_blocking(move || {
2782 813 : Handle::current().block_on(async move {
2783 813 : let conf_content = conf_content.as_bytes();
2784 813 : VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
2785 UBC 0 : .await
2786 CBC 813 : .with_context(|| {
2787 UBC 0 : format!("write tenant {tenant_shard_id} config to {target_config_path}")
2788 CBC 813 : })
2789 813 : })
2790 813 : })
2791 : .await??;
2792 : Ok(())
2793 : }
2794 :
2795 : //
2796 : // How garbage collection works:
2797 : //
2798 : // +--bar------------->
2799 : // /
2800 : // +----+-----foo---------------->
2801 : // /
2802 : // ----main--+-------------------------->
2803 : // \
2804 : // +-----baz-------->
2805 : //
2806 : //
2807 : // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
2808 : // `gc_infos` are being refreshed
2809 : // 2. Scan collected timelines, and on each timeline, make note of the
2810 : // all the points where other timelines have been branched off.
2811 : // We will refrain from removing page versions at those LSNs.
2812 : // 3. For each timeline, scan all layer files on the timeline.
2813 : // Remove all files for which a newer file exists and which
2814 : // don't cover any branch point LSNs.
2815 : //
2816 : // TODO:
2817 : // - if a relation has a non-incremental persistent layer on a child branch, then we
2818 : // don't need to keep that in the parent anymore. But currently
2819 : // we do.
2820 365 : async fn gc_iteration_internal(
2821 365 : &self,
2822 365 : target_timeline_id: Option<TimelineId>,
2823 365 : horizon: u64,
2824 365 : pitr: Duration,
2825 365 : cancel: &CancellationToken,
2826 365 : ctx: &RequestContext,
2827 365 : ) -> anyhow::Result<GcResult> {
2828 365 : let mut totals: GcResult = Default::default();
2829 365 : let now = Instant::now();
2830 :
2831 365 : let gc_timelines = match self
2832 365 : .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
2833 235890 : .await
2834 : {
2835 363 : Ok(result) => result,
2836 1 : Err(e) => {
2837 UBC 0 : if let Some(PageReconstructError::Cancelled) =
2838 CBC 1 : e.downcast_ref::<PageReconstructError>()
2839 : {
2840 : // Handle cancellation
2841 UBC 0 : totals.elapsed = now.elapsed();
2842 0 : return Ok(totals);
2843 : } else {
2844 : // Propagate other errors
2845 CBC 1 : return Err(e);
2846 : }
2847 : }
2848 : };
2849 :
2850 3 : failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
2851 :
2852 : // If there is nothing to GC, we don't want any messages in the INFO log.
2853 363 : if !gc_timelines.is_empty() {
2854 355 : info!("{} timelines need GC", gc_timelines.len());
2855 : } else {
2856 UBC 0 : debug!("{} timelines need GC", gc_timelines.len());
2857 : }
2858 :
2859 : // Perform GC for each timeline.
2860 : //
2861 : // Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
2862 : // branch creation task, which requires the GC lock. A GC iteration can run concurrently
2863 : // with branch creation.
2864 : //
2865 : // See comments in [`Tenant::branch_timeline`] for more information about why branch
2866 : // creation task can run concurrently with timeline's GC iteration.
2867 CBC 854 : for timeline in gc_timelines {
2868 496 : if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
2869 : // We were requested to shut down. Stop and return with the progress we
2870 : // made.
2871 UBC 0 : break;
2872 CBC 496 : }
2873 496 : let result = timeline.gc().await?;
2874 491 : totals += result;
2875 : }
2876 :
2877 358 : totals.elapsed = now.elapsed();
2878 358 : Ok(totals)
2879 359 : }
2880 :
2881 : /// Refreshes the Timeline::gc_info for all timelines, returning the
2882 : /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
2883 : /// [`Tenant::get_gc_horizon`].
2884 : ///
2885 : /// This is usually executed as part of periodic gc, but can now be triggered more often.
2886 92 : pub async fn refresh_gc_info(
2887 92 : &self,
2888 92 : cancel: &CancellationToken,
2889 92 : ctx: &RequestContext,
2890 92 : ) -> anyhow::Result<Vec<Arc<Timeline>>> {
2891 92 : // since this method can now be called at different rates than the configured gc loop, it
2892 92 : // might be that these configuration values get applied faster than what it was previously,
2893 92 : // since these were only read from the gc task.
2894 92 : let horizon = self.get_gc_horizon();
2895 92 : let pitr = self.get_pitr_interval();
2896 92 :
2897 92 : // refresh all timelines
2898 92 : let target_timeline_id = None;
2899 92 :
2900 92 : self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
2901 20 : .await
2902 92 : }
2903 :
2904 457 : async fn refresh_gc_info_internal(
2905 457 : &self,
2906 457 : target_timeline_id: Option<TimelineId>,
2907 457 : horizon: u64,
2908 457 : pitr: Duration,
2909 457 : cancel: &CancellationToken,
2910 457 : ctx: &RequestContext,
2911 457 : ) -> anyhow::Result<Vec<Arc<Timeline>>> {
2912 : // grab mutex to prevent new timelines from being created here.
2913 457 : let gc_cs = self.gc_cs.lock().await;
2914 :
2915 : // Scan all timelines. For each timeline, remember the timeline ID and
2916 : // the branch point where it was created.
2917 456 : let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
2918 457 : let timelines = self.timelines.lock().unwrap();
2919 457 : let mut all_branchpoints = BTreeSet::new();
2920 456 : let timeline_ids = {
2921 457 : if let Some(target_timeline_id) = target_timeline_id.as_ref() {
2922 325 : if timelines.get(target_timeline_id).is_none() {
2923 1 : bail!("gc target timeline does not exist")
2924 324 : }
2925 132 : };
2926 :
2927 456 : timelines
2928 456 : .iter()
2929 949 : .map(|(timeline_id, timeline_entry)| {
2930 500 : if let Some(ancestor_timeline_id) =
2931 949 : &timeline_entry.get_ancestor_timeline_id()
2932 : {
2933 : // If target_timeline is specified, we only need to know branchpoints of its children
2934 500 : if let Some(timeline_id) = target_timeline_id {
2935 304 : if ancestor_timeline_id == &timeline_id {
2936 6 : all_branchpoints.insert((
2937 6 : *ancestor_timeline_id,
2938 6 : timeline_entry.get_ancestor_lsn(),
2939 6 : ));
2940 298 : }
2941 : }
2942 : // Collect branchpoints for all timelines
2943 196 : else {
2944 196 : all_branchpoints.insert((
2945 196 : *ancestor_timeline_id,
2946 196 : timeline_entry.get_ancestor_lsn(),
2947 196 : ));
2948 196 : }
2949 449 : }
2950 :
2951 949 : *timeline_id
2952 949 : })
2953 456 : .collect::<Vec<_>>()
2954 456 : };
2955 456 : (all_branchpoints, timeline_ids)
2956 456 : };
2957 456 :
2958 456 : // Ok, we now know all the branch points.
2959 456 : // Update the GC information for each timeline.
2960 456 : let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
2961 1399 : for timeline_id in timeline_ids {
2962 : // Timeline is known to be local and loaded.
2963 944 : let timeline = self
2964 944 : .get_timeline(timeline_id, false)
2965 944 : .with_context(|| format!("Timeline {timeline_id} was not found"))?;
2966 :
2967 : // If target_timeline is specified, ignore all other timelines
2968 944 : if let Some(target_timeline_id) = target_timeline_id {
2969 628 : if timeline_id != target_timeline_id {
2970 304 : continue;
2971 324 : }
2972 316 : }
2973 :
2974 640 : if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
2975 561 : let branchpoints: Vec<Lsn> = all_branchpoints
2976 561 : .range((
2977 561 : Included((timeline_id, Lsn(0))),
2978 561 : Included((timeline_id, Lsn(u64::MAX))),
2979 561 : ))
2980 561 : .map(|&x| x.1)
2981 561 : .collect();
2982 561 : timeline
2983 561 : .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
2984 235910 : .await?;
2985 :
2986 560 : gc_timelines.push(timeline);
2987 79 : }
2988 : }
2989 455 : drop(gc_cs);
2990 455 : Ok(gc_timelines)
2991 456 : }
2992 :
2993 : /// A substitute for `branch_timeline` for use in unit tests.
2994 : /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
2995 : /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
2996 : /// timeline background tasks are launched, except the flush loop.
2997 : #[cfg(test)]
2998 107 : async fn branch_timeline_test(
2999 107 : &self,
3000 107 : src_timeline: &Arc<Timeline>,
3001 107 : dst_id: TimelineId,
3002 107 : start_lsn: Option<Lsn>,
3003 107 : ctx: &RequestContext,
3004 107 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
3005 107 : let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
3006 107 : let tl = self
3007 107 : .branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
3008 2 : .await?;
3009 105 : tl.set_state(TimelineState::Active);
3010 105 : Ok(tl)
3011 107 : }
3012 :
3013 : /// Branch an existing timeline.
3014 : ///
3015 : /// The caller is responsible for activating the returned timeline.
3016 250 : async fn branch_timeline(
3017 250 : &self,
3018 250 : src_timeline: &Arc<Timeline>,
3019 250 : dst_id: TimelineId,
3020 250 : start_lsn: Option<Lsn>,
3021 250 : timeline_uninit_mark: TimelineUninitMark<'_>,
3022 250 : ctx: &RequestContext,
3023 250 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
3024 250 : self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
3025 8 : .await
3026 250 : }
3027 :
3028 357 : async fn branch_timeline_impl(
3029 357 : &self,
3030 357 : src_timeline: &Arc<Timeline>,
3031 357 : dst_id: TimelineId,
3032 357 : start_lsn: Option<Lsn>,
3033 357 : timeline_uninit_mark: TimelineUninitMark<'_>,
3034 357 : _ctx: &RequestContext,
3035 357 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
3036 357 : let src_id = src_timeline.timeline_id;
3037 :
3038 : // We will validate our ancestor LSN in this function. Acquire the GC lock so that
3039 : // this check cannot race with GC, and the ancestor LSN is guaranteed to remain
3040 : // valid while we are creating the branch.
3041 357 : let _gc_cs = self.gc_cs.lock().await;
3042 :
3043 : // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
3044 357 : let start_lsn = start_lsn.unwrap_or_else(|| {
3045 222 : let lsn = src_timeline.get_last_record_lsn();
3046 222 : info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
3047 222 : lsn
3048 357 : });
3049 357 :
3050 357 : // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
3051 357 : // horizon on the source timeline
3052 357 : //
3053 357 : // We check it against both the planned GC cutoff stored in 'gc_info',
3054 357 : // and the 'latest_gc_cutoff' of the last GC that was performed. The
3055 357 : // planned GC cutoff in 'gc_info' is normally larger than
3056 357 : // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
3057 357 : // changed the GC settings for the tenant to make the PITR window
3058 357 : // larger, but some of the data was already removed by an earlier GC
3059 357 : // iteration.
3060 357 :
3061 357 : // check against last actual 'latest_gc_cutoff' first
3062 357 : let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
3063 357 : src_timeline
3064 357 : .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
3065 357 : .context(format!(
3066 357 : "invalid branch start lsn: less than latest GC cutoff {}",
3067 357 : *latest_gc_cutoff_lsn,
3068 357 : ))
3069 357 : .map_err(CreateTimelineError::AncestorLsn)?;
3070 :
3071 : // and then the planned GC cutoff
3072 : {
3073 349 : let gc_info = src_timeline.gc_info.read().unwrap();
3074 349 : let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
3075 349 : if start_lsn < cutoff {
3076 UBC 0 : return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
3077 0 : "invalid branch start lsn: less than planned GC cutoff {cutoff}"
3078 0 : )));
3079 CBC 349 : }
3080 349 : }
3081 349 :
3082 349 : //
3083 349 : // The branch point is valid, and we are still holding the 'gc_cs' lock
3084 349 : // so that GC cannot advance the GC cutoff until we are finished.
3085 349 : // Proceed with the branch creation.
3086 349 : //
3087 349 :
3088 349 : // Determine prev-LSN for the new timeline. We can only determine it if
3089 349 : // the timeline was branched at the current end of the source timeline.
3090 349 : let RecordLsn {
3091 349 : last: src_last,
3092 349 : prev: src_prev,
3093 349 : } = src_timeline.get_last_record_rlsn();
3094 349 : let dst_prev = if src_last == start_lsn {
3095 334 : Some(src_prev)
3096 : } else {
3097 15 : None
3098 : };
3099 :
3100 : // Create the metadata file, noting the ancestor of the new timeline.
3101 : // There is initially no data in it, but all the read-calls know to look
3102 : // into the ancestor.
3103 349 : let metadata = TimelineMetadata::new(
3104 349 : start_lsn,
3105 349 : dst_prev,
3106 349 : Some(src_id),
3107 349 : start_lsn,
3108 349 : *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
3109 349 : src_timeline.initdb_lsn,
3110 349 : src_timeline.pg_version,
3111 349 : );
3112 :
3113 349 : let uninitialized_timeline = self
3114 349 : .prepare_new_timeline(
3115 349 : dst_id,
3116 349 : &metadata,
3117 349 : timeline_uninit_mark,
3118 349 : start_lsn + 1,
3119 349 : Some(Arc::clone(src_timeline)),
3120 349 : )
3121 UBC 0 : .await?;
3122 :
3123 CBC 349 : let new_timeline = uninitialized_timeline.finish_creation()?;
3124 :
3125 : // Root timeline gets its layers during creation and uploads them along with the metadata.
3126 : // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
3127 : // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
3128 : // could get incorrect information and remove more layers, than needed.
3129 : // See also https://github.com/neondatabase/neon/issues/3865
3130 349 : if let Some(remote_client) = new_timeline.remote_client.as_ref() {
3131 349 : remote_client
3132 349 : .schedule_index_upload_for_metadata_update(&metadata)
3133 349 : .context("branch initial metadata upload")?;
3134 UBC 0 : }
3135 :
3136 CBC 349 : info!("branched timeline {dst_id} from {src_id} at {start_lsn}");
3137 :
3138 349 : Ok(new_timeline)
3139 357 : }
3140 :
3141 : /// For unit tests, make this visible so that other modules can directly create timelines
3142 : #[cfg(test)]
3143 1 : #[tracing::instrument(fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
3144 : pub(crate) async fn bootstrap_timeline_test(
3145 : &self,
3146 : timeline_id: TimelineId,
3147 : pg_version: u32,
3148 : load_existing_initdb: Option<TimelineId>,
3149 : ctx: &RequestContext,
3150 : ) -> anyhow::Result<Arc<Timeline>> {
3151 : let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
3152 : self.bootstrap_timeline(
3153 : timeline_id,
3154 : pg_version,
3155 : load_existing_initdb,
3156 : uninit_mark,
3157 : ctx,
3158 : )
3159 : .await
3160 : }
3161 :
3162 : /// - run initdb to init temporary instance and get bootstrap data
3163 : /// - after initialization completes, tar up the temp dir and upload it to S3.
3164 : ///
3165 : /// The caller is responsible for activating the returned timeline.
3166 526 : async fn bootstrap_timeline(
3167 526 : &self,
3168 526 : timeline_id: TimelineId,
3169 526 : pg_version: u32,
3170 526 : load_existing_initdb: Option<TimelineId>,
3171 526 : timeline_uninit_mark: TimelineUninitMark<'_>,
3172 526 : ctx: &RequestContext,
3173 526 : ) -> anyhow::Result<Arc<Timeline>> {
3174 526 : // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
3175 526 : // temporary directory for basebackup files for the given timeline.
3176 526 :
3177 526 : let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
3178 526 : let pgdata_path = path_with_suffix_extension(
3179 526 : timelines_path.join(format!("basebackup-{timeline_id}")),
3180 526 : TEMP_FILE_SUFFIX,
3181 526 : );
3182 526 :
3183 526 : // an uninit mark was placed before, nothing else can access this timeline files
3184 526 : // current initdb was not run yet, so remove whatever was left from the previous runs
3185 526 : if pgdata_path.exists() {
3186 UBC 0 : fs::remove_dir_all(&pgdata_path).with_context(|| {
3187 0 : format!("Failed to remove already existing initdb directory: {pgdata_path}")
3188 0 : })?;
3189 CBC 526 : }
3190 : // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
3191 524 : scopeguard::defer! {
3192 524 : if let Err(e) = fs::remove_dir_all(&pgdata_path) {
3193 : // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
3194 UBC 0 : error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
3195 CBC 524 : }
3196 : }
3197 526 : if let Some(existing_initdb_timeline_id) = load_existing_initdb {
3198 3 : let Some(storage) = &self.remote_storage else {
3199 UBC 0 : bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
3200 : };
3201 CBC 3 : let (initdb_tar_zst_path, initdb_tar_zst) =
3202 3 : self::remote_timeline_client::download_initdb_tar_zst(
3203 3 : self.conf,
3204 3 : storage,
3205 3 : &self.tenant_shard_id,
3206 3 : &existing_initdb_timeline_id,
3207 3 : &self.cancel,
3208 3 : )
3209 1246 : .await
3210 3 : .context("download initdb tar")?;
3211 3 : let buf_read =
3212 3 : BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst);
3213 3 : import_datadir::extract_tar_zst(&pgdata_path, buf_read)
3214 16100 : .await
3215 3 : .context("extract initdb tar")?;
3216 :
3217 3 : tokio::fs::remove_file(&initdb_tar_zst_path)
3218 3 : .await
3219 3 : .or_else(|e| {
3220 UBC 0 : if e.kind() == std::io::ErrorKind::NotFound {
3221 : // If something else already removed the file, ignore the error
3222 0 : Ok(())
3223 : } else {
3224 0 : Err(e)
3225 : }
3226 CBC 3 : })
3227 3 : .with_context(|| format!("tempfile removal {initdb_tar_zst_path}"))?;
3228 : } else {
3229 : // Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
3230 6469 : run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
3231 :
3232 : // Upload the created data dir to S3
3233 522 : if let Some(storage) = &self.remote_storage {
3234 522 : let temp_path = timelines_path.join(format!(
3235 522 : "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
3236 522 : ));
3237 :
3238 522 : let (pgdata_zstd, tar_zst_size) =
3239 4270485 : import_datadir::create_tar_zst(&pgdata_path, &temp_path).await?;
3240 522 : backoff::retry(
3241 589 : || async {
3242 589 : self::remote_timeline_client::upload_initdb_dir(
3243 589 : storage,
3244 589 : &self.tenant_shard_id.tenant_id,
3245 589 : &timeline_id,
3246 589 : pgdata_zstd.try_clone().await?,
3247 589 : tar_zst_size,
3248 589 : &self.cancel,
3249 : )
3250 23739 : .await
3251 589 : },
3252 522 : |_| false,
3253 522 : 3,
3254 522 : u32::MAX,
3255 522 : "persist_initdb_tar_zst",
3256 522 : backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")),
3257 522 : )
3258 24322 : .await?;
3259 :
3260 522 : tokio::fs::remove_file(&temp_path)
3261 521 : .await
3262 522 : .or_else(|e| {
3263 UBC 0 : if e.kind() == std::io::ErrorKind::NotFound {
3264 : // If something else already removed the file, ignore the error
3265 0 : Ok(())
3266 : } else {
3267 0 : Err(e)
3268 : }
3269 CBC 522 : })
3270 522 : .with_context(|| format!("tempfile removal {temp_path}"))?;
3271 UBC 0 : }
3272 : }
3273 CBC 525 : let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
3274 525 :
3275 525 : // Import the contents of the data directory at the initial checkpoint
3276 525 : // LSN, and any WAL after that.
3277 525 : // Initdb lsn will be equal to last_record_lsn which will be set after import.
3278 525 : // Because we know it upfront avoid having an option or dummy zero value by passing it to the metadata.
3279 525 : let new_metadata = TimelineMetadata::new(
3280 525 : Lsn(0),
3281 525 : None,
3282 525 : None,
3283 525 : Lsn(0),
3284 525 : pgdata_lsn,
3285 525 : pgdata_lsn,
3286 525 : pg_version,
3287 525 : );
3288 525 : let raw_timeline = self
3289 525 : .prepare_new_timeline(
3290 525 : timeline_id,
3291 525 : &new_metadata,
3292 525 : timeline_uninit_mark,
3293 525 : pgdata_lsn,
3294 525 : None,
3295 525 : )
3296 1 : .await?;
3297 :
3298 524 : let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
3299 524 : let unfinished_timeline = raw_timeline.raw_timeline()?;
3300 :
3301 524 : import_datadir::import_timeline_from_postgres_datadir(
3302 524 : unfinished_timeline,
3303 524 : &pgdata_path,
3304 524 : pgdata_lsn,
3305 524 : ctx,
3306 524 : )
3307 2611938 : .await
3308 524 : .with_context(|| {
3309 UBC 0 : format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
3310 CBC 524 : })?;
3311 :
3312 : // Flush the new layer files to disk, before we make the timeline as available to
3313 : // the outside world.
3314 : //
3315 : // Flush loop needs to be spawned in order to be able to flush.
3316 524 : unfinished_timeline.maybe_spawn_flush_loop();
3317 524 :
3318 524 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
3319 2 : anyhow::bail!("failpoint before-checkpoint-new-timeline");
3320 524 : });
3321 :
3322 522 : unfinished_timeline
3323 522 : .freeze_and_flush()
3324 521 : .await
3325 521 : .with_context(|| {
3326 UBC 0 : format!(
3327 0 : "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
3328 0 : )
3329 CBC 521 : })?;
3330 :
3331 : // All done!
3332 521 : let timeline = raw_timeline.finish_creation()?;
3333 :
3334 520 : info!(
3335 520 : "created root timeline {} timeline.lsn {}",
3336 520 : timeline_id,
3337 520 : timeline.get_last_record_lsn()
3338 520 : );
3339 :
3340 520 : Ok(timeline)
3341 524 : }
3342 :
3343 : /// Call this before constructing a timeline, to build its required structures
3344 921 : fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
3345 921 : let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
3346 921 : let remote_client = RemoteTimelineClient::new(
3347 921 : remote_storage.clone(),
3348 921 : self.deletion_queue_client.clone(),
3349 921 : self.conf,
3350 921 : self.tenant_shard_id,
3351 921 : timeline_id,
3352 921 : self.generation,
3353 921 : );
3354 921 : Some(remote_client)
3355 : } else {
3356 UBC 0 : None
3357 : };
3358 :
3359 CBC 921 : TimelineResources {
3360 921 : remote_client,
3361 921 : deletion_queue_client: self.deletion_queue_client.clone(),
3362 921 : }
3363 921 : }
3364 :
3365 : /// Creates intermediate timeline structure and its files.
3366 : ///
3367 : /// An empty layer map is initialized, and new data and WAL can be imported starting
3368 : /// at 'disk_consistent_lsn'. After any initial data has been imported, call
3369 : /// `finish_creation` to insert the Timeline into the timelines map and to remove the
3370 : /// uninit mark file.
3371 921 : async fn prepare_new_timeline<'a>(
3372 921 : &'a self,
3373 921 : new_timeline_id: TimelineId,
3374 921 : new_metadata: &TimelineMetadata,
3375 921 : uninit_mark: TimelineUninitMark<'a>,
3376 921 : start_lsn: Lsn,
3377 921 : ancestor: Option<Arc<Timeline>>,
3378 921 : ) -> anyhow::Result<UninitializedTimeline> {
3379 921 : let tenant_shard_id = self.tenant_shard_id;
3380 921 :
3381 921 : let resources = self.build_timeline_resources(new_timeline_id);
3382 921 : if let Some(remote_client) = &resources.remote_client {
3383 921 : remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
3384 UBC 0 : }
3385 :
3386 CBC 921 : let timeline_struct = self
3387 921 : .create_timeline_struct(
3388 921 : new_timeline_id,
3389 921 : new_metadata,
3390 921 : ancestor,
3391 921 : resources,
3392 921 : CreateTimelineCause::Load,
3393 921 : )
3394 921 : .context("Failed to create timeline data structure")?;
3395 :
3396 921 : timeline_struct.init_empty_layer_map(start_lsn);
3397 :
3398 921 : if let Err(e) = self
3399 921 : .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
3400 UBC 0 : .await
3401 : {
3402 CBC 1 : error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
3403 1 : cleanup_timeline_directory(uninit_mark);
3404 1 : return Err(e);
3405 920 : }
3406 :
3407 UBC 0 : debug!(
3408 0 : "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
3409 0 : );
3410 :
3411 CBC 920 : Ok(UninitializedTimeline::new(
3412 920 : self,
3413 920 : new_timeline_id,
3414 920 : Some((timeline_struct, uninit_mark)),
3415 920 : ))
3416 921 : }
3417 :
3418 921 : async fn create_timeline_files(
3419 921 : &self,
3420 921 : timeline_path: &Utf8Path,
3421 921 : new_timeline_id: &TimelineId,
3422 921 : new_metadata: &TimelineMetadata,
3423 921 : ) -> anyhow::Result<()> {
3424 921 : crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
3425 :
3426 921 : fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
3427 1 : anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
3428 921 : });
3429 :
3430 920 : save_metadata(
3431 920 : self.conf,
3432 920 : &self.tenant_shard_id,
3433 920 : new_timeline_id,
3434 920 : new_metadata,
3435 920 : )
3436 UBC 0 : .await
3437 CBC 920 : .context("Failed to create timeline metadata")?;
3438 920 : Ok(())
3439 921 : }
3440 :
3441 : /// Attempts to create an uninit mark file for the timeline initialization.
3442 : /// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
3443 : ///
3444 : /// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
3445 939 : fn create_timeline_uninit_mark(
3446 939 : &self,
3447 939 : timeline_id: TimelineId,
3448 939 : ) -> Result<TimelineUninitMark, TimelineExclusionError> {
3449 939 : let tenant_shard_id = self.tenant_shard_id;
3450 939 :
3451 939 : let uninit_mark_path = self
3452 939 : .conf
3453 939 : .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
3454 939 : let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
3455 :
3456 939 : let uninit_mark = TimelineUninitMark::new(
3457 939 : self,
3458 939 : timeline_id,
3459 939 : uninit_mark_path.clone(),
3460 939 : timeline_path.clone(),
3461 939 : )?;
3462 :
3463 : // At this stage, we have got exclusive access to in-memory state for this timeline ID
3464 : // for creation.
3465 : // A timeline directory should never exist on disk already:
3466 : // - a previous failed creation would have cleaned up after itself
3467 : // - a pageserver restart would clean up timeline directories that don't have valid remote state
3468 : //
3469 : // Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
3470 : // this error may indicate a bug in cleanup on failed creations.
3471 938 : if timeline_path.exists() {
3472 UBC 0 : return Err(TimelineExclusionError::Other(anyhow::anyhow!(
3473 0 : "Timeline directory already exists! This is a bug."
3474 0 : )));
3475 CBC 938 : }
3476 938 :
3477 938 : // Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
3478 938 : // that during process runtime, colliding creations will be caught in-memory without getting
3479 938 : // as far as failing to write a file.
3480 938 : fs::OpenOptions::new()
3481 938 : .write(true)
3482 938 : .create_new(true)
3483 938 : .open(&uninit_mark_path)
3484 938 : .context("Failed to create uninit mark file")
3485 938 : .and_then(|_| {
3486 938 : crashsafe::fsync_file_and_parent(&uninit_mark_path)
3487 938 : .context("Failed to fsync uninit mark file")
3488 938 : })
3489 938 : .with_context(|| {
3490 UBC 0 : format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
3491 CBC 938 : })?;
3492 :
3493 938 : Ok(uninit_mark)
3494 939 : }
3495 :
3496 : /// Gathers inputs from all of the timelines to produce a sizing model input.
3497 : ///
3498 : /// Future is cancellation safe. Only one calculation can be running at once per tenant.
3499 84 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
3500 : pub async fn gather_size_inputs(
3501 : &self,
3502 : // `max_retention_period` overrides the cutoff that is used to calculate the size
3503 : // (only if it is shorter than the real cutoff).
3504 : max_retention_period: Option<u64>,
3505 : cause: LogicalSizeCalculationCause,
3506 : cancel: &CancellationToken,
3507 : ctx: &RequestContext,
3508 : ) -> anyhow::Result<size::ModelInputs> {
3509 : let logical_sizes_at_once = self
3510 : .conf
3511 : .concurrent_tenant_size_logical_size_queries
3512 : .inner();
3513 :
3514 : // TODO: Having a single mutex block concurrent reads is not great for performance.
3515 : //
3516 : // But the only case where we need to run multiple of these at once is when we
3517 : // request a size for a tenant manually via API, while another background calculation
3518 : // is in progress (which is not a common case).
3519 : //
3520 : // See more for on the issue #2748 condenced out of the initial PR review.
3521 : let mut shared_cache = self.cached_logical_sizes.lock().await;
3522 :
3523 : size::gather_inputs(
3524 : self,
3525 : logical_sizes_at_once,
3526 : max_retention_period,
3527 : &mut shared_cache,
3528 : cause,
3529 : cancel,
3530 : ctx,
3531 : )
3532 : .await
3533 : }
3534 :
3535 : /// Calculate synthetic tenant size and cache the result.
3536 : /// This is periodically called by background worker.
3537 : /// result is cached in tenant struct
3538 29 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
3539 : pub async fn calculate_synthetic_size(
3540 : &self,
3541 : cause: LogicalSizeCalculationCause,
3542 : cancel: &CancellationToken,
3543 : ctx: &RequestContext,
3544 : ) -> anyhow::Result<u64> {
3545 : let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
3546 :
3547 : let size = inputs.calculate()?;
3548 :
3549 : self.set_cached_synthetic_size(size);
3550 :
3551 : Ok(size)
3552 : }
3553 :
3554 : /// Cache given synthetic size and update the metric value
3555 27 : pub fn set_cached_synthetic_size(&self, size: u64) {
3556 27 : self.cached_synthetic_tenant_size
3557 27 : .store(size, Ordering::Relaxed);
3558 27 :
3559 27 : TENANT_SYNTHETIC_SIZE_METRIC
3560 27 : .get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
3561 27 : .unwrap()
3562 27 : .set(size);
3563 27 : }
3564 :
3565 27 : pub fn cached_synthetic_size(&self) -> u64 {
3566 27 : self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
3567 27 : }
3568 :
3569 : /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
3570 : ///
3571 : /// This function can take a long time: callers should wrap it in a timeout if calling
3572 : /// from an external API handler.
3573 : ///
3574 : /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is
3575 : /// still bounded by tenant/timeline shutdown.
3576 UBC 0 : #[tracing::instrument(skip_all)]
3577 : pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> {
3578 : let timelines = self.timelines.lock().unwrap().clone();
3579 :
3580 CBC 1 : async fn flush_timeline(_gate: GateGuard, timeline: Arc<Timeline>) -> anyhow::Result<()> {
3581 1 : tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
3582 1 : timeline.freeze_and_flush().await?;
3583 1 : tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
3584 1 : if let Some(client) = &timeline.remote_client {
3585 1 : client.wait_completion().await?;
3586 UBC 0 : }
3587 :
3588 CBC 1 : Ok(())
3589 1 : }
3590 :
3591 : // We do not use a JoinSet for these tasks, because we don't want them to be
3592 : // aborted when this function's future is cancelled: they should stay alive
3593 : // holding their GateGuard until they complete, to ensure their I/Os complete
3594 : // before Timeline shutdown completes.
3595 : let mut results = FuturesUnordered::new();
3596 :
3597 : for (_timeline_id, timeline) in timelines {
3598 : // Run each timeline's flush in a task holding the timeline's gate: this
3599 : // means that if this function's future is cancelled, the Timeline shutdown
3600 : // will still wait for any I/O in here to complete.
3601 : let gate = match timeline.gate.enter() {
3602 : Ok(g) => g,
3603 : Err(_) => continue,
3604 : };
3605 2 : let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
3606 : results.push(jh);
3607 : }
3608 :
3609 : while let Some(r) = results.next().await {
3610 : if let Err(e) = r {
3611 : if !e.is_cancelled() && !e.is_panic() {
3612 UBC 0 : tracing::error!("unexpected join error: {e:?}");
3613 : }
3614 : }
3615 : }
3616 :
3617 : // The flushes we did above were just writes, but the Tenant might have had
3618 : // pending deletions as well from recent compaction/gc: we want to flush those
3619 : // as well. This requires flushing the global delete queue. This is cheap
3620 : // because it's typically a no-op.
3621 : match self.deletion_queue_client.flush_execute().await {
3622 : Ok(_) => {}
3623 : Err(DeletionQueueError::ShuttingDown) => {}
3624 : }
3625 :
3626 : Ok(())
3627 : }
3628 : }
3629 :
3630 CBC 1 : fn remove_timeline_and_uninit_mark(
3631 1 : timeline_dir: &Utf8Path,
3632 1 : uninit_mark: &Utf8Path,
3633 1 : ) -> anyhow::Result<()> {
3634 1 : fs::remove_dir_all(timeline_dir)
3635 1 : .or_else(|e| {
3636 UBC 0 : if e.kind() == std::io::ErrorKind::NotFound {
3637 : // we can leave the uninit mark without a timeline dir,
3638 : // just remove the mark then
3639 0 : Ok(())
3640 : } else {
3641 0 : Err(e)
3642 : }
3643 CBC 1 : })
3644 1 : .with_context(|| {
3645 UBC 0 : format!("Failed to remove unit marked timeline directory {timeline_dir}")
3646 CBC 1 : })?;
3647 1 : fs::remove_file(uninit_mark)
3648 1 : .with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
3649 :
3650 1 : Ok(())
3651 1 : }
3652 :
3653 462 : pub(crate) async fn create_tenant_files(
3654 462 : conf: &'static PageServerConf,
3655 462 : location_conf: &LocationConf,
3656 462 : tenant_shard_id: &TenantShardId,
3657 462 : ) -> anyhow::Result<Utf8PathBuf> {
3658 462 : let target_tenant_directory = conf.tenant_path(tenant_shard_id);
3659 462 : anyhow::ensure!(
3660 462 : !target_tenant_directory
3661 462 : .try_exists()
3662 462 : .context("check existence of tenant directory")?,
3663 2 : "tenant directory already exists",
3664 : );
3665 :
3666 460 : let temporary_tenant_dir =
3667 460 : path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
3668 UBC 0 : debug!("Creating temporary directory structure in {temporary_tenant_dir}");
3669 :
3670 : // top-level dir may exist if we are creating it through CLI
3671 CBC 460 : crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
3672 UBC 0 : format!("could not create temporary tenant directory {temporary_tenant_dir}")
3673 CBC 460 : })?;
3674 :
3675 460 : let creation_result = try_create_target_tenant_dir(
3676 460 : conf,
3677 460 : location_conf,
3678 460 : tenant_shard_id,
3679 460 : &temporary_tenant_dir,
3680 460 : &target_tenant_directory,
3681 460 : )
3682 919 : .await;
3683 :
3684 460 : if creation_result.is_err() {
3685 1 : error!(
3686 1 : "Failed to create directory structure for tenant {tenant_shard_id}, cleaning tmp data"
3687 1 : );
3688 1 : if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) {
3689 UBC 0 : error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
3690 CBC 1 : } else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
3691 1 : error!(
3692 1 : "Failed to fsync removed temporary tenant directory {temporary_tenant_dir:?}: {e}"
3693 1 : )
3694 UBC 0 : }
3695 CBC 459 : }
3696 :
3697 460 : creation_result?;
3698 :
3699 459 : Ok(target_tenant_directory)
3700 462 : }
3701 :
3702 460 : async fn try_create_target_tenant_dir(
3703 460 : conf: &'static PageServerConf,
3704 460 : location_conf: &LocationConf,
3705 460 : tenant_shard_id: &TenantShardId,
3706 460 : temporary_tenant_dir: &Utf8Path,
3707 460 : target_tenant_directory: &Utf8Path,
3708 460 : ) -> Result<(), anyhow::Error> {
3709 460 : let temporary_tenant_timelines_dir = rebase_directory(
3710 460 : &conf.timelines_path(tenant_shard_id),
3711 460 : target_tenant_directory,
3712 460 : temporary_tenant_dir,
3713 460 : )
3714 460 : .with_context(|| format!("resolve tenant {tenant_shard_id} temporary timelines dir"))?;
3715 460 : let temporary_legacy_tenant_config_path = rebase_directory(
3716 460 : &conf.tenant_config_path(tenant_shard_id),
3717 460 : target_tenant_directory,
3718 460 : temporary_tenant_dir,
3719 460 : )
3720 460 : .with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?;
3721 460 : let temporary_tenant_config_path = rebase_directory(
3722 460 : &conf.tenant_location_config_path(tenant_shard_id),
3723 460 : target_tenant_directory,
3724 460 : temporary_tenant_dir,
3725 460 : )
3726 460 : .with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?;
3727 :
3728 460 : Tenant::persist_tenant_config_at(
3729 460 : tenant_shard_id,
3730 460 : &temporary_tenant_config_path,
3731 460 : &temporary_legacy_tenant_config_path,
3732 460 : location_conf,
3733 460 : )
3734 919 : .await?;
3735 :
3736 460 : crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
3737 UBC 0 : format!(
3738 0 : "create tenant {} temporary timelines directory {}",
3739 0 : tenant_shard_id, temporary_tenant_timelines_dir,
3740 0 : )
3741 CBC 460 : })?;
3742 460 : fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
3743 1 : anyhow::bail!("failpoint tenant-creation-before-tmp-rename");
3744 460 : });
3745 :
3746 : // Make sure the current tenant directory entries are durable before renaming.
3747 : // Without this, a crash may reorder any of the directory entry creations above.
3748 459 : crashsafe::fsync(temporary_tenant_dir)
3749 459 : .with_context(|| format!("sync temporary tenant directory {temporary_tenant_dir:?}"))?;
3750 :
3751 459 : fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
3752 UBC 0 : format!(
3753 0 : "move tenant {} temporary directory {} into the permanent one {}",
3754 0 : tenant_shard_id, temporary_tenant_dir, target_tenant_directory
3755 0 : )
3756 CBC 459 : })?;
3757 459 : let target_dir_parent = target_tenant_directory.parent().with_context(|| {
3758 UBC 0 : format!(
3759 0 : "get tenant {} dir parent for {}",
3760 0 : tenant_shard_id, target_tenant_directory,
3761 0 : )
3762 CBC 459 : })?;
3763 459 : crashsafe::fsync(target_dir_parent).with_context(|| {
3764 UBC 0 : format!(
3765 0 : "fsync renamed directory's parent {} for tenant {}",
3766 0 : target_dir_parent, tenant_shard_id,
3767 0 : )
3768 CBC 459 : })?;
3769 :
3770 459 : Ok(())
3771 460 : }
3772 :
3773 1380 : fn rebase_directory(
3774 1380 : original_path: &Utf8Path,
3775 1380 : base: &Utf8Path,
3776 1380 : new_base: &Utf8Path,
3777 1380 : ) -> anyhow::Result<Utf8PathBuf> {
3778 1380 : let relative_path = original_path.strip_prefix(base).with_context(|| {
3779 UBC 0 : format!(
3780 0 : "Failed to strip base prefix '{}' off path '{}'",
3781 0 : base, original_path
3782 0 : )
3783 CBC 1380 : })?;
3784 1380 : Ok(new_base.join(relative_path))
3785 1380 : }
3786 :
3787 : /// Create the cluster temporarily in 'initdbpath' directory inside the repository
3788 : /// to get bootstrap data for timeline initialization.
3789 523 : async fn run_initdb(
3790 523 : conf: &'static PageServerConf,
3791 523 : initdb_target_dir: &Utf8Path,
3792 523 : pg_version: u32,
3793 523 : cancel: &CancellationToken,
3794 523 : ) -> Result<(), InitdbError> {
3795 523 : let initdb_bin_path = conf
3796 523 : .pg_bin_dir(pg_version)
3797 523 : .map_err(InitdbError::Other)?
3798 523 : .join("initdb");
3799 523 : let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
3800 523 : info!(
3801 523 : "running {} in {}, libdir: {}",
3802 523 : initdb_bin_path, initdb_target_dir, initdb_lib_dir,
3803 523 : );
3804 :
3805 523 : let _permit = INIT_DB_SEMAPHORE.acquire().await;
3806 :
3807 523 : let initdb_command = tokio::process::Command::new(&initdb_bin_path)
3808 523 : .args(["-D", initdb_target_dir.as_ref()])
3809 523 : .args(["-U", &conf.superuser])
3810 523 : .args(["-E", "utf8"])
3811 523 : .arg("--no-instructions")
3812 523 : .arg("--no-sync")
3813 523 : .env_clear()
3814 523 : .env("LD_LIBRARY_PATH", &initdb_lib_dir)
3815 523 : .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
3816 523 : .stdout(Stdio::piped())
3817 523 : .stderr(Stdio::piped())
3818 523 : // If the `select!` below doesn't finish the `wait_with_output`,
3819 523 : // let the task get `wait()`ed for asynchronously by tokio.
3820 523 : // This means there is a slim chance we can go over the INIT_DB_SEMAPHORE.
3821 523 : // TODO: fix for this is non-trivial, see
3822 523 : // https://github.com/neondatabase/neon/pull/5921#pullrequestreview-1750858021
3823 523 : //
3824 523 : .kill_on_drop(true)
3825 523 : .spawn()?;
3826 :
3827 523 : tokio::select! {
3828 522 : initdb_output = initdb_command.wait_with_output() => {
3829 : let initdb_output = initdb_output?;
3830 : if !initdb_output.status.success() {
3831 : return Err(InitdbError::Failed(initdb_output.status, initdb_output.stderr));
3832 : }
3833 : }
3834 : _ = cancel.cancelled() => {
3835 : return Err(InitdbError::Cancelled);
3836 : }
3837 : }
3838 :
3839 522 : Ok(())
3840 522 : }
3841 :
3842 : impl Drop for Tenant {
3843 199 : fn drop(&mut self) {
3844 199 : remove_tenant_metrics(&self.tenant_shard_id.tenant_id);
3845 199 : }
3846 : }
3847 : /// Dump contents of a layer file to stdout.
3848 UBC 0 : pub async fn dump_layerfile_from_path(
3849 0 : path: &Utf8Path,
3850 0 : verbose: bool,
3851 0 : ctx: &RequestContext,
3852 0 : ) -> anyhow::Result<()> {
3853 : use std::os::unix::fs::FileExt;
3854 :
3855 : // All layer files start with a two-byte "magic" value, to identify the kind of
3856 : // file.
3857 0 : let file = File::open(path)?;
3858 0 : let mut header_buf = [0u8; 2];
3859 0 : file.read_exact_at(&mut header_buf, 0)?;
3860 :
3861 0 : match u16::from_be_bytes(header_buf) {
3862 : crate::IMAGE_FILE_MAGIC => {
3863 0 : ImageLayer::new_for_path(path, file)?
3864 0 : .dump(verbose, ctx)
3865 0 : .await?
3866 : }
3867 : crate::DELTA_FILE_MAGIC => {
3868 0 : DeltaLayer::new_for_path(path, file)?
3869 0 : .dump(verbose, ctx)
3870 0 : .await?
3871 : }
3872 0 : magic => bail!("unrecognized magic identifier: {:?}", magic),
3873 : }
3874 :
3875 0 : Ok(())
3876 0 : }
3877 :
3878 : #[cfg(test)]
3879 : pub(crate) mod harness {
3880 : use bytes::{Bytes, BytesMut};
3881 : use once_cell::sync::OnceCell;
3882 : use pageserver_api::shard::ShardIndex;
3883 : use std::fs;
3884 : use std::sync::Arc;
3885 : use utils::logging;
3886 : use utils::lsn::Lsn;
3887 :
3888 : use crate::deletion_queue::mock::MockDeletionQueue;
3889 : use crate::{
3890 : config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
3891 : };
3892 :
3893 : use super::*;
3894 : use crate::tenant::config::{TenantConf, TenantConfOpt};
3895 : use hex_literal::hex;
3896 : use utils::id::{TenantId, TimelineId};
3897 :
3898 : pub const TIMELINE_ID: TimelineId =
3899 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
3900 : pub const NEW_TIMELINE_ID: TimelineId =
3901 : TimelineId::from_array(hex!("AA223344556677881122334455667788"));
3902 :
3903 : /// Convenience function to create a page image with given string as the only content
3904 : #[allow(non_snake_case)]
3905 CBC 854114 : pub fn TEST_IMG(s: &str) -> Bytes {
3906 854114 : let mut buf = BytesMut::new();
3907 854114 : buf.extend_from_slice(s.as_bytes());
3908 854114 : buf.resize(64, 0);
3909 854114 :
3910 854114 : buf.freeze()
3911 854114 : }
3912 :
3913 : impl From<TenantConf> for TenantConfOpt {
3914 42 : fn from(tenant_conf: TenantConf) -> Self {
3915 42 : Self {
3916 42 : checkpoint_distance: Some(tenant_conf.checkpoint_distance),
3917 42 : checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
3918 42 : compaction_target_size: Some(tenant_conf.compaction_target_size),
3919 42 : compaction_period: Some(tenant_conf.compaction_period),
3920 42 : compaction_threshold: Some(tenant_conf.compaction_threshold),
3921 42 : gc_horizon: Some(tenant_conf.gc_horizon),
3922 42 : gc_period: Some(tenant_conf.gc_period),
3923 42 : image_creation_threshold: Some(tenant_conf.image_creation_threshold),
3924 42 : pitr_interval: Some(tenant_conf.pitr_interval),
3925 42 : walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
3926 42 : lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
3927 42 : max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
3928 42 : trace_read_requests: Some(tenant_conf.trace_read_requests),
3929 42 : eviction_policy: Some(tenant_conf.eviction_policy),
3930 42 : min_resident_size_override: tenant_conf.min_resident_size_override,
3931 42 : evictions_low_residence_duration_metric_threshold: Some(
3932 42 : tenant_conf.evictions_low_residence_duration_metric_threshold,
3933 42 : ),
3934 42 : gc_feedback: Some(tenant_conf.gc_feedback),
3935 42 : heatmap_period: Some(tenant_conf.heatmap_period),
3936 42 : }
3937 42 : }
3938 : }
3939 :
3940 : enum LoadMode {
3941 : Local,
3942 : Remote,
3943 : }
3944 :
3945 : pub struct TenantHarness {
3946 : pub conf: &'static PageServerConf,
3947 : pub tenant_conf: TenantConf,
3948 : // TODO(sharding): remove duplicative `tenant_id` in favor of access to tenant_shard_id
3949 : pub(crate) tenant_id: TenantId,
3950 : pub tenant_shard_id: TenantShardId,
3951 : pub generation: Generation,
3952 : pub shard: ShardIndex,
3953 : pub remote_storage: GenericRemoteStorage,
3954 : pub remote_fs_dir: Utf8PathBuf,
3955 : pub deletion_queue: MockDeletionQueue,
3956 : }
3957 :
3958 : static LOG_HANDLE: OnceCell<()> = OnceCell::new();
3959 :
3960 44 : pub(crate) fn setup_logging() {
3961 44 : LOG_HANDLE.get_or_init(|| {
3962 44 : logging::init(
3963 44 : logging::LogFormat::Test,
3964 44 : // enable it in case the tests exercise code paths that use
3965 44 : // debug_assert_current_span_has_tenant_and_timeline_id
3966 44 : logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
3967 44 : logging::Output::Stdout,
3968 44 : )
3969 44 : .expect("Failed to init test logging")
3970 44 : });
3971 44 : }
3972 :
3973 : impl TenantHarness {
3974 41 : pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
3975 41 : setup_logging();
3976 41 :
3977 41 : let repo_dir = PageServerConf::test_repo_dir(test_name);
3978 41 : let _ = fs::remove_dir_all(&repo_dir);
3979 41 : fs::create_dir_all(&repo_dir)?;
3980 :
3981 41 : let conf = PageServerConf::dummy_conf(repo_dir);
3982 41 : // Make a static copy of the config. This can never be free'd, but that's
3983 41 : // OK in a test.
3984 41 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
3985 41 :
3986 41 : // Disable automatic GC and compaction to make the unit tests more deterministic.
3987 41 : // The tests perform them manually if needed.
3988 41 : let tenant_conf = TenantConf {
3989 41 : gc_period: Duration::ZERO,
3990 41 : compaction_period: Duration::ZERO,
3991 41 : ..TenantConf::default()
3992 41 : };
3993 41 :
3994 41 : let tenant_id = TenantId::generate();
3995 41 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
3996 41 : fs::create_dir_all(conf.tenant_path(&tenant_shard_id))?;
3997 41 : fs::create_dir_all(conf.timelines_path(&tenant_shard_id))?;
3998 :
3999 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
4000 41 : let remote_fs_dir = conf.workdir.join("localfs");
4001 41 : std::fs::create_dir_all(&remote_fs_dir).unwrap();
4002 41 : let config = RemoteStorageConfig {
4003 41 : storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
4004 41 : };
4005 41 : let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
4006 41 : let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
4007 41 :
4008 41 : Ok(Self {
4009 41 : conf,
4010 41 : tenant_conf,
4011 41 : tenant_id,
4012 41 : tenant_shard_id,
4013 41 : generation: Generation::new(0xdeadbeef),
4014 41 : shard: ShardIndex::unsharded(),
4015 41 : remote_storage,
4016 41 : remote_fs_dir,
4017 41 : deletion_queue,
4018 41 : })
4019 41 : }
4020 :
4021 40 : pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
4022 40 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
4023 40 : (
4024 40 : self.try_load(&ctx)
4025 50 : .await
4026 40 : .expect("failed to load test tenant"),
4027 40 : ctx,
4028 40 : )
4029 40 : }
4030 :
4031 41 : fn remote_empty(&self) -> bool {
4032 41 : let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
4033 41 : let remote_tenant_dir = self
4034 41 : .remote_fs_dir
4035 41 : .join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
4036 41 : if std::fs::metadata(&remote_tenant_dir).is_err() {
4037 39 : return true;
4038 2 : }
4039 2 :
4040 2 : match std::fs::read_dir(remote_tenant_dir)
4041 2 : .unwrap()
4042 2 : .flatten()
4043 2 : .next()
4044 : {
4045 2 : Some(entry) => {
4046 2 : tracing::debug!(
4047 UBC 0 : "remote_empty: not empty, found file {}",
4048 0 : entry.file_name().to_string_lossy(),
4049 0 : );
4050 CBC 2 : false
4051 : }
4052 UBC 0 : None => true,
4053 : }
4054 CBC 41 : }
4055 :
4056 42 : async fn do_try_load(
4057 42 : &self,
4058 42 : ctx: &RequestContext,
4059 42 : mode: LoadMode,
4060 42 : ) -> anyhow::Result<Arc<Tenant>> {
4061 42 : let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
4062 42 :
4063 42 : let tenant = Arc::new(Tenant::new(
4064 42 : TenantState::Loading,
4065 42 : self.conf,
4066 42 : AttachedTenantConf::try_from(LocationConf::attached_single(
4067 42 : TenantConfOpt::from(self.tenant_conf),
4068 42 : self.generation,
4069 42 : ))
4070 42 : .unwrap(),
4071 42 : // This is a legacy/test code path: sharding isn't supported here.
4072 42 : ShardIdentity::unsharded(),
4073 42 : walredo_mgr,
4074 42 : self.tenant_shard_id,
4075 42 : Some(self.remote_storage.clone()),
4076 42 : self.deletion_queue.new_client(),
4077 42 : ));
4078 42 :
4079 42 : match mode {
4080 : LoadMode::Local => {
4081 40 : tenant
4082 40 : .load_local(ctx)
4083 40 : .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
4084 40 : .await?;
4085 : }
4086 : LoadMode::Remote => {
4087 2 : let preload = tenant
4088 2 : .preload(&self.remote_storage, CancellationToken::new())
4089 2 : .instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
4090 7 : .await?;
4091 2 : tenant
4092 2 : .attach(Some(preload), ctx)
4093 2 : .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
4094 5 : .await?;
4095 : }
4096 : }
4097 :
4098 41 : tenant.state.send_replace(TenantState::Active);
4099 41 : for timeline in tenant.timelines.lock().unwrap().values() {
4100 3 : timeline.set_state(TimelineState::Active);
4101 3 : }
4102 41 : Ok(tenant)
4103 42 : }
4104 :
4105 : /// For tests that specifically want to exercise the local load path, which does
4106 : /// not use remote storage.
4107 1 : pub async fn try_load_local(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
4108 1 : self.do_try_load(ctx, LoadMode::Local).await
4109 1 : }
4110 :
4111 : /// The 'load' in this function is either a local load or a normal attachment,
4112 41 : pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
4113 : // If we have nothing in remote storage, must use load_local instead of attach: attach
4114 : // will error out if there are no timelines.
4115 : //
4116 : // See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
4117 : // this weird state of a Tenant which exists but doesn't have any timelines.
4118 41 : let mode = match self.remote_empty() {
4119 39 : true => LoadMode::Local,
4120 2 : false => LoadMode::Remote,
4121 : };
4122 :
4123 51 : self.do_try_load(ctx, mode).await
4124 41 : }
4125 :
4126 3 : pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
4127 3 : self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
4128 3 : }
4129 : }
4130 :
4131 : // Mock WAL redo manager that doesn't do much
4132 : pub(crate) struct TestRedoManager;
4133 :
4134 : impl TestRedoManager {
4135 : /// # Cancel-Safety
4136 : ///
4137 : /// This method is cancellation-safe.
4138 UBC 0 : pub async fn request_redo(
4139 0 : &self,
4140 0 : key: Key,
4141 0 : lsn: Lsn,
4142 0 : base_img: Option<(Lsn, Bytes)>,
4143 0 : records: Vec<(Lsn, NeonWalRecord)>,
4144 0 : _pg_version: u32,
4145 0 : ) -> anyhow::Result<Bytes> {
4146 0 : let s = format!(
4147 0 : "redo for {} to get to {}, with {} and {} records",
4148 0 : key,
4149 0 : lsn,
4150 0 : if base_img.is_some() {
4151 0 : "base image"
4152 : } else {
4153 0 : "no base image"
4154 : },
4155 0 : records.len()
4156 0 : );
4157 0 : println!("{s}");
4158 0 :
4159 0 : Ok(TEST_IMG(&s))
4160 0 : }
4161 : }
4162 : }
4163 :
4164 : #[cfg(test)]
4165 : mod tests {
4166 : use super::*;
4167 : use crate::keyspace::KeySpaceAccum;
4168 : use crate::repository::{Key, Value};
4169 : use crate::tenant::harness::*;
4170 : use crate::DEFAULT_PG_VERSION;
4171 : use crate::METADATA_FILE_NAME;
4172 : use bytes::BytesMut;
4173 : use hex_literal::hex;
4174 : use once_cell::sync::Lazy;
4175 : use rand::{thread_rng, Rng};
4176 : use tokio_util::sync::CancellationToken;
4177 :
4178 : static TEST_KEY: Lazy<Key> =
4179 CBC 9 : Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
4180 :
4181 1 : #[tokio::test]
4182 1 : async fn test_basic() -> anyhow::Result<()> {
4183 1 : let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
4184 1 : let tline = tenant
4185 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4186 1 : .await?;
4187 :
4188 1 : let writer = tline.writer().await;
4189 1 : writer
4190 1 : .put(
4191 1 : *TEST_KEY,
4192 1 : Lsn(0x10),
4193 1 : &Value::Image(TEST_IMG("foo at 0x10")),
4194 1 : &ctx,
4195 1 : )
4196 UBC 0 : .await?;
4197 CBC 1 : writer.finish_write(Lsn(0x10));
4198 1 : drop(writer);
4199 :
4200 1 : let writer = tline.writer().await;
4201 1 : writer
4202 1 : .put(
4203 1 : *TEST_KEY,
4204 1 : Lsn(0x20),
4205 1 : &Value::Image(TEST_IMG("foo at 0x20")),
4206 1 : &ctx,
4207 1 : )
4208 UBC 0 : .await?;
4209 CBC 1 : writer.finish_write(Lsn(0x20));
4210 1 : drop(writer);
4211 :
4212 1 : assert_eq!(
4213 1 : tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
4214 1 : TEST_IMG("foo at 0x10")
4215 : );
4216 1 : assert_eq!(
4217 1 : tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
4218 1 : TEST_IMG("foo at 0x10")
4219 : );
4220 1 : assert_eq!(
4221 1 : tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
4222 1 : TEST_IMG("foo at 0x20")
4223 : );
4224 :
4225 1 : Ok(())
4226 : }
4227 :
4228 1 : #[tokio::test]
4229 1 : async fn no_duplicate_timelines() -> anyhow::Result<()> {
4230 1 : let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
4231 1 : .load()
4232 1 : .await;
4233 1 : let _ = tenant
4234 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4235 1 : .await?;
4236 :
4237 1 : match tenant
4238 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4239 UBC 0 : .await
4240 : {
4241 0 : Ok(_) => panic!("duplicate timeline creation should fail"),
4242 CBC 1 : Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
4243 : }
4244 :
4245 1 : Ok(())
4246 : }
4247 :
4248 : /// Convenience function to create a page image with given string as the only content
4249 5 : pub fn test_value(s: &str) -> Value {
4250 5 : let mut buf = BytesMut::new();
4251 5 : buf.extend_from_slice(s.as_bytes());
4252 5 : Value::Image(buf.freeze())
4253 5 : }
4254 :
4255 : ///
4256 : /// Test branch creation
4257 : ///
4258 1 : #[tokio::test]
4259 1 : async fn test_branch() -> anyhow::Result<()> {
4260 : use std::str::from_utf8;
4261 :
4262 1 : let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
4263 1 : let tline = tenant
4264 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4265 2 : .await?;
4266 1 : let writer = tline.writer().await;
4267 :
4268 : #[allow(non_snake_case)]
4269 1 : let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
4270 1 : #[allow(non_snake_case)]
4271 1 : let TEST_KEY_B: Key = Key::from_hex("110000000033333333444444445500000002").unwrap();
4272 1 :
4273 1 : // Insert a value on the timeline
4274 1 : writer
4275 1 : .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
4276 UBC 0 : .await?;
4277 CBC 1 : writer
4278 1 : .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
4279 UBC 0 : .await?;
4280 CBC 1 : writer.finish_write(Lsn(0x20));
4281 1 :
4282 1 : writer
4283 1 : .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
4284 UBC 0 : .await?;
4285 CBC 1 : writer.finish_write(Lsn(0x30));
4286 1 : writer
4287 1 : .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
4288 UBC 0 : .await?;
4289 CBC 1 : writer.finish_write(Lsn(0x40));
4290 1 :
4291 1 : //assert_current_logical_size(&tline, Lsn(0x40));
4292 1 :
4293 1 : // Branch the history, modify relation differently on the new timeline
4294 1 : tenant
4295 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
4296 UBC 0 : .await?;
4297 CBC 1 : let newtline = tenant
4298 1 : .get_timeline(NEW_TIMELINE_ID, true)
4299 1 : .expect("Should have a local timeline");
4300 1 : let new_writer = newtline.writer().await;
4301 1 : new_writer
4302 1 : .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
4303 UBC 0 : .await?;
4304 CBC 1 : new_writer.finish_write(Lsn(0x40));
4305 :
4306 : // Check page contents on both branches
4307 1 : assert_eq!(
4308 1 : from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
4309 : "foo at 0x40"
4310 : );
4311 1 : assert_eq!(
4312 1 : from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
4313 : "bar at 0x40"
4314 : );
4315 1 : assert_eq!(
4316 1 : from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
4317 : "foobar at 0x20"
4318 : );
4319 :
4320 : //assert_current_logical_size(&tline, Lsn(0x40));
4321 :
4322 1 : Ok(())
4323 : }
4324 :
4325 10 : async fn make_some_layers(
4326 10 : tline: &Timeline,
4327 10 : start_lsn: Lsn,
4328 10 : ctx: &RequestContext,
4329 10 : ) -> anyhow::Result<()> {
4330 10 : let mut lsn = start_lsn;
4331 : #[allow(non_snake_case)]
4332 : {
4333 10 : let writer = tline.writer().await;
4334 : // Create a relation on the timeline
4335 10 : writer
4336 10 : .put(
4337 10 : *TEST_KEY,
4338 10 : lsn,
4339 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4340 10 : ctx,
4341 10 : )
4342 UBC 0 : .await?;
4343 CBC 10 : writer.finish_write(lsn);
4344 10 : lsn += 0x10;
4345 10 : writer
4346 10 : .put(
4347 10 : *TEST_KEY,
4348 10 : lsn,
4349 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4350 10 : ctx,
4351 10 : )
4352 UBC 0 : .await?;
4353 CBC 10 : writer.finish_write(lsn);
4354 10 : lsn += 0x10;
4355 10 : }
4356 10 : tline.freeze_and_flush().await?;
4357 : {
4358 10 : let writer = tline.writer().await;
4359 10 : writer
4360 10 : .put(
4361 10 : *TEST_KEY,
4362 10 : lsn,
4363 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4364 10 : ctx,
4365 10 : )
4366 UBC 0 : .await?;
4367 CBC 10 : writer.finish_write(lsn);
4368 10 : lsn += 0x10;
4369 10 : writer
4370 10 : .put(
4371 10 : *TEST_KEY,
4372 10 : lsn,
4373 10 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4374 10 : ctx,
4375 10 : )
4376 UBC 0 : .await?;
4377 CBC 10 : writer.finish_write(lsn);
4378 10 : }
4379 10 : tline.freeze_and_flush().await
4380 10 : }
4381 :
4382 1 : #[tokio::test]
4383 1 : async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
4384 1 : let (tenant, ctx) =
4385 1 : TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
4386 1 : .load()
4387 1 : .await;
4388 1 : let tline = tenant
4389 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4390 1 : .await?;
4391 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4392 :
4393 : // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
4394 : // FIXME: this doesn't actually remove any layer currently, given how the flushing
4395 : // and compaction works. But it does set the 'cutoff' point so that the cross check
4396 : // below should fail.
4397 1 : tenant
4398 1 : .gc_iteration(
4399 1 : Some(TIMELINE_ID),
4400 1 : 0x10,
4401 1 : Duration::ZERO,
4402 1 : &CancellationToken::new(),
4403 1 : &ctx,
4404 1 : )
4405 UBC 0 : .await?;
4406 :
4407 : // try to branch at lsn 25, should fail because we already garbage collected the data
4408 CBC 1 : match tenant
4409 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
4410 UBC 0 : .await
4411 : {
4412 0 : Ok(_) => panic!("branching should have failed"),
4413 CBC 1 : Err(err) => {
4414 1 : let CreateTimelineError::AncestorLsn(err) = err else {
4415 UBC 0 : panic!("wrong error type")
4416 : };
4417 CBC 1 : assert!(err.to_string().contains("invalid branch start lsn"));
4418 1 : assert!(err
4419 1 : .source()
4420 1 : .unwrap()
4421 1 : .to_string()
4422 1 : .contains("we might've already garbage collected needed data"))
4423 : }
4424 : }
4425 :
4426 1 : Ok(())
4427 : }
4428 :
4429 1 : #[tokio::test]
4430 1 : async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
4431 1 : let (tenant, ctx) =
4432 1 : TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
4433 1 : .load()
4434 1 : .await;
4435 :
4436 1 : let tline = tenant
4437 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
4438 1 : .await?;
4439 : // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
4440 1 : match tenant
4441 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
4442 UBC 0 : .await
4443 : {
4444 0 : Ok(_) => panic!("branching should have failed"),
4445 CBC 1 : Err(err) => {
4446 1 : let CreateTimelineError::AncestorLsn(err) = err else {
4447 UBC 0 : panic!("wrong error type");
4448 : };
4449 CBC 1 : assert!(&err.to_string().contains("invalid branch start lsn"));
4450 1 : assert!(&err
4451 1 : .source()
4452 1 : .unwrap()
4453 1 : .to_string()
4454 1 : .contains("is earlier than latest GC horizon"));
4455 : }
4456 : }
4457 :
4458 1 : Ok(())
4459 : }
4460 :
4461 : /*
4462 : // FIXME: This currently fails to error out. Calling GC doesn't currently
4463 : // remove the old value, we'd need to work a little harder
4464 : #[tokio::test]
4465 : async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
4466 : let repo =
4467 : RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
4468 : .load();
4469 :
4470 : let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
4471 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4472 :
4473 : repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
4474 : let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
4475 : assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
4476 : match tline.get(*TEST_KEY, Lsn(0x25)) {
4477 : Ok(_) => panic!("request for page should have failed"),
4478 : Err(err) => assert!(err.to_string().contains("not found at")),
4479 : }
4480 : Ok(())
4481 : }
4482 : */
4483 :
4484 1 : #[tokio::test]
4485 1 : async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
4486 1 : let (tenant, ctx) =
4487 1 : TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
4488 1 : .load()
4489 1 : .await;
4490 1 : let tline = tenant
4491 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4492 2 : .await?;
4493 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4494 :
4495 1 : tenant
4496 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4497 UBC 0 : .await?;
4498 CBC 1 : let newtline = tenant
4499 1 : .get_timeline(NEW_TIMELINE_ID, true)
4500 1 : .expect("Should have a local timeline");
4501 1 :
4502 2 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4503 :
4504 1 : tline.set_broken("test".to_owned());
4505 1 :
4506 1 : tenant
4507 1 : .gc_iteration(
4508 1 : Some(TIMELINE_ID),
4509 1 : 0x10,
4510 1 : Duration::ZERO,
4511 1 : &CancellationToken::new(),
4512 1 : &ctx,
4513 1 : )
4514 UBC 0 : .await?;
4515 :
4516 : // The branchpoints should contain all timelines, even ones marked
4517 : // as Broken.
4518 : {
4519 CBC 1 : let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
4520 1 : assert_eq!(branchpoints.len(), 1);
4521 1 : assert_eq!(branchpoints[0], Lsn(0x40));
4522 : }
4523 :
4524 : // You can read the key from the child branch even though the parent is
4525 : // Broken, as long as you don't need to access data from the parent.
4526 1 : assert_eq!(
4527 1 : newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
4528 1 : TEST_IMG(&format!("foo at {}", Lsn(0x70)))
4529 : );
4530 :
4531 : // This needs to traverse to the parent, and fails.
4532 1 : let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
4533 1 : assert!(err
4534 1 : .to_string()
4535 1 : .contains("will not become active. Current state: Broken"));
4536 :
4537 1 : Ok(())
4538 : }
4539 :
4540 1 : #[tokio::test]
4541 1 : async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
4542 1 : let (tenant, ctx) =
4543 1 : TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
4544 1 : .load()
4545 1 : .await;
4546 1 : let tline = tenant
4547 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4548 2 : .await?;
4549 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4550 :
4551 1 : tenant
4552 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4553 UBC 0 : .await?;
4554 CBC 1 : let newtline = tenant
4555 1 : .get_timeline(NEW_TIMELINE_ID, true)
4556 1 : .expect("Should have a local timeline");
4557 1 : // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
4558 1 : tenant
4559 1 : .gc_iteration(
4560 1 : Some(TIMELINE_ID),
4561 1 : 0x10,
4562 1 : Duration::ZERO,
4563 1 : &CancellationToken::new(),
4564 1 : &ctx,
4565 1 : )
4566 UBC 0 : .await?;
4567 CBC 1 : assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
4568 :
4569 1 : Ok(())
4570 : }
4571 1 : #[tokio::test]
4572 1 : async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
4573 1 : let (tenant, ctx) =
4574 1 : TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
4575 1 : .load()
4576 1 : .await;
4577 1 : let tline = tenant
4578 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4579 1 : .await?;
4580 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4581 :
4582 1 : tenant
4583 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4584 UBC 0 : .await?;
4585 CBC 1 : let newtline = tenant
4586 1 : .get_timeline(NEW_TIMELINE_ID, true)
4587 1 : .expect("Should have a local timeline");
4588 1 :
4589 2 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4590 :
4591 : // run gc on parent
4592 1 : tenant
4593 1 : .gc_iteration(
4594 1 : Some(TIMELINE_ID),
4595 1 : 0x10,
4596 1 : Duration::ZERO,
4597 1 : &CancellationToken::new(),
4598 1 : &ctx,
4599 1 : )
4600 UBC 0 : .await?;
4601 :
4602 : // Check that the data is still accessible on the branch.
4603 CBC 1 : assert_eq!(
4604 1 : newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
4605 1 : TEST_IMG(&format!("foo at {}", Lsn(0x40)))
4606 : );
4607 :
4608 1 : Ok(())
4609 : }
4610 :
4611 1 : #[tokio::test]
4612 1 : async fn timeline_load() -> anyhow::Result<()> {
4613 : const TEST_NAME: &str = "timeline_load";
4614 1 : let harness = TenantHarness::create(TEST_NAME)?;
4615 : {
4616 1 : let (tenant, ctx) = harness.load().await;
4617 1 : let tline = tenant
4618 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
4619 1 : .await?;
4620 2 : make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
4621 : // so that all uploads finish & we can call harness.load() below again
4622 1 : tenant
4623 1 : .shutdown(Default::default(), true)
4624 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
4625 1 : .await
4626 1 : .ok()
4627 1 : .unwrap();
4628 : }
4629 :
4630 3 : let (tenant, _ctx) = harness.load().await;
4631 1 : tenant
4632 1 : .get_timeline(TIMELINE_ID, true)
4633 1 : .expect("cannot load timeline");
4634 1 :
4635 1 : Ok(())
4636 : }
4637 :
4638 1 : #[tokio::test]
4639 1 : async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
4640 : const TEST_NAME: &str = "timeline_load_with_ancestor";
4641 1 : let harness = TenantHarness::create(TEST_NAME)?;
4642 : // create two timelines
4643 : {
4644 1 : let (tenant, ctx) = harness.load().await;
4645 1 : let tline = tenant
4646 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4647 2 : .await?;
4648 :
4649 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4650 :
4651 1 : let child_tline = tenant
4652 1 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4653 UBC 0 : .await?;
4654 CBC 1 : child_tline.set_state(TimelineState::Active);
4655 1 :
4656 1 : let newtline = tenant
4657 1 : .get_timeline(NEW_TIMELINE_ID, true)
4658 1 : .expect("Should have a local timeline");
4659 1 :
4660 2 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4661 :
4662 : // so that all uploads finish & we can call harness.load() below again
4663 1 : tenant
4664 1 : .shutdown(Default::default(), true)
4665 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
4666 2 : .await
4667 1 : .ok()
4668 1 : .unwrap();
4669 : }
4670 :
4671 : // check that both of them are initially unloaded
4672 9 : let (tenant, _ctx) = harness.load().await;
4673 :
4674 : // check that both, child and ancestor are loaded
4675 1 : let _child_tline = tenant
4676 1 : .get_timeline(NEW_TIMELINE_ID, true)
4677 1 : .expect("cannot get child timeline loaded");
4678 1 :
4679 1 : let _ancestor_tline = tenant
4680 1 : .get_timeline(TIMELINE_ID, true)
4681 1 : .expect("cannot get ancestor timeline loaded");
4682 1 :
4683 1 : Ok(())
4684 : }
4685 :
4686 1 : #[tokio::test]
4687 1 : async fn delta_layer_dumping() -> anyhow::Result<()> {
4688 : use storage_layer::AsLayerDesc;
4689 1 : let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
4690 1 : let tline = tenant
4691 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4692 1 : .await?;
4693 2 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4694 :
4695 1 : let layer_map = tline.layers.read().await;
4696 1 : let level0_deltas = layer_map
4697 1 : .layer_map()
4698 1 : .get_level0_deltas()?
4699 1 : .into_iter()
4700 2 : .map(|desc| layer_map.get_from_desc(&desc))
4701 1 : .collect::<Vec<_>>();
4702 :
4703 1 : assert!(!level0_deltas.is_empty());
4704 :
4705 3 : for delta in level0_deltas {
4706 : // Ensure we are dumping a delta layer here
4707 2 : assert!(delta.layer_desc().is_delta);
4708 2 : delta.dump(true, &ctx).await.unwrap();
4709 : }
4710 :
4711 1 : Ok(())
4712 : }
4713 :
4714 1 : #[tokio::test]
4715 1 : async fn corrupt_local_metadata() -> anyhow::Result<()> {
4716 : const TEST_NAME: &str = "corrupt_metadata";
4717 1 : let harness = TenantHarness::create(TEST_NAME)?;
4718 1 : let (tenant, ctx) = harness.load().await;
4719 :
4720 1 : let tline = tenant
4721 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4722 2 : .await?;
4723 1 : drop(tline);
4724 1 : // so that all uploads finish & we can call harness.try_load() below again
4725 1 : tenant
4726 1 : .shutdown(Default::default(), true)
4727 1 : .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
4728 1 : .await
4729 1 : .ok()
4730 1 : .unwrap();
4731 1 : drop(tenant);
4732 1 :
4733 1 : // Corrupt local metadata
4734 1 : let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
4735 1 : assert!(metadata_path.is_file());
4736 1 : let mut metadata_bytes = std::fs::read(&metadata_path)?;
4737 1 : assert_eq!(metadata_bytes.len(), 512);
4738 1 : metadata_bytes[8] ^= 1;
4739 1 : std::fs::write(metadata_path, metadata_bytes)?;
4740 :
4741 1 : let err = harness.try_load_local(&ctx).await.expect_err("should fail");
4742 1 : // get all the stack with all .context, not only the last one
4743 1 : let message = format!("{err:#}");
4744 1 : let expected = "failed to load metadata";
4745 1 : assert!(
4746 1 : message.contains(expected),
4747 UBC 0 : "message '{message}' expected to contain {expected}"
4748 : );
4749 :
4750 CBC 1 : let mut found_error_message = false;
4751 1 : let mut err_source = err.source();
4752 1 : while let Some(source) = err_source {
4753 1 : if source.to_string().contains("metadata checksum mismatch") {
4754 1 : found_error_message = true;
4755 1 : break;
4756 UBC 0 : }
4757 0 : err_source = source.source();
4758 : }
4759 CBC 1 : assert!(
4760 1 : found_error_message,
4761 UBC 0 : "didn't find the corrupted metadata error in {}",
4762 : message
4763 : );
4764 :
4765 CBC 1 : Ok(())
4766 : }
4767 :
4768 1 : #[tokio::test]
4769 1 : async fn test_images() -> anyhow::Result<()> {
4770 1 : let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
4771 1 : let tline = tenant
4772 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4773 1 : .await?;
4774 :
4775 1 : let writer = tline.writer().await;
4776 1 : writer
4777 1 : .put(
4778 1 : *TEST_KEY,
4779 1 : Lsn(0x10),
4780 1 : &Value::Image(TEST_IMG("foo at 0x10")),
4781 1 : &ctx,
4782 1 : )
4783 UBC 0 : .await?;
4784 CBC 1 : writer.finish_write(Lsn(0x10));
4785 1 : drop(writer);
4786 1 :
4787 1 : tline.freeze_and_flush().await?;
4788 1 : tline
4789 1 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4790 1 : .await?;
4791 :
4792 1 : let writer = tline.writer().await;
4793 1 : writer
4794 1 : .put(
4795 1 : *TEST_KEY,
4796 1 : Lsn(0x20),
4797 1 : &Value::Image(TEST_IMG("foo at 0x20")),
4798 1 : &ctx,
4799 1 : )
4800 UBC 0 : .await?;
4801 CBC 1 : writer.finish_write(Lsn(0x20));
4802 1 : drop(writer);
4803 1 :
4804 1 : tline.freeze_and_flush().await?;
4805 1 : tline
4806 1 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4807 1 : .await?;
4808 :
4809 1 : let writer = tline.writer().await;
4810 1 : writer
4811 1 : .put(
4812 1 : *TEST_KEY,
4813 1 : Lsn(0x30),
4814 1 : &Value::Image(TEST_IMG("foo at 0x30")),
4815 1 : &ctx,
4816 1 : )
4817 UBC 0 : .await?;
4818 CBC 1 : writer.finish_write(Lsn(0x30));
4819 1 : drop(writer);
4820 1 :
4821 1 : tline.freeze_and_flush().await?;
4822 1 : tline
4823 1 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4824 1 : .await?;
4825 :
4826 1 : let writer = tline.writer().await;
4827 1 : writer
4828 1 : .put(
4829 1 : *TEST_KEY,
4830 1 : Lsn(0x40),
4831 1 : &Value::Image(TEST_IMG("foo at 0x40")),
4832 1 : &ctx,
4833 1 : )
4834 UBC 0 : .await?;
4835 CBC 1 : writer.finish_write(Lsn(0x40));
4836 1 : drop(writer);
4837 1 :
4838 1 : tline.freeze_and_flush().await?;
4839 1 : tline
4840 1 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4841 1 : .await?;
4842 :
4843 1 : assert_eq!(
4844 1 : tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
4845 1 : TEST_IMG("foo at 0x10")
4846 : );
4847 1 : assert_eq!(
4848 1 : tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
4849 1 : TEST_IMG("foo at 0x10")
4850 : );
4851 1 : assert_eq!(
4852 1 : tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
4853 1 : TEST_IMG("foo at 0x20")
4854 : );
4855 1 : assert_eq!(
4856 1 : tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
4857 1 : TEST_IMG("foo at 0x30")
4858 : );
4859 1 : assert_eq!(
4860 1 : tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
4861 1 : TEST_IMG("foo at 0x40")
4862 : );
4863 :
4864 1 : Ok(())
4865 : }
4866 :
4867 : //
4868 : // Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
4869 : // Repeat 50 times.
4870 : //
4871 1 : #[tokio::test]
4872 1 : async fn test_bulk_insert() -> anyhow::Result<()> {
4873 1 : let harness = TenantHarness::create("test_bulk_insert")?;
4874 1 : let (tenant, ctx) = harness.load().await;
4875 1 : let tline = tenant
4876 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4877 2 : .await?;
4878 :
4879 1 : let mut lsn = Lsn(0x10);
4880 1 :
4881 1 : let mut keyspace = KeySpaceAccum::new();
4882 1 :
4883 1 : let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
4884 1 : let mut blknum = 0;
4885 51 : for _ in 0..50 {
4886 500050 : for _ in 0..10000 {
4887 500000 : test_key.field6 = blknum;
4888 500000 : let writer = tline.writer().await;
4889 500000 : writer
4890 500000 : .put(
4891 500000 : test_key,
4892 500000 : lsn,
4893 500000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4894 500000 : &ctx,
4895 500000 : )
4896 7802 : .await?;
4897 500000 : writer.finish_write(lsn);
4898 500000 : drop(writer);
4899 500000 :
4900 500000 : keyspace.add_key(test_key);
4901 500000 :
4902 500000 : lsn = Lsn(lsn.0 + 0x10);
4903 500000 : blknum += 1;
4904 : }
4905 :
4906 50 : let cutoff = tline.get_last_record_lsn();
4907 50 :
4908 50 : tline
4909 50 : .update_gc_info(
4910 50 : Vec::new(),
4911 50 : cutoff,
4912 50 : Duration::ZERO,
4913 50 : &CancellationToken::new(),
4914 50 : &ctx,
4915 50 : )
4916 UBC 0 : .await?;
4917 CBC 50 : tline.freeze_and_flush().await?;
4918 50 : tline
4919 50 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4920 7930 : .await?;
4921 50 : tline.gc().await?;
4922 : }
4923 :
4924 1 : Ok(())
4925 : }
4926 :
4927 1 : #[tokio::test]
4928 1 : async fn test_random_updates() -> anyhow::Result<()> {
4929 1 : let harness = TenantHarness::create("test_random_updates")?;
4930 1 : let (tenant, ctx) = harness.load().await;
4931 1 : let tline = tenant
4932 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4933 1 : .await?;
4934 :
4935 : const NUM_KEYS: usize = 1000;
4936 :
4937 1 : let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
4938 1 :
4939 1 : let mut keyspace = KeySpaceAccum::new();
4940 1 :
4941 1 : // Track when each page was last modified. Used to assert that
4942 1 : // a read sees the latest page version.
4943 1 : let mut updated = [Lsn(0); NUM_KEYS];
4944 1 :
4945 1 : let mut lsn = Lsn(0x10);
4946 : #[allow(clippy::needless_range_loop)]
4947 1001 : for blknum in 0..NUM_KEYS {
4948 1000 : lsn = Lsn(lsn.0 + 0x10);
4949 1000 : test_key.field6 = blknum as u32;
4950 1000 : let writer = tline.writer().await;
4951 1000 : writer
4952 1000 : .put(
4953 1000 : test_key,
4954 1000 : lsn,
4955 1000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4956 1000 : &ctx,
4957 1000 : )
4958 17 : .await?;
4959 1000 : writer.finish_write(lsn);
4960 1000 : updated[blknum] = lsn;
4961 1000 : drop(writer);
4962 1000 :
4963 1000 : keyspace.add_key(test_key);
4964 : }
4965 :
4966 51 : for _ in 0..50 {
4967 50050 : for _ in 0..NUM_KEYS {
4968 50000 : lsn = Lsn(lsn.0 + 0x10);
4969 50000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
4970 50000 : test_key.field6 = blknum as u32;
4971 50000 : let writer = tline.writer().await;
4972 50000 : writer
4973 50000 : .put(
4974 50000 : test_key,
4975 50000 : lsn,
4976 50000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4977 50000 : &ctx,
4978 50000 : )
4979 702 : .await?;
4980 50000 : writer.finish_write(lsn);
4981 50000 : drop(writer);
4982 50000 : updated[blknum] = lsn;
4983 : }
4984 :
4985 : // Read all the blocks
4986 50000 : for (blknum, last_lsn) in updated.iter().enumerate() {
4987 50000 : test_key.field6 = blknum as u32;
4988 50000 : assert_eq!(
4989 50000 : tline.get(test_key, lsn, &ctx).await?,
4990 50000 : TEST_IMG(&format!("{} at {}", blknum, last_lsn))
4991 : );
4992 : }
4993 :
4994 : // Perform a cycle of flush, compact, and GC
4995 50 : let cutoff = tline.get_last_record_lsn();
4996 50 : tline
4997 50 : .update_gc_info(
4998 50 : Vec::new(),
4999 50 : cutoff,
5000 50 : Duration::ZERO,
5001 50 : &CancellationToken::new(),
5002 50 : &ctx,
5003 50 : )
5004 UBC 0 : .await?;
5005 CBC 54 : tline.freeze_and_flush().await?;
5006 50 : tline
5007 50 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
5008 861 : .await?;
5009 50 : tline.gc().await?;
5010 : }
5011 :
5012 1 : Ok(())
5013 : }
5014 :
5015 1 : #[tokio::test]
5016 1 : async fn test_traverse_branches() -> anyhow::Result<()> {
5017 1 : let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
5018 1 : .load()
5019 1 : .await;
5020 1 : let mut tline = tenant
5021 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
5022 1 : .await?;
5023 :
5024 : const NUM_KEYS: usize = 1000;
5025 :
5026 1 : let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
5027 1 :
5028 1 : let mut keyspace = KeySpaceAccum::new();
5029 1 :
5030 1 : // Track when each page was last modified. Used to assert that
5031 1 : // a read sees the latest page version.
5032 1 : let mut updated = [Lsn(0); NUM_KEYS];
5033 1 :
5034 1 : let mut lsn = Lsn(0x10);
5035 : #[allow(clippy::needless_range_loop)]
5036 1001 : for blknum in 0..NUM_KEYS {
5037 1000 : lsn = Lsn(lsn.0 + 0x10);
5038 1000 : test_key.field6 = blknum as u32;
5039 1000 : let writer = tline.writer().await;
5040 1000 : writer
5041 1000 : .put(
5042 1000 : test_key,
5043 1000 : lsn,
5044 1000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
5045 1000 : &ctx,
5046 1000 : )
5047 17 : .await?;
5048 1000 : writer.finish_write(lsn);
5049 1000 : updated[blknum] = lsn;
5050 1000 : drop(writer);
5051 1000 :
5052 1000 : keyspace.add_key(test_key);
5053 : }
5054 :
5055 51 : for _ in 0..50 {
5056 50 : let new_tline_id = TimelineId::generate();
5057 50 : tenant
5058 50 : .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
5059 UBC 0 : .await?;
5060 CBC 50 : tline = tenant
5061 50 : .get_timeline(new_tline_id, true)
5062 50 : .expect("Should have the branched timeline");
5063 :
5064 50050 : for _ in 0..NUM_KEYS {
5065 50000 : lsn = Lsn(lsn.0 + 0x10);
5066 50000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
5067 50000 : test_key.field6 = blknum as u32;
5068 50000 : let writer = tline.writer().await;
5069 50000 : writer
5070 50000 : .put(
5071 50000 : test_key,
5072 50000 : lsn,
5073 50000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
5074 50000 : &ctx,
5075 50000 : )
5076 799 : .await?;
5077 50000 : println!("updating {} at {}", blknum, lsn);
5078 50000 : writer.finish_write(lsn);
5079 50000 : drop(writer);
5080 50000 : updated[blknum] = lsn;
5081 : }
5082 :
5083 : // Read all the blocks
5084 50000 : for (blknum, last_lsn) in updated.iter().enumerate() {
5085 50000 : test_key.field6 = blknum as u32;
5086 50000 : assert_eq!(
5087 50000 : tline.get(test_key, lsn, &ctx).await?,
5088 50000 : TEST_IMG(&format!("{} at {}", blknum, last_lsn))
5089 : );
5090 : }
5091 :
5092 : // Perform a cycle of flush, compact, and GC
5093 50 : let cutoff = tline.get_last_record_lsn();
5094 50 : tline
5095 50 : .update_gc_info(
5096 50 : Vec::new(),
5097 50 : cutoff,
5098 50 : Duration::ZERO,
5099 50 : &CancellationToken::new(),
5100 50 : &ctx,
5101 50 : )
5102 UBC 0 : .await?;
5103 CBC 53 : tline.freeze_and_flush().await?;
5104 50 : tline
5105 50 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
5106 192 : .await?;
5107 50 : tline.gc().await?;
5108 : }
5109 :
5110 1 : Ok(())
5111 : }
5112 :
5113 1 : #[tokio::test]
5114 1 : async fn test_traverse_ancestors() -> anyhow::Result<()> {
5115 1 : let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
5116 1 : .load()
5117 1 : .await;
5118 1 : let mut tline = tenant
5119 1 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
5120 1 : .await?;
5121 :
5122 : const NUM_KEYS: usize = 100;
5123 : const NUM_TLINES: usize = 50;
5124 :
5125 1 : let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
5126 1 : // Track page mutation lsns across different timelines.
5127 1 : let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
5128 1 :
5129 1 : let mut lsn = Lsn(0x10);
5130 :
5131 : #[allow(clippy::needless_range_loop)]
5132 51 : for idx in 0..NUM_TLINES {
5133 50 : let new_tline_id = TimelineId::generate();
5134 50 : tenant
5135 50 : .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
5136 UBC 0 : .await?;
5137 CBC 50 : tline = tenant
5138 50 : .get_timeline(new_tline_id, true)
5139 50 : .expect("Should have the branched timeline");
5140 :
5141 5050 : for _ in 0..NUM_KEYS {
5142 5000 : lsn = Lsn(lsn.0 + 0x10);
5143 5000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
5144 5000 : test_key.field6 = blknum as u32;
5145 5000 : let writer = tline.writer().await;
5146 5000 : writer
5147 5000 : .put(
5148 5000 : test_key,
5149 5000 : lsn,
5150 5000 : &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
5151 5000 : &ctx,
5152 5000 : )
5153 78 : .await?;
5154 5000 : println!("updating [{}][{}] at {}", idx, blknum, lsn);
5155 5000 : writer.finish_write(lsn);
5156 5000 : drop(writer);
5157 5000 : updated[idx][blknum] = lsn;
5158 : }
5159 : }
5160 :
5161 : // Read pages from leaf timeline across all ancestors.
5162 50 : for (idx, lsns) in updated.iter().enumerate() {
5163 5000 : for (blknum, lsn) in lsns.iter().enumerate() {
5164 : // Skip empty mutations.
5165 5000 : if lsn.0 == 0 {
5166 1836 : continue;
5167 3164 : }
5168 3164 : println!("checking [{idx}][{blknum}] at {lsn}");
5169 3164 : test_key.field6 = blknum as u32;
5170 3164 : assert_eq!(
5171 3164 : tline.get(test_key, *lsn, &ctx).await?,
5172 3164 : TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
5173 : );
5174 : }
5175 : }
5176 1 : Ok(())
5177 : }
5178 :
5179 1 : #[tokio::test]
5180 1 : async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
5181 1 : let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
5182 1 : .load()
5183 1 : .await;
5184 :
5185 1 : let initdb_lsn = Lsn(0x20);
5186 1 : let utline = tenant
5187 1 : .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
5188 UBC 0 : .await?;
5189 CBC 1 : let tline = utline.raw_timeline().unwrap();
5190 1 :
5191 1 : // Spawn flush loop now so that we can set the `expect_initdb_optimization`
5192 1 : tline.maybe_spawn_flush_loop();
5193 1 :
5194 1 : // Make sure the timeline has the minimum set of required keys for operation.
5195 1 : // The only operation you can always do on an empty timeline is to `put` new data.
5196 1 : // Except if you `put` at `initdb_lsn`.
5197 1 : // In that case, there's an optimization to directly create image layers instead of delta layers.
5198 1 : // It uses `repartition()`, which assumes some keys to be present.
5199 1 : // Let's make sure the test timeline can handle that case.
5200 1 : {
5201 1 : let mut state = tline.flush_loop_state.lock().unwrap();
5202 1 : assert_eq!(
5203 1 : timeline::FlushLoopState::Running {
5204 1 : expect_initdb_optimization: false,
5205 1 : initdb_optimization_count: 0,
5206 1 : },
5207 1 : *state
5208 1 : );
5209 1 : *state = timeline::FlushLoopState::Running {
5210 1 : expect_initdb_optimization: true,
5211 1 : initdb_optimization_count: 0,
5212 1 : };
5213 1 : }
5214 1 :
5215 1 : // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
5216 1 : // As explained above, the optimization requires some keys to be present.
5217 1 : // As per `create_empty_timeline` documentation, use init_empty to set them.
5218 1 : // This is what `create_test_timeline` does, by the way.
5219 1 : let mut modification = tline.begin_modification(initdb_lsn);
5220 1 : modification
5221 1 : .init_empty_test_timeline()
5222 1 : .context("init_empty_test_timeline")?;
5223 1 : modification
5224 1 : .commit(&ctx)
5225 UBC 0 : .await
5226 CBC 1 : .context("commit init_empty_test_timeline modification")?;
5227 :
5228 : // Do the flush. The flush code will check the expectations that we set above.
5229 1 : tline.freeze_and_flush().await?;
5230 :
5231 : // assert freeze_and_flush exercised the initdb optimization
5232 : {
5233 1 : let state = tline.flush_loop_state.lock().unwrap();
5234 : let timeline::FlushLoopState::Running {
5235 1 : expect_initdb_optimization,
5236 1 : initdb_optimization_count,
5237 1 : } = *state
5238 : else {
5239 UBC 0 : panic!("unexpected state: {:?}", *state);
5240 : };
5241 CBC 1 : assert!(expect_initdb_optimization);
5242 1 : assert!(initdb_optimization_count > 0);
5243 : }
5244 1 : Ok(())
5245 : }
5246 :
5247 1 : #[tokio::test]
5248 1 : async fn test_uninit_mark_crash() -> anyhow::Result<()> {
5249 1 : let name = "test_uninit_mark_crash";
5250 1 : let harness = TenantHarness::create(name)?;
5251 : {
5252 1 : let (tenant, ctx) = harness.load().await;
5253 1 : let tline = tenant
5254 1 : .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
5255 UBC 0 : .await?;
5256 : // Keeps uninit mark in place
5257 CBC 1 : let raw_tline = tline.raw_timeline().unwrap();
5258 1 : raw_tline
5259 1 : .shutdown()
5260 1 : .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id))
5261 UBC 0 : .await;
5262 CBC 1 : std::mem::forget(tline);
5263 : }
5264 :
5265 1 : let (tenant, _) = harness.load().await;
5266 1 : match tenant.get_timeline(TIMELINE_ID, false) {
5267 UBC 0 : Ok(_) => panic!("timeline should've been removed during load"),
5268 CBC 1 : Err(e) => {
5269 1 : assert_eq!(
5270 1 : e,
5271 1 : GetTimelineError::NotFound {
5272 1 : tenant_id: tenant.tenant_shard_id.tenant_id,
5273 1 : timeline_id: TIMELINE_ID,
5274 1 : }
5275 1 : )
5276 : }
5277 : }
5278 :
5279 1 : assert!(!harness
5280 1 : .conf
5281 1 : .timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
5282 1 : .exists());
5283 :
5284 1 : assert!(!harness
5285 1 : .conf
5286 1 : .timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
5287 1 : .exists());
5288 :
5289 1 : Ok(())
5290 : }
5291 : }
|