Line data Source code
1 : //!
2 : //! Timeline repository implementation that keeps old data in files on disk, and
3 : //! the recent changes in memory. See tenant/*_layer.rs files.
4 : //! The functions here are responsible for locating the correct layer for the
5 : //! get/put call, walking back the timeline branching history as needed.
6 : //!
7 : //! The files are stored in the .neon/tenants/<tenant_id>/timelines/<timeline_id>
8 : //! directory. See docs/pageserver-storage.md for how the files are managed.
9 : //! In addition to the layer files, there is a metadata file in the same
10 : //! directory that contains information about the timeline, in particular its
11 : //! parent timeline, and the last LSN that has been written to disk.
12 : //!
13 :
14 : use anyhow::{bail, Context};
15 : use camino::Utf8Path;
16 : use camino::Utf8PathBuf;
17 : use enumset::EnumSet;
18 : use futures::stream::FuturesUnordered;
19 : use futures::FutureExt;
20 : use futures::StreamExt;
21 : use pageserver_api::models;
22 : use pageserver_api::models::TimelineState;
23 : use pageserver_api::models::WalRedoManagerStatus;
24 : use pageserver_api::shard::ShardIdentity;
25 : use pageserver_api::shard::TenantShardId;
26 : use remote_storage::DownloadError;
27 : use remote_storage::GenericRemoteStorage;
28 : use std::fmt;
29 : use storage_broker::BrokerClientChannel;
30 : use tokio::io::BufReader;
31 : use tokio::runtime::Handle;
32 : use tokio::sync::watch;
33 : use tokio::task::JoinSet;
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::*;
36 : use utils::backoff;
37 : use utils::completion;
38 : use utils::crashsafe::path_with_suffix_extension;
39 : use utils::failpoint_support;
40 : use utils::fs_ext;
41 : use utils::sync::gate::Gate;
42 : use utils::sync::gate::GateGuard;
43 : use utils::timeout::timeout_cancellable;
44 : use utils::timeout::TimeoutCancellableError;
45 :
46 : use self::config::AttachedLocationConfig;
47 : use self::config::AttachmentMode;
48 : use self::config::LocationConf;
49 : use self::config::TenantConf;
50 : use self::delete::DeleteTenantFlow;
51 : use self::metadata::LoadMetadataError;
52 : use self::metadata::TimelineMetadata;
53 : use self::mgr::GetActiveTenantError;
54 : use self::mgr::GetTenantError;
55 : use self::mgr::TenantsMap;
56 : use self::remote_timeline_client::RemoteTimelineClient;
57 : use self::timeline::uninit::TimelineExclusionError;
58 : use self::timeline::uninit::TimelineUninitMark;
59 : use self::timeline::uninit::UninitializedTimeline;
60 : use self::timeline::EvictionTaskTenantState;
61 : use self::timeline::TimelineResources;
62 : use self::timeline::WaitLsnError;
63 : use crate::config::PageServerConf;
64 : use crate::context::{DownloadBehavior, RequestContext};
65 : use crate::deletion_queue::DeletionQueueClient;
66 : use crate::deletion_queue::DeletionQueueError;
67 : use crate::import_datadir;
68 : use crate::is_uninit_mark;
69 : use crate::metrics::TENANT;
70 : use crate::metrics::{
71 : remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
72 : };
73 : use crate::repository::GcResult;
74 : use crate::task_mgr;
75 : use crate::task_mgr::TaskKind;
76 : use crate::tenant::config::LocationMode;
77 : use crate::tenant::config::TenantConfOpt;
78 : use crate::tenant::metadata::load_metadata;
79 : pub use crate::tenant::remote_timeline_client::index::IndexPart;
80 : use crate::tenant::remote_timeline_client::remote_initdb_archive_path;
81 : use crate::tenant::remote_timeline_client::MaybeDeletedIndexPart;
82 : use crate::tenant::remote_timeline_client::INITDB_PATH;
83 : use crate::tenant::storage_layer::DeltaLayer;
84 : use crate::tenant::storage_layer::ImageLayer;
85 : use crate::InitializationOrder;
86 : use std::cmp::min;
87 : use std::collections::hash_map::Entry;
88 : use std::collections::BTreeSet;
89 : use std::collections::HashMap;
90 : use std::collections::HashSet;
91 : use std::fmt::Debug;
92 : use std::fmt::Display;
93 : use std::fs;
94 : use std::fs::File;
95 : use std::io;
96 : use std::ops::Bound::Included;
97 : use std::sync::atomic::AtomicU64;
98 : use std::sync::atomic::Ordering;
99 : use std::sync::Arc;
100 : use std::sync::{Mutex, RwLock};
101 : use std::time::{Duration, Instant};
102 :
103 : use crate::span;
104 : use crate::tenant::timeline::delete::DeleteTimelineFlow;
105 : use crate::tenant::timeline::uninit::cleanup_timeline_directory;
106 : use crate::virtual_file::VirtualFile;
107 : use crate::walredo::PostgresRedoManager;
108 : use crate::TEMP_FILE_SUFFIX;
109 : use once_cell::sync::Lazy;
110 : pub use pageserver_api::models::TenantState;
111 : use tokio::sync::Semaphore;
112 :
113 372 : static INIT_DB_SEMAPHORE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(8));
114 : use toml_edit;
115 : use utils::{
116 : crashsafe,
117 : generation::Generation,
118 : id::TimelineId,
119 : lsn::{Lsn, RecordLsn},
120 : };
121 :
122 : /// Declare a failpoint that can use the `pause` failpoint action.
123 : /// We don't want to block the executor thread, hence, spawn_blocking + await.
124 : macro_rules! pausable_failpoint {
125 : ($name:literal) => {
126 : if cfg!(feature = "testing") {
127 : tokio::task::spawn_blocking({
128 : let current = tracing::Span::current();
129 : move || {
130 : let _entered = current.entered();
131 : tracing::info!("at failpoint {}", $name);
132 : fail::fail_point!($name);
133 : }
134 : })
135 : .await
136 : .expect("spawn_blocking");
137 : }
138 : };
139 : ($name:literal, $cond:expr) => {
140 : if cfg!(feature = "testing") {
141 : if $cond {
142 : pausable_failpoint!($name)
143 : }
144 : }
145 : };
146 : }
147 :
148 : pub mod blob_io;
149 : pub mod block_io;
150 :
151 : pub mod disk_btree;
152 : pub(crate) mod ephemeral_file;
153 : pub mod layer_map;
154 :
155 : pub mod metadata;
156 : mod par_fsync;
157 : pub mod remote_timeline_client;
158 : pub mod storage_layer;
159 :
160 : pub mod config;
161 : pub mod delete;
162 : pub mod mgr;
163 : pub mod secondary;
164 : pub mod tasks;
165 : pub mod upload_queue;
166 :
167 : pub(crate) mod timeline;
168 :
169 : pub mod size;
170 :
171 : pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
172 : pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
173 :
174 : // re-export for use in remote_timeline_client.rs
175 : pub use crate::tenant::metadata::save_metadata;
176 :
177 : // re-export for use in walreceiver
178 : pub use crate::tenant::timeline::WalReceiverInfo;
179 :
180 : /// The "tenants" part of `tenants/<tenant>/timelines...`
181 : pub const TENANTS_SEGMENT_NAME: &str = "tenants";
182 :
183 : /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix.
184 : pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
185 :
186 : pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
187 :
188 : /// References to shared objects that are passed into each tenant, such
189 : /// as the shared remote storage client and process initialization state.
190 855 : #[derive(Clone)]
191 : pub struct TenantSharedResources {
192 : pub broker_client: storage_broker::BrokerClientChannel,
193 : pub remote_storage: Option<GenericRemoteStorage>,
194 : pub deletion_queue_client: DeletionQueueClient,
195 : }
196 :
197 : /// A [`Tenant`] is really an _attached_ tenant. The configuration
198 : /// for an attached tenant is a subset of the [`LocationConf`], represented
199 : /// in this struct.
200 : pub(super) struct AttachedTenantConf {
201 : tenant_conf: TenantConfOpt,
202 : location: AttachedLocationConfig,
203 : }
204 :
205 : impl AttachedTenantConf {
206 970 : fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
207 970 : match &location_conf.mode {
208 970 : LocationMode::Attached(attach_conf) => Ok(Self {
209 970 : tenant_conf: location_conf.tenant_conf,
210 970 : location: *attach_conf,
211 970 : }),
212 : LocationMode::Secondary(_) => {
213 0 : anyhow::bail!("Attempted to construct AttachedTenantConf from a LocationConf in secondary mode")
214 : }
215 : }
216 970 : }
217 : }
218 : struct TimelinePreload {
219 : timeline_id: TimelineId,
220 : client: RemoteTimelineClient,
221 : index_part: Result<MaybeDeletedIndexPart, DownloadError>,
222 : }
223 :
224 : pub(crate) struct TenantPreload {
225 : deleting: bool,
226 : timelines: HashMap<TimelineId, TimelinePreload>,
227 : }
228 :
229 : /// When we spawn a tenant, there is a special mode for tenant creation that
230 : /// avoids trying to read anything from remote storage.
231 : pub(crate) enum SpawnMode {
232 : Normal,
233 : Create,
234 : }
235 :
236 : ///
237 : /// Tenant consists of multiple timelines. Keep them in a hash table.
238 : ///
239 : pub struct Tenant {
240 : // Global pageserver config parameters
241 : pub conf: &'static PageServerConf,
242 :
243 : /// The value creation timestamp, used to measure activation delay, see:
244 : /// <https://github.com/neondatabase/neon/issues/4025>
245 : constructed_at: Instant,
246 :
247 : state: watch::Sender<TenantState>,
248 :
249 : // Overridden tenant-specific config parameters.
250 : // We keep TenantConfOpt sturct here to preserve the information
251 : // about parameters that are not set.
252 : // This is necessary to allow global config updates.
253 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
254 :
255 : tenant_shard_id: TenantShardId,
256 :
257 : // The detailed sharding information, beyond the number/count in tenant_shard_id
258 : shard_identity: ShardIdentity,
259 :
260 : /// The remote storage generation, used to protect S3 objects from split-brain.
261 : /// Does not change over the lifetime of the [`Tenant`] object.
262 : ///
263 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
264 : /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
265 : generation: Generation,
266 :
267 : timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
268 :
269 : /// During timeline creation, we first insert the TimelineId to the
270 : /// creating map, then `timelines`, then remove it from the creating map.
271 : /// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
272 : timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
273 :
274 : // This mutex prevents creation of new timelines during GC.
275 : // Adding yet another mutex (in addition to `timelines`) is needed because holding
276 : // `timelines` mutex during all GC iteration
277 : // may block for a long time `get_timeline`, `get_timelines_state`,... and other operations
278 : // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
279 : // timeout...
280 : gc_cs: tokio::sync::Mutex<()>,
281 : walredo_mgr: Option<Arc<WalRedoManager>>,
282 :
283 : // provides access to timeline data sitting in the remote storage
284 : pub(crate) remote_storage: Option<GenericRemoteStorage>,
285 :
286 : // Access to global deletion queue for when this tenant wants to schedule a deletion
287 : deletion_queue_client: DeletionQueueClient,
288 :
289 : /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
290 : cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
291 : cached_synthetic_tenant_size: Arc<AtomicU64>,
292 :
293 : eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
294 :
295 : /// If the tenant is in Activating state, notify this to encourage it
296 : /// to proceed to Active as soon as possible, rather than waiting for lazy
297 : /// background warmup.
298 : pub(crate) activate_now_sem: tokio::sync::Semaphore,
299 :
300 : pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
301 :
302 : // Cancellation token fires when we have entered shutdown(). This is a parent of
303 : // Timelines' cancellation token.
304 : pub(crate) cancel: CancellationToken,
305 :
306 : // Users of the Tenant such as the page service must take this Gate to avoid
307 : // trying to use a Tenant which is shutting down.
308 : pub(crate) gate: Gate,
309 : }
310 :
311 : impl std::fmt::Debug for Tenant {
312 2 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
313 2 : write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
314 2 : }
315 : }
316 :
317 : pub(crate) enum WalRedoManager {
318 : Prod(PostgresRedoManager),
319 : #[cfg(test)]
320 : Test(harness::TestRedoManager),
321 : }
322 :
323 : impl From<PostgresRedoManager> for WalRedoManager {
324 858 : fn from(mgr: PostgresRedoManager) -> Self {
325 858 : Self::Prod(mgr)
326 858 : }
327 : }
328 :
329 : #[cfg(test)]
330 : impl From<harness::TestRedoManager> for WalRedoManager {
331 84 : fn from(mgr: harness::TestRedoManager) -> Self {
332 84 : Self::Test(mgr)
333 84 : }
334 : }
335 :
336 : impl WalRedoManager {
337 0 : pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
338 0 : match self {
339 775 : Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
340 775 : #[cfg(test)]
341 775 : Self::Test(_) => {
342 0 : // Not applicable to test redo manager
343 0 : }
344 775 : }
345 775 : }
346 :
347 : /// # Cancel-Safety
348 : ///
349 : /// This method is cancellation-safe.
350 2860917 : pub async fn request_redo(
351 2860917 : &self,
352 2860917 : key: crate::repository::Key,
353 2860917 : lsn: Lsn,
354 2860917 : base_img: Option<(Lsn, bytes::Bytes)>,
355 2860917 : records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
356 2860917 : pg_version: u32,
357 2860917 : ) -> anyhow::Result<bytes::Bytes> {
358 0 : match self {
359 2860917 : Self::Prod(mgr) => {
360 0 : mgr.request_redo(key, lsn, base_img, records, pg_version)
361 0 : .await
362 : }
363 : #[cfg(test)]
364 0 : Self::Test(mgr) => {
365 0 : mgr.request_redo(key, lsn, base_img, records, pg_version)
366 0 : .await
367 : }
368 : }
369 2860917 : }
370 :
371 0 : pub(crate) fn status(&self) -> Option<WalRedoManagerStatus> {
372 0 : match self {
373 486 : WalRedoManager::Prod(m) => m.status(),
374 486 : #[cfg(test)]
375 486 : WalRedoManager::Test(_) => None,
376 486 : }
377 486 : }
378 : }
379 :
380 106 : #[derive(Debug, thiserror::Error, PartialEq, Eq)]
381 : pub enum GetTimelineError {
382 : #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
383 : NotActive {
384 : tenant_id: TenantShardId,
385 : timeline_id: TimelineId,
386 : state: TimelineState,
387 : },
388 : #[error("Timeline {tenant_id}/{timeline_id} was not found")]
389 : NotFound {
390 : tenant_id: TenantShardId,
391 : timeline_id: TimelineId,
392 : },
393 : }
394 :
395 0 : #[derive(Debug, thiserror::Error)]
396 : pub enum LoadLocalTimelineError {
397 : #[error("FailedToLoad")]
398 : Load(#[source] anyhow::Error),
399 : #[error("FailedToResumeDeletion")]
400 : ResumeDeletion(#[source] anyhow::Error),
401 : }
402 :
403 139 : #[derive(thiserror::Error)]
404 : pub enum DeleteTimelineError {
405 : #[error("NotFound")]
406 : NotFound,
407 :
408 : #[error("HasChildren")]
409 : HasChildren(Vec<TimelineId>),
410 :
411 : #[error("Timeline deletion is already in progress")]
412 : AlreadyInProgress(Arc<tokio::sync::Mutex<DeleteTimelineFlow>>),
413 :
414 : #[error(transparent)]
415 : Other(#[from] anyhow::Error),
416 : }
417 :
418 : impl Debug for DeleteTimelineError {
419 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
420 0 : match self {
421 0 : Self::NotFound => write!(f, "NotFound"),
422 0 : Self::HasChildren(c) => f.debug_tuple("HasChildren").field(c).finish(),
423 0 : Self::AlreadyInProgress(_) => f.debug_tuple("AlreadyInProgress").finish(),
424 0 : Self::Other(e) => f.debug_tuple("Other").field(e).finish(),
425 : }
426 0 : }
427 : }
428 :
429 : pub enum SetStoppingError {
430 : AlreadyStopping(completion::Barrier),
431 : Broken,
432 : }
433 :
434 : impl Debug for SetStoppingError {
435 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436 0 : match self {
437 0 : Self::AlreadyStopping(_) => f.debug_tuple("AlreadyStopping").finish(),
438 0 : Self::Broken => write!(f, "Broken"),
439 : }
440 0 : }
441 : }
442 :
443 6 : #[derive(thiserror::Error, Debug)]
444 : pub enum CreateTimelineError {
445 : #[error("creation of timeline with the given ID is in progress")]
446 : AlreadyCreating,
447 : #[error("timeline already exists with different parameters")]
448 : Conflict,
449 : #[error(transparent)]
450 : AncestorLsn(anyhow::Error),
451 : #[error("ancestor timeline is not active")]
452 : AncestorNotActive,
453 : #[error("tenant shutting down")]
454 : ShuttingDown,
455 : #[error(transparent)]
456 : Other(#[from] anyhow::Error),
457 : }
458 :
459 0 : #[derive(thiserror::Error, Debug)]
460 : enum InitdbError {
461 : Other(anyhow::Error),
462 : Cancelled,
463 : Spawn(std::io::Result<()>),
464 : Failed(std::process::ExitStatus, Vec<u8>),
465 : }
466 :
467 : impl fmt::Display for InitdbError {
468 0 : fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
469 0 : match self {
470 0 : InitdbError::Cancelled => write!(f, "Operation was cancelled"),
471 0 : InitdbError::Spawn(e) => write!(f, "Spawn error: {:?}", e),
472 0 : InitdbError::Failed(status, stderr) => write!(
473 0 : f,
474 0 : "Command failed with status {:?}: {}",
475 0 : status,
476 0 : String::from_utf8_lossy(stderr)
477 0 : ),
478 0 : InitdbError::Other(e) => write!(f, "Error: {:?}", e),
479 : }
480 0 : }
481 : }
482 :
483 : impl From<std::io::Error> for InitdbError {
484 0 : fn from(error: std::io::Error) -> Self {
485 0 : InitdbError::Spawn(Err(error))
486 0 : }
487 : }
488 :
489 : struct TenantDirectoryScan {
490 : sorted_timelines_to_load: Vec<(TimelineId, TimelineMetadata)>,
491 : timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)>,
492 : }
493 :
494 : enum CreateTimelineCause {
495 : Load,
496 : Delete,
497 : }
498 :
499 : impl Tenant {
500 : /// Yet another helper for timeline initialization.
501 : ///
502 : /// - Initializes the Timeline struct and inserts it into the tenant's hash map
503 : /// - Scans the local timeline directory for layer files and builds the layer map
504 : /// - Downloads remote index file and adds remote files to the layer map
505 : /// - Schedules remote upload tasks for any files that are present locally but missing from remote storage.
506 : ///
507 : /// If the operation fails, the timeline is left in the tenant's hash map in Broken state. On success,
508 : /// it is marked as Active.
509 : #[allow(clippy::too_many_arguments)]
510 412 : async fn timeline_init_and_sync(
511 412 : &self,
512 412 : timeline_id: TimelineId,
513 412 : resources: TimelineResources,
514 412 : index_part: Option<IndexPart>,
515 412 : metadata: TimelineMetadata,
516 412 : ancestor: Option<Arc<Timeline>>,
517 412 : _ctx: &RequestContext,
518 412 : ) -> anyhow::Result<()> {
519 412 : let tenant_id = self.tenant_shard_id;
520 :
521 412 : let timeline = self.create_timeline_struct(
522 412 : timeline_id,
523 412 : &metadata,
524 412 : ancestor.clone(),
525 412 : resources,
526 412 : CreateTimelineCause::Load,
527 412 : )?;
528 412 : let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
529 412 : anyhow::ensure!(
530 412 : disk_consistent_lsn.is_valid(),
531 0 : "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
532 : );
533 412 : assert_eq!(
534 412 : disk_consistent_lsn,
535 412 : metadata.disk_consistent_lsn(),
536 0 : "these are used interchangeably"
537 : );
538 :
539 412 : if let Some(index_part) = index_part.as_ref() {
540 412 : timeline
541 412 : .remote_client
542 412 : .as_ref()
543 412 : .unwrap()
544 412 : .init_upload_queue(index_part)?;
545 0 : } else if self.remote_storage.is_some() {
546 : // No data on the remote storage, but we have local metadata file. We can end up
547 : // here with timeline_create being interrupted before finishing index part upload.
548 : // By doing what we do here, the index part upload is retried.
549 : // If control plane retries timeline creation in the meantime, the mgmt API handler
550 : // for timeline creation will coalesce on the upload we queue here.
551 0 : let rtc = timeline.remote_client.as_ref().unwrap();
552 0 : rtc.init_upload_queue_for_empty_remote(&metadata)?;
553 0 : rtc.schedule_index_upload_for_metadata_update(&metadata)?;
554 0 : }
555 :
556 412 : timeline
557 412 : .load_layer_map(disk_consistent_lsn, index_part)
558 408 : .await
559 412 : .with_context(|| {
560 0 : format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
561 412 : })?;
562 :
563 : {
564 : // avoiding holding it across awaits
565 412 : let mut timelines_accessor = self.timelines.lock().unwrap();
566 412 : match timelines_accessor.entry(timeline_id) {
567 : Entry::Occupied(_) => {
568 : // The uninit mark file acts as a lock that prevents another task from
569 : // initializing the timeline at the same time.
570 0 : unreachable!(
571 0 : "Timeline {tenant_id}/{timeline_id} already exists in the tenant map"
572 0 : );
573 : }
574 412 : Entry::Vacant(v) => {
575 412 : v.insert(Arc::clone(&timeline));
576 412 : timeline.maybe_spawn_flush_loop();
577 412 : }
578 412 : }
579 412 : };
580 412 :
581 412 : // Sanity check: a timeline should have some content.
582 412 : anyhow::ensure!(
583 412 : ancestor.is_some()
584 359 : || timeline
585 359 : .layers
586 359 : .read()
587 0 : .await
588 359 : .layer_map()
589 359 : .iter_historic_layers()
590 359 : .next()
591 359 : .is_some(),
592 0 : "Timeline has no ancestor and no layer files"
593 : );
594 :
595 412 : Ok(())
596 412 : }
597 :
598 : /// Attach a tenant that's available in cloud storage.
599 : ///
600 : /// This returns quickly, after just creating the in-memory object
601 : /// Tenant struct and launching a background task to download
602 : /// the remote index files. On return, the tenant is most likely still in
603 : /// Attaching state, and it will become Active once the background task
604 : /// finishes. You can use wait_until_active() to wait for the task to
605 : /// complete.
606 : ///
607 : #[allow(clippy::too_many_arguments)]
608 858 : pub(crate) fn spawn(
609 858 : conf: &'static PageServerConf,
610 858 : tenant_shard_id: TenantShardId,
611 858 : resources: TenantSharedResources,
612 858 : attached_conf: AttachedTenantConf,
613 858 : shard_identity: ShardIdentity,
614 858 : init_order: Option<InitializationOrder>,
615 858 : tenants: &'static std::sync::RwLock<TenantsMap>,
616 858 : mode: SpawnMode,
617 858 : ctx: &RequestContext,
618 858 : ) -> anyhow::Result<Arc<Tenant>> {
619 858 : let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
620 858 : conf,
621 858 : tenant_shard_id,
622 858 : )));
623 858 :
624 858 : let TenantSharedResources {
625 858 : broker_client,
626 858 : remote_storage,
627 858 : deletion_queue_client,
628 858 : } = resources;
629 858 :
630 858 : let attach_mode = attached_conf.location.attach_mode;
631 858 : let generation = attached_conf.location.generation;
632 858 :
633 858 : let tenant = Arc::new(Tenant::new(
634 858 : TenantState::Attaching,
635 858 : conf,
636 858 : attached_conf,
637 858 : shard_identity,
638 858 : Some(wal_redo_manager),
639 858 : tenant_shard_id,
640 858 : remote_storage.clone(),
641 858 : deletion_queue_client,
642 858 : ));
643 :
644 : // The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
645 : // we shut down while attaching.
646 858 : let Ok(attach_gate_guard) = tenant.gate.enter() else {
647 : // We just created the Tenant: nothing else can have shut it down yet
648 0 : unreachable!();
649 : };
650 :
651 : // Do all the hard work in the background
652 858 : let tenant_clone = Arc::clone(&tenant);
653 858 : let ctx = ctx.detached_child(TaskKind::Attach, DownloadBehavior::Warn);
654 858 : task_mgr::spawn(
655 858 : &tokio::runtime::Handle::current(),
656 858 : TaskKind::Attach,
657 858 : Some(tenant_shard_id),
658 858 : None,
659 858 : "attach tenant",
660 : false,
661 858 : async move {
662 858 :
663 858 : info!(
664 858 : ?attach_mode,
665 858 : "Attaching tenant"
666 858 : );
667 :
668 858 : let _gate_guard = attach_gate_guard;
669 858 :
670 858 : // Is this tenant being spawned as part of process startup?
671 858 : let starting_up = init_order.is_some();
672 850 : scopeguard::defer! {
673 850 : if starting_up {
674 199 : TENANT.startup_complete.inc();
675 651 : }
676 : }
677 :
678 : // Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
679 858 : let make_broken =
680 7 : |t: &Tenant, err: anyhow::Error| {
681 7 : error!("attach failed, setting tenant state to Broken: {err:?}");
682 7 : t.state.send_modify(|state| {
683 : // The Stopping case is for when we have passed control on to DeleteTenantFlow:
684 : // if it errors, we will call make_broken when tenant is already in Stopping.
685 7 : assert!(
686 7 : matches!(*state, TenantState::Attaching | TenantState::Stopping { .. }),
687 0 : "the attach task owns the tenant state until activation is complete"
688 : );
689 :
690 7 : *state = TenantState::broken_from_reason(err.to_string());
691 7 : });
692 7 : };
693 :
694 858 : let mut init_order = init_order;
695 858 : // take the completion because initial tenant loading will complete when all of
696 858 : // these tasks complete.
697 858 : let _completion = init_order
698 858 : .as_mut()
699 858 : .and_then(|x| x.initial_tenant_load.take());
700 858 : let remote_load_completion = init_order
701 858 : .as_mut()
702 858 : .and_then(|x| x.initial_tenant_load_remote.take());
703 :
704 : enum AttachType<'a> {
705 : // During pageserver startup, we are attaching this tenant lazily in the background
706 : Warmup(tokio::sync::SemaphorePermit<'a>),
707 : // During pageserver startup, we are attaching this tenant as soon as we can,
708 : // because a client tried to access it.
709 : OnDemand,
710 : // During normal operations after startup, we are attaching a tenant.
711 : Normal,
712 : }
713 :
714 : // Before doing any I/O, wait for either or:
715 : // - A client to attempt to access to this tenant (on-demand loading)
716 : // - A permit to become available in the warmup semaphore (background warmup)
717 : //
718 : // Some-ness of init_order is how we know if we're attaching during startup or later
719 : // in process lifetime.
720 858 : let attach_type = if init_order.is_some() {
721 207 : tokio::select!(
722 : _ = tenant_clone.activate_now_sem.acquire() => {
723 3 : tracing::info!("Activating tenant (on-demand)");
724 : AttachType::OnDemand
725 : },
726 203 : permit_result = conf.concurrent_tenant_warmup.inner().acquire() => {
727 : match permit_result {
728 : Ok(p) => {
729 203 : tracing::info!("Activating tenant (warmup)");
730 : AttachType::Warmup(p)
731 : }
732 : Err(_) => {
733 : // This is unexpected: the warmup semaphore should stay alive
734 : // for the lifetime of init_order. Log a warning and proceed.
735 0 : tracing::warn!("warmup_limit semaphore unexpectedly closed");
736 : AttachType::Normal
737 : }
738 : }
739 :
740 : }
741 : _ = tenant_clone.cancel.cancelled() => {
742 : // This is safe, but should be pretty rare: it is interesting if a tenant
743 : // stayed in Activating for such a long time that shutdown found it in
744 : // that state.
745 1 : tracing::info!(state=%tenant_clone.current_state(), "Tenant shut down before activation");
746 : // Make the tenant broken so that set_stopping will not hang waiting for it to leave
747 : // the Attaching state. This is an over-reaction (nothing really broke, the tenant is
748 : // just shutting down), but ensures progress.
749 : make_broken(&tenant_clone, anyhow::anyhow!("Shut down while Attaching"));
750 : return Ok(());
751 : },
752 : )
753 : } else {
754 651 : AttachType::Normal
755 : };
756 :
757 857 : let preload_timer = TENANT.preload.start_timer();
758 857 : let preload = match mode {
759 : SpawnMode::Create => {
760 : // Don't count the skipped preload into the histogram of preload durations
761 15 : preload_timer.stop_and_discard();
762 15 : None
763 : },
764 : SpawnMode::Normal => {
765 842 : match &remote_storage {
766 842 : Some(remote_storage) => Some(
767 842 : match tenant_clone
768 842 : .preload(remote_storage, task_mgr::shutdown_token())
769 842 : .instrument(
770 842 : tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()),
771 : )
772 2419 : .await {
773 836 : Ok(p) => {
774 836 : preload_timer.observe_duration();
775 836 : p
776 : }
777 : ,
778 6 : Err(e) => {
779 6 : make_broken(&tenant_clone, anyhow::anyhow!(e));
780 6 : return Ok(());
781 : }
782 : },
783 : ),
784 0 : None => None,
785 : }
786 : }
787 : };
788 :
789 : // Remote preload is complete.
790 851 : drop(remote_load_completion);
791 :
792 851 : let pending_deletion = {
793 851 : match DeleteTenantFlow::should_resume_deletion(
794 851 : conf,
795 851 : preload.as_ref().map(|p| p.deleting).unwrap_or(false),
796 851 : &tenant_clone,
797 851 : )
798 0 : .await
799 : {
800 851 : Ok(should_resume_deletion) => should_resume_deletion,
801 0 : Err(err) => {
802 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
803 0 : return Ok(());
804 : }
805 : }
806 : };
807 :
808 851 : info!("pending_deletion {}", pending_deletion.is_some());
809 :
810 851 : if let Some(deletion) = pending_deletion {
811 : // as we are no longer loading, signal completion by dropping
812 : // the completion while we resume deletion
813 21 : drop(_completion);
814 21 : let background_jobs_can_start =
815 21 : init_order.as_ref().map(|x| &x.background_jobs_can_start);
816 21 : if let Some(background) = background_jobs_can_start {
817 20 : info!("waiting for backgound jobs barrier");
818 20 : background.clone().wait().await;
819 20 : info!("ready for backgound jobs barrier");
820 1 : }
821 :
822 21 : match DeleteTenantFlow::resume_from_attach(
823 21 : deletion,
824 21 : &tenant_clone,
825 21 : preload,
826 21 : tenants,
827 21 : &ctx,
828 21 : )
829 789 : .await
830 : {
831 0 : Err(err) => {
832 0 : make_broken(&tenant_clone, anyhow::anyhow!(err));
833 0 : return Ok(());
834 : }
835 21 : Ok(()) => return Ok(()),
836 : }
837 830 : }
838 :
839 : // We will time the duration of the attach phase unless this is a creation (attach will do no work)
840 830 : let attach_timer = match mode {
841 15 : SpawnMode::Create => None,
842 815 : SpawnMode::Normal => {Some(TENANT.attach.start_timer())}
843 : };
844 830 : match tenant_clone.attach(preload, mode, &ctx).await {
845 : Ok(()) => {
846 830 : info!("attach finished, activating");
847 830 : if let Some(t)= attach_timer {t.observe_duration();}
848 830 : tenant_clone.activate(broker_client, None, &ctx);
849 : }
850 0 : Err(e) => {
851 0 : if let Some(t)= attach_timer {t.observe_duration();}
852 0 : make_broken(&tenant_clone, anyhow::anyhow!(e));
853 : }
854 : }
855 :
856 : // If we are doing an opportunistic warmup attachment at startup, initialize
857 : // logical size at the same time. This is better than starting a bunch of idle tenants
858 : // with cold caches and then coming back later to initialize their logical sizes.
859 : //
860 : // It also prevents the warmup proccess competing with the concurrency limit on
861 : // logical size calculations: if logical size calculation semaphore is saturated,
862 : // then warmup will wait for that before proceeding to the next tenant.
863 830 : if let AttachType::Warmup(_permit) = attach_type {
864 183 : let mut futs = FuturesUnordered::new();
865 183 : let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect();
866 417 : for t in timelines {
867 234 : futs.push(t.await_initial_logical_size())
868 : }
869 183 : tracing::info!("Waiting for initial logical sizes while warming up...");
870 409 : while futs.next().await.is_some() {
871 226 :
872 226 : }
873 175 : tracing::info!("Warm-up complete");
874 647 : }
875 :
876 822 : Ok(())
877 850 : }
878 858 : .instrument({
879 858 : let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), gen=?generation);
880 858 : span.follows_from(Span::current());
881 858 : span
882 858 : }),
883 858 : );
884 858 : Ok(tenant)
885 858 : }
886 :
887 846 : pub(crate) async fn preload(
888 846 : self: &Arc<Tenant>,
889 846 : remote_storage: &GenericRemoteStorage,
890 846 : cancel: CancellationToken,
891 846 : ) -> anyhow::Result<TenantPreload> {
892 : // Get list of remote timelines
893 : // download index files for every tenant timeline
894 846 : info!("listing remote timelines");
895 846 : let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
896 846 : remote_storage,
897 846 : self.tenant_shard_id,
898 846 : cancel.clone(),
899 846 : )
900 2031 : .await?;
901 :
902 840 : let deleting = other_keys.contains(TENANT_DELETED_MARKER_FILE_NAME);
903 840 : info!(
904 840 : "found {} timelines, deleting={}",
905 840 : remote_timeline_ids.len(),
906 840 : deleting
907 840 : );
908 :
909 855 : for k in other_keys {
910 15 : if k != TENANT_DELETED_MARKER_FILE_NAME {
911 0 : warn!("Unexpected non timeline key {k}");
912 15 : }
913 : }
914 :
915 : Ok(TenantPreload {
916 840 : deleting,
917 840 : timelines: self
918 840 : .load_timeline_metadata(remote_timeline_ids, remote_storage, cancel)
919 403 : .await?,
920 : })
921 846 : }
922 :
923 : ///
924 : /// Background task that downloads all data for a tenant and brings it to Active state.
925 : ///
926 : /// No background tasks are started as part of this routine.
927 : ///
928 855 : async fn attach(
929 855 : self: &Arc<Tenant>,
930 855 : preload: Option<TenantPreload>,
931 855 : mode: SpawnMode,
932 855 : ctx: &RequestContext,
933 855 : ) -> anyhow::Result<()> {
934 855 : span::debug_assert_current_span_has_tenant_id();
935 :
936 3 : failpoint_support::sleep_millis_async!("before-attaching-tenant");
937 :
938 855 : let preload = match (preload, mode) {
939 840 : (Some(p), _) => p,
940 15 : (None, SpawnMode::Create) => TenantPreload {
941 15 : deleting: false,
942 15 : timelines: HashMap::new(),
943 15 : },
944 : (None, SpawnMode::Normal) => {
945 : // Deprecated dev mode: load from local disk state instead of remote storage
946 : // https://github.com/neondatabase/neon/issues/5624
947 0 : return self.load_local(ctx).await;
948 : }
949 : };
950 :
951 855 : let mut timelines_to_resume_deletions = vec![];
952 855 :
953 855 : let mut remote_index_and_client = HashMap::new();
954 855 : let mut timeline_ancestors = HashMap::new();
955 855 : let mut existent_timelines = HashSet::new();
956 1282 : for (timeline_id, preload) in preload.timelines {
957 424 : let index_part = match preload.index_part {
958 424 : Ok(i) => {
959 0 : debug!("remote index part exists for timeline {timeline_id}");
960 : // We found index_part on the remote, this is the standard case.
961 424 : existent_timelines.insert(timeline_id);
962 424 : i
963 : }
964 : Err(DownloadError::NotFound) => {
965 : // There is no index_part on the remote. We only get here
966 : // if there is some prefix for the timeline in the remote storage.
967 : // This can e.g. be the initdb.tar.zst archive, maybe a
968 : // remnant from a prior incomplete creation or deletion attempt.
969 : // Delete the local directory as the deciding criterion for a
970 : // timeline's existence is presence of index_part.
971 3 : info!(%timeline_id, "index_part not found on remote");
972 3 : continue;
973 : }
974 0 : Err(e) => {
975 : // Some (possibly ephemeral) error happened during index_part download.
976 : // Pretend the timeline exists to not delete the timeline directory,
977 : // as it might be a temporary issue and we don't want to re-download
978 : // everything after it resolves.
979 0 : warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
980 :
981 0 : existent_timelines.insert(timeline_id);
982 0 : continue;
983 : }
984 : };
985 424 : match index_part {
986 412 : MaybeDeletedIndexPart::IndexPart(index_part) => {
987 412 : timeline_ancestors.insert(timeline_id, index_part.metadata.clone());
988 412 : remote_index_and_client.insert(timeline_id, (index_part, preload.client));
989 412 : }
990 12 : MaybeDeletedIndexPart::Deleted(index_part) => {
991 12 : info!(
992 12 : "timeline {} is deleted, picking to resume deletion",
993 12 : timeline_id
994 12 : );
995 12 : timelines_to_resume_deletions.push((timeline_id, index_part, preload.client));
996 : }
997 : }
998 : }
999 :
1000 : // For every timeline, download the metadata file, scan the local directory,
1001 : // and build a layer map that contains an entry for each remote and local
1002 : // layer file.
1003 855 : let sorted_timelines = tree_sort_timelines(timeline_ancestors, |m| m.ancestor_timeline())?;
1004 1267 : for (timeline_id, remote_metadata) in sorted_timelines {
1005 412 : let (index_part, remote_client) = remote_index_and_client
1006 412 : .remove(&timeline_id)
1007 412 : .expect("just put it in above");
1008 412 :
1009 412 : // TODO again handle early failure
1010 412 : self.load_remote_timeline(
1011 412 : timeline_id,
1012 412 : index_part,
1013 412 : remote_metadata,
1014 412 : TimelineResources {
1015 412 : remote_client: Some(remote_client),
1016 412 : deletion_queue_client: self.deletion_queue_client.clone(),
1017 412 : },
1018 412 : ctx,
1019 412 : )
1020 818 : .await
1021 412 : .with_context(|| {
1022 0 : format!(
1023 0 : "failed to load remote timeline {} for tenant {}",
1024 0 : timeline_id, self.tenant_shard_id
1025 0 : )
1026 412 : })?;
1027 : }
1028 :
1029 : // Walk through deleted timelines, resume deletion
1030 867 : for (timeline_id, index_part, remote_timeline_client) in timelines_to_resume_deletions {
1031 12 : remote_timeline_client
1032 12 : .init_upload_queue_stopped_to_continue_deletion(&index_part)
1033 12 : .context("init queue stopped")
1034 12 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1035 :
1036 12 : DeleteTimelineFlow::resume_deletion(
1037 12 : Arc::clone(self),
1038 12 : timeline_id,
1039 12 : &index_part.metadata,
1040 12 : Some(remote_timeline_client),
1041 12 : self.deletion_queue_client.clone(),
1042 12 : )
1043 12 : .instrument(tracing::info_span!("timeline_delete", %timeline_id))
1044 0 : .await
1045 12 : .context("resume_deletion")
1046 12 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1047 : }
1048 :
1049 : // The local filesystem contents are a cache of what's in the remote IndexPart;
1050 : // IndexPart is the source of truth.
1051 855 : self.clean_up_timelines(&existent_timelines)?;
1052 :
1053 855 : fail::fail_point!("attach-before-activate", |_| {
1054 0 : anyhow::bail!("attach-before-activate");
1055 855 : });
1056 3 : failpoint_support::sleep_millis_async!("attach-before-activate-sleep", &self.cancel);
1057 :
1058 855 : info!("Done");
1059 :
1060 855 : Ok(())
1061 855 : }
1062 :
1063 : /// Check for any local timeline directories that are temporary, or do not correspond to a
1064 : /// timeline that still exists: this can happen if we crashed during a deletion/creation, or
1065 : /// if a timeline was deleted while the tenant was attached to a different pageserver.
1066 855 : fn clean_up_timelines(&self, existent_timelines: &HashSet<TimelineId>) -> anyhow::Result<()> {
1067 855 : let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
1068 :
1069 855 : let entries = match timelines_dir.read_dir_utf8() {
1070 853 : Ok(d) => d,
1071 2 : Err(e) => {
1072 2 : if e.kind() == std::io::ErrorKind::NotFound {
1073 2 : return Ok(());
1074 : } else {
1075 0 : return Err(e).context("list timelines directory for tenant");
1076 : }
1077 : }
1078 : };
1079 :
1080 1285 : for entry in entries {
1081 432 : let entry = entry.context("read timeline dir entry")?;
1082 432 : let entry_path = entry.path();
1083 :
1084 432 : let purge = if crate::is_temporary(entry_path)
1085 : // TODO: uninit_mark isn't needed any more, since uninitialized timelines are already
1086 : // covered by the check that the timeline must exist in remote storage.
1087 429 : || is_uninit_mark(entry_path)
1088 427 : || crate::is_delete_mark(entry_path)
1089 : {
1090 5 : true
1091 : } else {
1092 427 : match TimelineId::try_from(entry_path.file_name()) {
1093 427 : Ok(i) => {
1094 427 : // Purge if the timeline ID does not exist in remote storage: remote storage is the authority.
1095 427 : !existent_timelines.contains(&i)
1096 : }
1097 0 : Err(e) => {
1098 0 : tracing::warn!(
1099 0 : "Unparseable directory in timelines directory: {entry_path}, ignoring ({e})"
1100 0 : );
1101 : // Do not purge junk: if we don't recognize it, be cautious and leave it for a human.
1102 0 : false
1103 : }
1104 : }
1105 : };
1106 :
1107 432 : if purge {
1108 10 : tracing::info!("Purging stale timeline dentry {entry_path}");
1109 10 : if let Err(e) = match entry.file_type() {
1110 10 : Ok(t) => if t.is_dir() {
1111 7 : std::fs::remove_dir_all(entry_path)
1112 : } else {
1113 3 : std::fs::remove_file(entry_path)
1114 : }
1115 10 : .or_else(fs_ext::ignore_not_found),
1116 0 : Err(e) => Err(e),
1117 : } {
1118 0 : tracing::warn!("Failed to purge stale timeline dentry {entry_path}: {e}");
1119 10 : }
1120 422 : }
1121 : }
1122 :
1123 853 : Ok(())
1124 855 : }
1125 :
1126 : /// Get sum of all remote timelines sizes
1127 : ///
1128 : /// This function relies on the index_part instead of listing the remote storage
1129 23 : pub fn remote_size(&self) -> u64 {
1130 23 : let mut size = 0;
1131 :
1132 23 : for timeline in self.list_timelines() {
1133 15 : if let Some(remote_client) = &timeline.remote_client {
1134 15 : size += remote_client.get_remote_physical_size();
1135 15 : }
1136 : }
1137 :
1138 23 : size
1139 23 : }
1140 :
1141 412 : #[instrument(skip_all, fields(timeline_id=%timeline_id))]
1142 : async fn load_remote_timeline(
1143 : &self,
1144 : timeline_id: TimelineId,
1145 : index_part: IndexPart,
1146 : remote_metadata: TimelineMetadata,
1147 : resources: TimelineResources,
1148 : ctx: &RequestContext,
1149 : ) -> anyhow::Result<()> {
1150 : span::debug_assert_current_span_has_tenant_id();
1151 :
1152 412 : info!("downloading index file for timeline {}", timeline_id);
1153 : tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_shard_id, &timeline_id))
1154 : .await
1155 : .context("Failed to create new timeline directory")?;
1156 :
1157 : let ancestor = if let Some(ancestor_id) = remote_metadata.ancestor_timeline() {
1158 : let timelines = self.timelines.lock().unwrap();
1159 : Some(Arc::clone(timelines.get(&ancestor_id).ok_or_else(
1160 0 : || {
1161 0 : anyhow::anyhow!(
1162 0 : "cannot find ancestor timeline {ancestor_id} for timeline {timeline_id}"
1163 0 : )
1164 0 : },
1165 : )?))
1166 : } else {
1167 : None
1168 : };
1169 :
1170 : // timeline loading after attach expects to find metadata file for each metadata
1171 : save_metadata(
1172 : self.conf,
1173 : &self.tenant_shard_id,
1174 : &timeline_id,
1175 : &remote_metadata,
1176 : )
1177 : .await
1178 : .context("save_metadata")
1179 : .map_err(LoadLocalTimelineError::Load)?;
1180 :
1181 : self.timeline_init_and_sync(
1182 : timeline_id,
1183 : resources,
1184 : Some(index_part),
1185 : remote_metadata,
1186 : ancestor,
1187 : ctx,
1188 : )
1189 : .await
1190 : }
1191 :
1192 : /// Create a placeholder Tenant object for a broken tenant
1193 0 : pub fn create_broken_tenant(
1194 0 : conf: &'static PageServerConf,
1195 0 : tenant_shard_id: TenantShardId,
1196 0 : reason: String,
1197 0 : ) -> Arc<Tenant> {
1198 0 : Arc::new(Tenant::new(
1199 0 : TenantState::Broken {
1200 0 : reason,
1201 0 : backtrace: String::new(),
1202 0 : },
1203 0 : conf,
1204 0 : AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
1205 0 : // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
1206 0 : // to occupy the slot for this TenantShardId.
1207 0 : ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
1208 0 : None,
1209 0 : tenant_shard_id,
1210 0 : None,
1211 0 : DeletionQueueClient::broken(),
1212 0 : ))
1213 0 : }
1214 :
1215 80 : fn scan_and_sort_timelines_dir(self: Arc<Tenant>) -> anyhow::Result<TenantDirectoryScan> {
1216 80 : let mut timelines_to_load: HashMap<TimelineId, TimelineMetadata> = HashMap::new();
1217 80 : // Note timelines_to_resume_deletion needs to be separate because it can be not sortable
1218 80 : // from the point of `tree_sort_timelines`. I e some parents can be missing because deletion
1219 80 : // completed in non topological order (for example because parent has smaller number of layer files in it)
1220 80 : let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
1221 80 :
1222 80 : let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
1223 :
1224 80 : for entry in timelines_dir
1225 80 : .read_dir_utf8()
1226 80 : .context("list timelines directory for tenant")?
1227 : {
1228 6 : let entry = entry.context("read timeline dir entry")?;
1229 6 : let timeline_dir = entry.path();
1230 6 :
1231 6 : if crate::is_temporary(timeline_dir) {
1232 0 : info!("Found temporary timeline directory, removing: {timeline_dir}");
1233 0 : if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
1234 0 : error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
1235 0 : }
1236 6 : } else if is_uninit_mark(timeline_dir) {
1237 2 : if !timeline_dir.exists() {
1238 0 : warn!("Timeline dir entry become invalid: {timeline_dir}");
1239 0 : continue;
1240 2 : }
1241 2 :
1242 2 : let timeline_uninit_mark_file = &timeline_dir;
1243 2 : info!(
1244 2 : "Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
1245 2 : );
1246 2 : let timeline_id =
1247 2 : TimelineId::try_from(timeline_uninit_mark_file.file_stem())
1248 2 : .with_context(|| {
1249 0 : format!(
1250 0 : "Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
1251 0 : )
1252 2 : })?;
1253 2 : let timeline_dir = self.conf.timeline_path(&self.tenant_shard_id, &timeline_id);
1254 0 : if let Err(e) =
1255 2 : remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
1256 : {
1257 0 : error!("Failed to clean up uninit marked timeline: {e:?}");
1258 2 : }
1259 4 : } else if crate::is_delete_mark(timeline_dir) {
1260 : // If metadata exists, load as usual, continue deletion
1261 0 : let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
1262 0 : .with_context(|| {
1263 0 : format!(
1264 0 : "Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
1265 0 : )
1266 0 : })?;
1267 :
1268 0 : info!("Found deletion mark for timeline {}", timeline_id);
1269 :
1270 0 : match load_metadata(self.conf, &self.tenant_shard_id, &timeline_id) {
1271 0 : Ok(metadata) => {
1272 0 : timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
1273 : }
1274 0 : Err(e) => match &e {
1275 0 : LoadMetadataError::Read(r) => {
1276 0 : if r.kind() != io::ErrorKind::NotFound {
1277 0 : return Err(anyhow::anyhow!(e)).with_context(|| {
1278 0 : format!("Failed to load metadata for timeline_id {timeline_id}")
1279 0 : });
1280 0 : }
1281 0 :
1282 0 : // If metadata doesnt exist it means that we've crashed without
1283 0 : // completing cleanup_remaining_timeline_fs_traces in DeleteTimelineFlow.
1284 0 : // So save timeline_id for later call to `DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces`.
1285 0 : // We cant do it here because the method is async so we'd need block_on
1286 0 : // and here we're in spawn_blocking. cleanup_remaining_timeline_fs_traces uses fs operations
1287 0 : // so that basically results in a cycle:
1288 0 : // spawn_blocking
1289 0 : // - block_on
1290 0 : // - spawn_blocking
1291 0 : // which can lead to running out of threads in blocing pool.
1292 0 : timelines_to_resume_deletion.push((timeline_id, None));
1293 : }
1294 : _ => {
1295 0 : return Err(anyhow::anyhow!(e)).with_context(|| {
1296 0 : format!("Failed to load metadata for timeline_id {timeline_id}")
1297 0 : })
1298 : }
1299 : },
1300 : }
1301 : } else {
1302 4 : if !timeline_dir.exists() {
1303 2 : warn!("Timeline dir entry become invalid: {timeline_dir}");
1304 2 : continue;
1305 2 : }
1306 2 : let timeline_id = TimelineId::try_from(timeline_dir.file_name())
1307 2 : .with_context(|| {
1308 0 : format!(
1309 0 : "Could not parse timeline id out of the timeline dir name {timeline_dir}",
1310 0 : )
1311 2 : })?;
1312 2 : let timeline_uninit_mark_file = self
1313 2 : .conf
1314 2 : .timeline_uninit_mark_file_path(self.tenant_shard_id, timeline_id);
1315 2 : if timeline_uninit_mark_file.exists() {
1316 0 : info!(
1317 0 : %timeline_id,
1318 0 : "Found an uninit mark file, removing the timeline and its uninit mark",
1319 0 : );
1320 0 : if let Err(e) =
1321 0 : remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
1322 : {
1323 0 : error!("Failed to clean up uninit marked timeline: {e:?}");
1324 0 : }
1325 0 : continue;
1326 2 : }
1327 2 :
1328 2 : let timeline_delete_mark_file = self
1329 2 : .conf
1330 2 : .timeline_delete_mark_file_path(self.tenant_shard_id, timeline_id);
1331 2 : if timeline_delete_mark_file.exists() {
1332 : // Cleanup should be done in `is_delete_mark` branch above
1333 0 : continue;
1334 2 : }
1335 2 :
1336 2 : let file_name = entry.file_name();
1337 2 : if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
1338 2 : let metadata = load_metadata(self.conf, &self.tenant_shard_id, &timeline_id)
1339 2 : .context("failed to load metadata")?;
1340 0 : timelines_to_load.insert(timeline_id, metadata);
1341 : } else {
1342 : // A file or directory that doesn't look like a timeline ID
1343 0 : warn!("unexpected file or directory in timelines directory: {file_name}");
1344 : }
1345 : }
1346 : }
1347 :
1348 : // Sort the array of timeline IDs into tree-order, so that parent comes before
1349 : // all its children.
1350 78 : tree_sort_timelines(timelines_to_load, |m| m.ancestor_timeline()).map(|sorted_timelines| {
1351 78 : TenantDirectoryScan {
1352 78 : sorted_timelines_to_load: sorted_timelines,
1353 78 : timelines_to_resume_deletion,
1354 78 : }
1355 78 : })
1356 80 : }
1357 :
1358 840 : async fn load_timeline_metadata(
1359 840 : self: &Arc<Tenant>,
1360 840 : timeline_ids: HashSet<TimelineId>,
1361 840 : remote_storage: &GenericRemoteStorage,
1362 840 : cancel: CancellationToken,
1363 840 : ) -> anyhow::Result<HashMap<TimelineId, TimelinePreload>> {
1364 840 : let mut part_downloads = JoinSet::new();
1365 1267 : for timeline_id in timeline_ids {
1366 427 : let client = RemoteTimelineClient::new(
1367 427 : remote_storage.clone(),
1368 427 : self.deletion_queue_client.clone(),
1369 427 : self.conf,
1370 427 : self.tenant_shard_id,
1371 427 : timeline_id,
1372 427 : self.generation,
1373 427 : );
1374 427 : let cancel_clone = cancel.clone();
1375 427 : part_downloads.spawn(
1376 427 : async move {
1377 427 : debug!("starting index part download");
1378 :
1379 2931 : let index_part = client.download_index_file(cancel_clone).await;
1380 :
1381 427 : debug!("finished index part download");
1382 :
1383 427 : Result::<_, anyhow::Error>::Ok(TimelinePreload {
1384 427 : client,
1385 427 : timeline_id,
1386 427 : index_part,
1387 427 : })
1388 427 : }
1389 427 : .map(move |res| {
1390 427 : res.with_context(|| format!("download index part for timeline {timeline_id}"))
1391 427 : })
1392 427 : .instrument(info_span!("download_index_part", %timeline_id)),
1393 : );
1394 : }
1395 :
1396 840 : let mut timeline_preloads: HashMap<TimelineId, TimelinePreload> = HashMap::new();
1397 :
1398 1267 : loop {
1399 1670 : tokio::select!(
1400 1267 : next = part_downloads.join_next() => {
1401 : match next {
1402 : Some(result) => {
1403 : let preload_result = result.context("join preload task")?;
1404 : let preload = preload_result?;
1405 : timeline_preloads.insert(preload.timeline_id, preload);
1406 : },
1407 : None => {
1408 : break;
1409 : }
1410 : }
1411 : },
1412 : _ = cancel.cancelled() => {
1413 : anyhow::bail!("Cancelled while waiting for remote index download")
1414 : }
1415 1267 : )
1416 1267 : }
1417 :
1418 840 : Ok(timeline_preloads)
1419 840 : }
1420 :
1421 : ///
1422 : /// Background task to load in-memory data structures for this tenant, from
1423 : /// files on disk. Used at pageserver startup.
1424 : ///
1425 : /// No background tasks are started as part of this routine.
1426 80 : async fn load_local(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
1427 80 : span::debug_assert_current_span_has_tenant_id();
1428 :
1429 0 : debug!("loading tenant task");
1430 :
1431 : // Load in-memory state to reflect the local files on disk
1432 : //
1433 : // Scan the directory, peek into the metadata file of each timeline, and
1434 : // collect a list of timelines and their ancestors.
1435 80 : let span = info_span!("blocking");
1436 80 : let cloned = Arc::clone(self);
1437 :
1438 80 : let scan = tokio::task::spawn_blocking(move || {
1439 80 : let _g = span.entered();
1440 80 : cloned.scan_and_sort_timelines_dir()
1441 80 : })
1442 80 : .await
1443 80 : .context("load spawn_blocking")
1444 80 : .and_then(|res| res)?;
1445 :
1446 : // FIXME original collect_timeline_files contained one more check:
1447 : // 1. "Timeline has no ancestor and no layer files"
1448 :
1449 : // Process loadable timelines first
1450 78 : for (timeline_id, local_metadata) in scan.sorted_timelines_to_load {
1451 0 : if let Err(e) = self
1452 0 : .load_local_timeline(timeline_id, local_metadata, ctx, false)
1453 0 : .await
1454 : {
1455 0 : match e {
1456 0 : LoadLocalTimelineError::Load(source) => {
1457 0 : return Err(anyhow::anyhow!(source)).with_context(|| {
1458 0 : format!("Failed to load local timeline: {timeline_id}")
1459 0 : })
1460 : }
1461 0 : LoadLocalTimelineError::ResumeDeletion(source) => {
1462 : // Make sure resumed deletion wont fail loading for entire tenant.
1463 0 : error!("Failed to resume timeline deletion: {source:#}")
1464 : }
1465 : }
1466 0 : }
1467 : }
1468 :
1469 : // Resume deletion ones with deleted_mark
1470 78 : for (timeline_id, maybe_local_metadata) in scan.timelines_to_resume_deletion {
1471 0 : match maybe_local_metadata {
1472 : None => {
1473 : // See comment in `scan_and_sort_timelines_dir`.
1474 0 : if let Err(e) =
1475 0 : DeleteTimelineFlow::cleanup_remaining_timeline_fs_traces(self, timeline_id)
1476 0 : .await
1477 : {
1478 0 : warn!(
1479 0 : "cannot clean up deleted timeline dir timeline_id: {} error: {:#}",
1480 0 : timeline_id, e
1481 0 : );
1482 0 : }
1483 : }
1484 0 : Some(local_metadata) => {
1485 0 : if let Err(e) = self
1486 0 : .load_local_timeline(timeline_id, local_metadata, ctx, true)
1487 0 : .await
1488 : {
1489 0 : match e {
1490 0 : LoadLocalTimelineError::Load(source) => {
1491 0 : // We tried to load deleted timeline, this is a bug.
1492 0 : return Err(anyhow::anyhow!(source).context(
1493 0 : format!("This is a bug. We tried to load deleted timeline which is wrong and loading failed. Timeline: {timeline_id}")
1494 0 : ));
1495 : }
1496 0 : LoadLocalTimelineError::ResumeDeletion(source) => {
1497 : // Make sure resumed deletion wont fail loading for entire tenant.
1498 0 : error!("Failed to resume timeline deletion: {source:#}")
1499 : }
1500 : }
1501 0 : }
1502 : }
1503 : }
1504 : }
1505 :
1506 0 : trace!("Done");
1507 :
1508 78 : Ok(())
1509 80 : }
1510 :
1511 : /// Subroutine of `load_tenant`, to load an individual timeline
1512 : ///
1513 : /// NB: The parent is assumed to be already loaded!
1514 0 : #[instrument(skip(self, local_metadata, ctx))]
1515 : async fn load_local_timeline(
1516 : self: &Arc<Self>,
1517 : timeline_id: TimelineId,
1518 : local_metadata: TimelineMetadata,
1519 : ctx: &RequestContext,
1520 : found_delete_mark: bool,
1521 : ) -> Result<(), LoadLocalTimelineError> {
1522 : span::debug_assert_current_span_has_tenant_id();
1523 :
1524 : let resources = self.build_timeline_resources(timeline_id);
1525 :
1526 : if found_delete_mark {
1527 : // There is no remote client, we found local metadata.
1528 : // Continue cleaning up local disk.
1529 : DeleteTimelineFlow::resume_deletion(
1530 : Arc::clone(self),
1531 : timeline_id,
1532 : &local_metadata,
1533 : None,
1534 : self.deletion_queue_client.clone(),
1535 : )
1536 : .await
1537 : .context("resume deletion")
1538 : .map_err(LoadLocalTimelineError::ResumeDeletion)?;
1539 : return Ok(());
1540 : }
1541 :
1542 : let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
1543 : let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
1544 0 : .with_context(|| anyhow::anyhow!("cannot find ancestor timeline {ancestor_timeline_id} for timeline {timeline_id}"))
1545 : .map_err(LoadLocalTimelineError::Load)?;
1546 : Some(ancestor_timeline)
1547 : } else {
1548 : None
1549 : };
1550 :
1551 : self.timeline_init_and_sync(timeline_id, resources, None, local_metadata, ancestor, ctx)
1552 : .await
1553 : .map_err(LoadLocalTimelineError::Load)
1554 : }
1555 :
1556 672 : pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
1557 672 : self.tenant_shard_id
1558 672 : }
1559 :
1560 : /// Get Timeline handle for given Neon timeline ID.
1561 : /// This function is idempotent. It doesn't change internal state in any way.
1562 17379 : pub fn get_timeline(
1563 17379 : &self,
1564 17379 : timeline_id: TimelineId,
1565 17379 : active_only: bool,
1566 17379 : ) -> Result<Arc<Timeline>, GetTimelineError> {
1567 17379 : let timelines_accessor = self.timelines.lock().unwrap();
1568 17379 : let timeline = timelines_accessor
1569 17379 : .get(&timeline_id)
1570 17379 : .ok_or(GetTimelineError::NotFound {
1571 17379 : tenant_id: self.tenant_shard_id,
1572 17379 : timeline_id,
1573 17379 : })?;
1574 :
1575 17326 : if active_only && !timeline.is_active() {
1576 6 : Err(GetTimelineError::NotActive {
1577 6 : tenant_id: self.tenant_shard_id,
1578 6 : timeline_id,
1579 6 : state: timeline.current_state(),
1580 6 : })
1581 : } else {
1582 17320 : Ok(Arc::clone(timeline))
1583 : }
1584 17379 : }
1585 :
1586 : /// Lists timelines the tenant contains.
1587 : /// Up to tenant's implementation to omit certain timelines that ar not considered ready for use.
1588 759 : pub fn list_timelines(&self) -> Vec<Arc<Timeline>> {
1589 759 : self.timelines
1590 759 : .lock()
1591 759 : .unwrap()
1592 759 : .values()
1593 759 : .map(Arc::clone)
1594 759 : .collect()
1595 759 : }
1596 :
1597 486 : pub fn list_timeline_ids(&self) -> Vec<TimelineId> {
1598 486 : self.timelines.lock().unwrap().keys().cloned().collect()
1599 486 : }
1600 :
1601 : /// This is used to create the initial 'main' timeline during bootstrapping,
1602 : /// or when importing a new base backup. The caller is expected to load an
1603 : /// initial image of the datadir to the new timeline after this.
1604 : ///
1605 : /// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
1606 : /// and the timeline will fail to load at a restart.
1607 : ///
1608 : /// That's why we add an uninit mark file, and wrap it together witht the Timeline
1609 : /// in-memory object into UninitializedTimeline.
1610 : /// Once the caller is done setting up the timeline, they should call
1611 : /// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
1612 : ///
1613 : /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
1614 : /// minimum amount of keys required to get a writable timeline.
1615 : /// (Without it, `put` might fail due to `repartition` failing.)
1616 86 : pub(crate) async fn create_empty_timeline(
1617 86 : &self,
1618 86 : new_timeline_id: TimelineId,
1619 86 : initdb_lsn: Lsn,
1620 86 : pg_version: u32,
1621 86 : _ctx: &RequestContext,
1622 86 : ) -> anyhow::Result<UninitializedTimeline> {
1623 86 : anyhow::ensure!(
1624 86 : self.is_active(),
1625 0 : "Cannot create empty timelines on inactive tenant"
1626 : );
1627 :
1628 86 : let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
1629 84 : let new_metadata = TimelineMetadata::new(
1630 84 : // Initialize disk_consistent LSN to 0, The caller must import some data to
1631 84 : // make it valid, before calling finish_creation()
1632 84 : Lsn(0),
1633 84 : None,
1634 84 : None,
1635 84 : Lsn(0),
1636 84 : initdb_lsn,
1637 84 : initdb_lsn,
1638 84 : pg_version,
1639 84 : );
1640 84 : self.prepare_new_timeline(
1641 84 : new_timeline_id,
1642 84 : &new_metadata,
1643 84 : timeline_uninit_mark,
1644 84 : initdb_lsn,
1645 84 : None,
1646 84 : )
1647 108 : .await
1648 86 : }
1649 :
1650 : /// Helper for unit tests to create an empty timeline.
1651 : ///
1652 : /// The timeline is has state value `Active` but its background loops are not running.
1653 : // This makes the various functions which anyhow::ensure! for Active state work in tests.
1654 : // Our current tests don't need the background loops.
1655 : #[cfg(test)]
1656 68 : pub async fn create_test_timeline(
1657 68 : &self,
1658 68 : new_timeline_id: TimelineId,
1659 68 : initdb_lsn: Lsn,
1660 68 : pg_version: u32,
1661 68 : ctx: &RequestContext,
1662 68 : ) -> anyhow::Result<Arc<Timeline>> {
1663 68 : let uninit_tl = self
1664 68 : .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
1665 102 : .await?;
1666 68 : let tline = uninit_tl.raw_timeline().expect("we just created it");
1667 68 : assert_eq!(tline.get_last_record_lsn(), Lsn(0));
1668 :
1669 : // Setup minimum keys required for the timeline to be usable.
1670 68 : let mut modification = tline.begin_modification(initdb_lsn);
1671 68 : modification
1672 68 : .init_empty_test_timeline()
1673 68 : .context("init_empty_test_timeline")?;
1674 68 : modification
1675 68 : .commit(ctx)
1676 34 : .await
1677 68 : .context("commit init_empty_test_timeline modification")?;
1678 :
1679 : // Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
1680 68 : tline.maybe_spawn_flush_loop();
1681 68 : tline.freeze_and_flush().await.context("freeze_and_flush")?;
1682 :
1683 : // Make sure the freeze_and_flush reaches remote storage.
1684 68 : tline
1685 68 : .remote_client
1686 68 : .as_ref()
1687 68 : .unwrap()
1688 68 : .wait_completion()
1689 31 : .await
1690 68 : .unwrap();
1691 :
1692 68 : let tl = uninit_tl.finish_creation()?;
1693 : // The non-test code would call tl.activate() here.
1694 68 : tl.set_state(TimelineState::Active);
1695 68 : Ok(tl)
1696 68 : }
1697 :
1698 : /// Create a new timeline.
1699 : ///
1700 : /// Returns the new timeline ID and reference to its Timeline object.
1701 : ///
1702 : /// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
1703 : /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
1704 : #[allow(clippy::too_many_arguments)]
1705 893 : pub(crate) async fn create_timeline(
1706 893 : &self,
1707 893 : new_timeline_id: TimelineId,
1708 893 : ancestor_timeline_id: Option<TimelineId>,
1709 893 : mut ancestor_start_lsn: Option<Lsn>,
1710 893 : pg_version: u32,
1711 893 : load_existing_initdb: Option<TimelineId>,
1712 893 : broker_client: storage_broker::BrokerClientChannel,
1713 893 : ctx: &RequestContext,
1714 893 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
1715 893 : if !self.is_active() {
1716 0 : if matches!(self.current_state(), TenantState::Stopping { .. }) {
1717 0 : return Err(CreateTimelineError::ShuttingDown);
1718 : } else {
1719 0 : return Err(CreateTimelineError::Other(anyhow::anyhow!(
1720 0 : "Cannot create timelines on inactive tenant"
1721 0 : )));
1722 : }
1723 893 : }
1724 :
1725 893 : let _gate = self
1726 893 : .gate
1727 893 : .enter()
1728 893 : .map_err(|_| CreateTimelineError::ShuttingDown)?;
1729 :
1730 : // Get exclusive access to the timeline ID: this ensures that it does not already exist,
1731 : // and that no other creation attempts will be allowed in while we are working. The
1732 : // uninit_mark is a guard.
1733 893 : let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
1734 865 : Ok(m) => m,
1735 : Err(TimelineExclusionError::AlreadyCreating) => {
1736 : // Creation is in progress, we cannot create it again, and we cannot
1737 : // check if this request matches the existing one, so caller must try
1738 : // again later.
1739 1 : return Err(CreateTimelineError::AlreadyCreating);
1740 : }
1741 0 : Err(TimelineExclusionError::Other(e)) => {
1742 0 : return Err(CreateTimelineError::Other(e));
1743 : }
1744 27 : Err(TimelineExclusionError::AlreadyExists(existing)) => {
1745 0 : debug!("timeline {new_timeline_id} already exists");
1746 :
1747 : // Idempotency: creating the same timeline twice is not an error, unless
1748 : // the second creation has different parameters.
1749 27 : if existing.get_ancestor_timeline_id() != ancestor_timeline_id
1750 27 : || existing.pg_version != pg_version
1751 27 : || (ancestor_start_lsn.is_some()
1752 0 : && ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
1753 : {
1754 0 : return Err(CreateTimelineError::Conflict);
1755 27 : }
1756 :
1757 27 : if let Some(remote_client) = existing.remote_client.as_ref() {
1758 : // Wait for uploads to complete, so that when we return Ok, the timeline
1759 : // is known to be durable on remote storage. Just like we do at the end of
1760 : // this function, after we have created the timeline ourselves.
1761 : //
1762 : // We only really care that the initial version of `index_part.json` has
1763 : // been uploaded. That's enough to remember that the timeline
1764 : // exists. However, there is no function to wait specifically for that so
1765 : // we just wait for all in-progress uploads to finish.
1766 27 : remote_client
1767 27 : .wait_completion()
1768 0 : .await
1769 27 : .context("wait for timeline uploads to complete")?;
1770 0 : }
1771 :
1772 27 : return Ok(existing);
1773 : }
1774 : };
1775 :
1776 865 : let loaded_timeline = match ancestor_timeline_id {
1777 270 : Some(ancestor_timeline_id) => {
1778 270 : let ancestor_timeline = self
1779 270 : .get_timeline(ancestor_timeline_id, false)
1780 270 : .context("Cannot branch off the timeline that's not present in pageserver")?;
1781 :
1782 : // instead of waiting around, just deny the request because ancestor is not yet
1783 : // ready for other purposes either.
1784 270 : if !ancestor_timeline.is_active() {
1785 6 : return Err(CreateTimelineError::AncestorNotActive);
1786 264 : }
1787 :
1788 264 : if let Some(lsn) = ancestor_start_lsn.as_mut() {
1789 33 : *lsn = lsn.align();
1790 33 :
1791 33 : let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
1792 33 : if ancestor_ancestor_lsn > *lsn {
1793 : // can we safely just branch from the ancestor instead?
1794 2 : return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
1795 2 : "invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
1796 2 : lsn,
1797 2 : ancestor_timeline_id,
1798 2 : ancestor_ancestor_lsn,
1799 2 : )));
1800 31 : }
1801 31 :
1802 31 : // Wait for the WAL to arrive and be processed on the parent branch up
1803 31 : // to the requested branch point. The repository code itself doesn't
1804 31 : // require it, but if we start to receive WAL on the new timeline,
1805 31 : // decoding the new WAL might need to look up previous pages, relation
1806 31 : // sizes etc. and that would get confused if the previous page versions
1807 31 : // are not in the repository yet.
1808 31 : ancestor_timeline
1809 31 : .wait_lsn(*lsn, ctx)
1810 5 : .await
1811 31 : .map_err(|e| match e {
1812 0 : e @ (WaitLsnError::Timeout(_) | WaitLsnError::BadState) => {
1813 0 : CreateTimelineError::AncestorLsn(anyhow::anyhow!(e))
1814 : }
1815 0 : WaitLsnError::Shutdown => CreateTimelineError::ShuttingDown,
1816 31 : })?;
1817 231 : }
1818 :
1819 262 : self.branch_timeline(
1820 262 : &ancestor_timeline,
1821 262 : new_timeline_id,
1822 262 : ancestor_start_lsn,
1823 262 : uninit_mark,
1824 262 : ctx,
1825 262 : )
1826 6 : .await?
1827 : }
1828 : None => {
1829 595 : self.bootstrap_timeline(
1830 595 : new_timeline_id,
1831 595 : pg_version,
1832 595 : load_existing_initdb,
1833 595 : uninit_mark,
1834 595 : ctx,
1835 595 : )
1836 7640121 : .await?
1837 : }
1838 : };
1839 :
1840 : // At this point we have dropped our guard on [`Self::timelines_creating`], and
1841 : // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
1842 : // not send a success to the caller until it is. The same applies to handling retries,
1843 : // see the handling of [`TimelineExclusionError::AlreadyExists`] above.
1844 843 : if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
1845 843 : let kind = ancestor_timeline_id
1846 843 : .map(|_| "branched")
1847 843 : .unwrap_or("bootstrapped");
1848 843 : remote_client.wait_completion().await.with_context(|| {
1849 0 : format!("wait for {} timeline initial uploads to complete", kind)
1850 839 : })?;
1851 0 : }
1852 :
1853 839 : loaded_timeline.activate(broker_client, None, ctx);
1854 839 :
1855 839 : Ok(loaded_timeline)
1856 887 : }
1857 :
1858 64 : pub(crate) async fn delete_timeline(
1859 64 : self: Arc<Self>,
1860 64 : timeline_id: TimelineId,
1861 64 : ) -> Result<(), DeleteTimelineError> {
1862 378 : DeleteTimelineFlow::run(&self, timeline_id, false).await?;
1863 :
1864 53 : Ok(())
1865 64 : }
1866 :
1867 : /// perform one garbage collection iteration, removing old data files from disk.
1868 : /// this function is periodically called by gc task.
1869 : /// also it can be explicitly requested through page server api 'do_gc' command.
1870 : ///
1871 : /// `target_timeline_id` specifies the timeline to GC, or None for all.
1872 : ///
1873 : /// The `horizon` an `pitr` parameters determine how much WAL history needs to be retained.
1874 : /// Also known as the retention period, or the GC cutoff point. `horizon` specifies
1875 : /// the amount of history, as LSN difference from current latest LSN on each timeline.
1876 : /// `pitr` specifies the same as a time difference from the current time. The effective
1877 : /// GC cutoff point is determined conservatively by either `horizon` and `pitr`, whichever
1878 : /// requires more history to be retained.
1879 : //
1880 304 : pub async fn gc_iteration(
1881 304 : &self,
1882 304 : target_timeline_id: Option<TimelineId>,
1883 304 : horizon: u64,
1884 304 : pitr: Duration,
1885 304 : cancel: &CancellationToken,
1886 304 : ctx: &RequestContext,
1887 304 : ) -> anyhow::Result<GcResult> {
1888 304 : // Don't start doing work during shutdown
1889 304 : if let TenantState::Stopping { .. } = self.current_state() {
1890 0 : return Ok(GcResult::default());
1891 304 : }
1892 304 :
1893 304 : // there is a global allowed_error for this
1894 304 : anyhow::ensure!(
1895 304 : self.is_active(),
1896 0 : "Cannot run GC iteration on inactive tenant"
1897 : );
1898 :
1899 : {
1900 304 : let conf = self.tenant_conf.read().unwrap();
1901 304 :
1902 304 : if !conf.location.may_delete_layers_hint() {
1903 2 : info!("Skipping GC in location state {:?}", conf.location);
1904 2 : return Ok(GcResult::default());
1905 302 : }
1906 302 : }
1907 302 :
1908 302 : self.gc_iteration_internal(target_timeline_id, horizon, pitr, cancel, ctx)
1909 264491 : .await
1910 303 : }
1911 :
1912 : /// Perform one compaction iteration.
1913 : /// This function is periodically called by compactor task.
1914 : /// Also it can be explicitly requested per timeline through page server
1915 : /// api's 'compact' command.
1916 390 : async fn compaction_iteration(
1917 390 : &self,
1918 390 : cancel: &CancellationToken,
1919 390 : ctx: &RequestContext,
1920 390 : ) -> anyhow::Result<(), timeline::CompactionError> {
1921 390 : // Don't start doing work during shutdown, or when broken, we do not need those in the logs
1922 390 : if !self.is_active() {
1923 1 : return Ok(());
1924 389 : }
1925 389 :
1926 389 : {
1927 389 : let conf = self.tenant_conf.read().unwrap();
1928 389 : if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
1929 6 : info!("Skipping compaction in location state {:?}", conf.location);
1930 6 : return Ok(());
1931 383 : }
1932 383 : }
1933 383 :
1934 383 : // Scan through the hashmap and collect a list of all the timelines,
1935 383 : // while holding the lock. Then drop the lock and actually perform the
1936 383 : // compactions. We don't want to block everything else while the
1937 383 : // compaction runs.
1938 383 : let timelines_to_compact = {
1939 383 : let timelines = self.timelines.lock().unwrap();
1940 383 : let timelines_to_compact = timelines
1941 383 : .iter()
1942 566 : .filter_map(|(timeline_id, timeline)| {
1943 566 : if timeline.is_active() {
1944 563 : Some((*timeline_id, timeline.clone()))
1945 : } else {
1946 3 : None
1947 : }
1948 566 : })
1949 383 : .collect::<Vec<_>>();
1950 383 : drop(timelines);
1951 383 : timelines_to_compact
1952 : };
1953 :
1954 942 : for (timeline_id, timeline) in &timelines_to_compact {
1955 561 : timeline
1956 561 : .compact(cancel, EnumSet::empty(), ctx)
1957 561 : .instrument(info_span!("compact_timeline", %timeline_id))
1958 128297 : .await?;
1959 : }
1960 :
1961 381 : Ok(())
1962 388 : }
1963 :
1964 34368 : pub fn current_state(&self) -> TenantState {
1965 34368 : self.state.borrow().clone()
1966 34368 : }
1967 :
1968 2633 : pub fn is_active(&self) -> bool {
1969 2633 : self.current_state() == TenantState::Active
1970 2633 : }
1971 :
1972 726 : pub fn generation(&self) -> Generation {
1973 726 : self.generation
1974 726 : }
1975 :
1976 486 : pub(crate) fn wal_redo_manager_status(&self) -> Option<WalRedoManagerStatus> {
1977 486 : self.walredo_mgr.as_ref().and_then(|mgr| mgr.status())
1978 486 : }
1979 :
1980 : /// Changes tenant status to active, unless shutdown was already requested.
1981 : ///
1982 : /// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
1983 : /// to delay background jobs. Background jobs can be started right away when None is given.
1984 830 : fn activate(
1985 830 : self: &Arc<Self>,
1986 830 : broker_client: BrokerClientChannel,
1987 830 : background_jobs_can_start: Option<&completion::Barrier>,
1988 830 : ctx: &RequestContext,
1989 830 : ) {
1990 830 : span::debug_assert_current_span_has_tenant_id();
1991 830 :
1992 830 : let mut activating = false;
1993 830 : self.state.send_modify(|current_state| {
1994 830 : use pageserver_api::models::ActivatingFrom;
1995 830 : match &*current_state {
1996 : TenantState::Activating(_) | TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => {
1997 0 : panic!("caller is responsible for calling activate() only on Loading / Attaching tenants, got {state:?}", state = current_state);
1998 : }
1999 0 : TenantState::Loading => {
2000 0 : *current_state = TenantState::Activating(ActivatingFrom::Loading);
2001 0 : }
2002 830 : TenantState::Attaching => {
2003 830 : *current_state = TenantState::Activating(ActivatingFrom::Attaching);
2004 830 : }
2005 : }
2006 830 : debug!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), "Activating tenant");
2007 830 : activating = true;
2008 830 : // Continue outside the closure. We need to grab timelines.lock()
2009 830 : // and we plan to turn it into a tokio::sync::Mutex in a future patch.
2010 830 : });
2011 830 :
2012 830 : if activating {
2013 830 : let timelines_accessor = self.timelines.lock().unwrap();
2014 830 : let timelines_to_activate = timelines_accessor
2015 830 : .values()
2016 830 : .filter(|timeline| !(timeline.is_broken() || timeline.is_stopping()));
2017 830 :
2018 830 : // Spawn gc and compaction loops. The loops will shut themselves
2019 830 : // down when they notice that the tenant is inactive.
2020 830 : tasks::start_background_loops(self, background_jobs_can_start);
2021 830 :
2022 830 : let mut activated_timelines = 0;
2023 :
2024 1216 : for timeline in timelines_to_activate {
2025 386 : timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
2026 386 : activated_timelines += 1;
2027 386 : }
2028 :
2029 830 : self.state.send_modify(move |current_state| {
2030 830 : assert!(
2031 830 : matches!(current_state, TenantState::Activating(_)),
2032 0 : "set_stopping and set_broken wait for us to leave Activating state",
2033 : );
2034 830 : *current_state = TenantState::Active;
2035 830 :
2036 830 : let elapsed = self.constructed_at.elapsed();
2037 830 : let total_timelines = timelines_accessor.len();
2038 830 :
2039 830 : // log a lot of stuff, because some tenants sometimes suffer from user-visible
2040 830 : // times to activate. see https://github.com/neondatabase/neon/issues/4025
2041 830 : info!(
2042 830 : since_creation_millis = elapsed.as_millis(),
2043 830 : tenant_id = %self.tenant_shard_id.tenant_id,
2044 830 : shard_id = %self.tenant_shard_id.shard_slug(),
2045 830 : activated_timelines,
2046 830 : total_timelines,
2047 830 : post_state = <&'static str>::from(&*current_state),
2048 830 : "activation attempt finished"
2049 830 : );
2050 :
2051 830 : TENANT.activation.observe(elapsed.as_secs_f64());
2052 830 : });
2053 0 : }
2054 830 : }
2055 :
2056 : /// Shutdown the tenant and join all of the spawned tasks.
2057 : ///
2058 : /// The method caters for all use-cases:
2059 : /// - pageserver shutdown (freeze_and_flush == true)
2060 : /// - detach + ignore (freeze_and_flush == false)
2061 : ///
2062 : /// This will attempt to shutdown even if tenant is broken.
2063 : ///
2064 : /// `shutdown_progress` is a [`completion::Barrier`] for the shutdown initiated by this call.
2065 : /// If the tenant is already shutting down, we return a clone of the first shutdown call's
2066 : /// `Barrier` as an `Err`. This not-first caller can use the returned barrier to join with
2067 : /// the ongoing shutdown.
2068 449 : async fn shutdown(
2069 449 : &self,
2070 449 : shutdown_progress: completion::Barrier,
2071 449 : freeze_and_flush: bool,
2072 449 : ) -> Result<(), completion::Barrier> {
2073 449 : span::debug_assert_current_span_has_tenant_id();
2074 449 :
2075 449 : // Set tenant (and its timlines) to Stoppping state.
2076 449 : //
2077 449 : // Since we can only transition into Stopping state after activation is complete,
2078 449 : // run it in a JoinSet so all tenants have a chance to stop before we get SIGKILLed.
2079 449 : //
2080 449 : // Transitioning tenants to Stopping state has a couple of non-obvious side effects:
2081 449 : // 1. Lock out any new requests to the tenants.
2082 449 : // 2. Signal cancellation to WAL receivers (we wait on it below).
2083 449 : // 3. Signal cancellation for other tenant background loops.
2084 449 : // 4. ???
2085 449 : //
2086 449 : // The waiting for the cancellation is not done uniformly.
2087 449 : // We certainly wait for WAL receivers to shut down.
2088 449 : // That is necessary so that no new data comes in before the freeze_and_flush.
2089 449 : // But the tenant background loops are joined-on in our caller.
2090 449 : // It's mesed up.
2091 449 : // we just ignore the failure to stop
2092 449 :
2093 449 : // If we're still attaching, fire the cancellation token early to drop out: this
2094 449 : // will prevent us flushing, but ensures timely shutdown if some I/O during attach
2095 449 : // is very slow.
2096 449 : if matches!(self.current_state(), TenantState::Attaching) {
2097 14 : self.cancel.cancel();
2098 435 : }
2099 :
2100 449 : match self.set_stopping(shutdown_progress, false, false).await {
2101 392 : Ok(()) => {}
2102 57 : Err(SetStoppingError::Broken) => {
2103 57 : // assume that this is acceptable
2104 57 : }
2105 0 : Err(SetStoppingError::AlreadyStopping(other)) => {
2106 : // give caller the option to wait for this this shutdown
2107 0 : info!("Tenant::shutdown: AlreadyStopping");
2108 0 : return Err(other);
2109 : }
2110 : };
2111 :
2112 449 : let mut js = tokio::task::JoinSet::new();
2113 449 : {
2114 449 : let timelines = self.timelines.lock().unwrap();
2115 575 : timelines.values().for_each(|timeline| {
2116 575 : let timeline = Arc::clone(timeline);
2117 575 : let timeline_id = timeline.timeline_id;
2118 :
2119 575 : let span =
2120 575 : tracing::info_span!("timeline_shutdown", %timeline_id, ?freeze_and_flush);
2121 575 : js.spawn(async move {
2122 575 : if freeze_and_flush {
2123 829 : timeline.flush_and_shutdown().instrument(span).await
2124 : } else {
2125 786 : timeline.shutdown().instrument(span).await
2126 : }
2127 575 : });
2128 575 : })
2129 : };
2130 : // test_long_timeline_create_then_tenant_delete is leaning on this message
2131 449 : tracing::info!("Waiting for timelines...");
2132 1024 : while let Some(res) = js.join_next().await {
2133 0 : match res {
2134 575 : Ok(()) => {}
2135 0 : Err(je) if je.is_cancelled() => unreachable!("no cancelling used"),
2136 0 : Err(je) if je.is_panic() => { /* logged already */ }
2137 0 : Err(je) => warn!("unexpected JoinError: {je:?}"),
2138 : }
2139 : }
2140 :
2141 : // We cancel the Tenant's cancellation token _after_ the timelines have all shut down. This permits
2142 : // them to continue to do work during their shutdown methods, e.g. flushing data.
2143 0 : tracing::debug!("Cancelling CancellationToken");
2144 449 : self.cancel.cancel();
2145 :
2146 : // shutdown all tenant and timeline tasks: gc, compaction, page service
2147 : // No new tasks will be started for this tenant because it's in `Stopping` state.
2148 : //
2149 : // this will additionally shutdown and await all timeline tasks.
2150 0 : tracing::debug!("Waiting for tasks...");
2151 449 : task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await;
2152 :
2153 : // Wait for any in-flight operations to complete
2154 449 : self.gate.close().await;
2155 :
2156 449 : Ok(())
2157 449 : }
2158 :
2159 : /// Change tenant status to Stopping, to mark that it is being shut down.
2160 : ///
2161 : /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
2162 : ///
2163 : /// This function is not cancel-safe!
2164 : ///
2165 : /// `allow_transition_from_loading` is needed for the special case of loading task deleting the tenant.
2166 : /// `allow_transition_from_attaching` is needed for the special case of attaching deleted tenant.
2167 470 : async fn set_stopping(
2168 470 : &self,
2169 470 : progress: completion::Barrier,
2170 470 : allow_transition_from_loading: bool,
2171 470 : allow_transition_from_attaching: bool,
2172 470 : ) -> Result<(), SetStoppingError> {
2173 470 : let mut rx = self.state.subscribe();
2174 470 :
2175 470 : // cannot stop before we're done activating, so wait out until we're done activating
2176 493 : rx.wait_for(|state| match state {
2177 21 : TenantState::Attaching if allow_transition_from_attaching => true,
2178 : TenantState::Activating(_) | TenantState::Attaching => {
2179 23 : info!(
2180 23 : "waiting for {} to turn Active|Broken|Stopping",
2181 23 : <&'static str>::from(state)
2182 23 : );
2183 23 : false
2184 : }
2185 0 : TenantState::Loading => allow_transition_from_loading,
2186 449 : TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
2187 493 : })
2188 23 : .await
2189 470 : .expect("cannot drop self.state while on a &self method");
2190 470 :
2191 470 : // we now know we're done activating, let's see whether this task is the winner to transition into Stopping
2192 470 : let mut err = None;
2193 470 : let stopping = self.state.send_if_modified(|current_state| match current_state {
2194 : TenantState::Activating(_) => {
2195 0 : unreachable!("1we ensured above that we're done with activation, and, there is no re-activation")
2196 : }
2197 : TenantState::Attaching => {
2198 21 : if !allow_transition_from_attaching {
2199 0 : unreachable!("2we ensured above that we're done with activation, and, there is no re-activation")
2200 21 : };
2201 21 : *current_state = TenantState::Stopping { progress };
2202 21 : true
2203 : }
2204 : TenantState::Loading => {
2205 0 : if !allow_transition_from_loading {
2206 0 : unreachable!("3we ensured above that we're done with activation, and, there is no re-activation")
2207 0 : };
2208 0 : *current_state = TenantState::Stopping { progress };
2209 0 : true
2210 : }
2211 : TenantState::Active => {
2212 : // FIXME: due to time-of-check vs time-of-use issues, it can happen that new timelines
2213 : // are created after the transition to Stopping. That's harmless, as the Timelines
2214 : // won't be accessible to anyone afterwards, because the Tenant is in Stopping state.
2215 392 : *current_state = TenantState::Stopping { progress };
2216 392 : // Continue stopping outside the closure. We need to grab timelines.lock()
2217 392 : // and we plan to turn it into a tokio::sync::Mutex in a future patch.
2218 392 : true
2219 : }
2220 57 : TenantState::Broken { reason, .. } => {
2221 57 : info!(
2222 57 : "Cannot set tenant to Stopping state, it is in Broken state due to: {reason}"
2223 57 : );
2224 57 : err = Some(SetStoppingError::Broken);
2225 57 : false
2226 : }
2227 0 : TenantState::Stopping { progress } => {
2228 0 : info!("Tenant is already in Stopping state");
2229 0 : err = Some(SetStoppingError::AlreadyStopping(progress.clone()));
2230 0 : false
2231 : }
2232 470 : });
2233 470 : match (stopping, err) {
2234 413 : (true, None) => {} // continue
2235 57 : (false, Some(err)) => return Err(err),
2236 0 : (true, Some(_)) => unreachable!(
2237 0 : "send_if_modified closure must error out if not transitioning to Stopping"
2238 0 : ),
2239 0 : (false, None) => unreachable!(
2240 0 : "send_if_modified closure must return true if transitioning to Stopping"
2241 0 : ),
2242 : }
2243 :
2244 413 : let timelines_accessor = self.timelines.lock().unwrap();
2245 413 : let not_broken_timelines = timelines_accessor
2246 413 : .values()
2247 515 : .filter(|timeline| !timeline.is_broken());
2248 919 : for timeline in not_broken_timelines {
2249 506 : timeline.set_state(TimelineState::Stopping);
2250 506 : }
2251 413 : Ok(())
2252 470 : }
2253 :
2254 : /// Method for tenant::mgr to transition us into Broken state in case of a late failure in
2255 : /// `remove_tenant_from_memory`
2256 : ///
2257 : /// This function waits for the tenant to become active if it isn't already, before transitioning it into Stopping state.
2258 : ///
2259 : /// In tests, we also use this to set tenants to Broken state on purpose.
2260 51 : pub(crate) async fn set_broken(&self, reason: String) {
2261 51 : let mut rx = self.state.subscribe();
2262 51 :
2263 51 : // The load & attach routines own the tenant state until it has reached `Active`.
2264 51 : // So, wait until it's done.
2265 51 : rx.wait_for(|state| match state {
2266 : TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
2267 0 : info!(
2268 0 : "waiting for {} to turn Active|Broken|Stopping",
2269 0 : <&'static str>::from(state)
2270 0 : );
2271 0 : false
2272 : }
2273 51 : TenantState::Active | TenantState::Broken { .. } | TenantState::Stopping { .. } => true,
2274 51 : })
2275 0 : .await
2276 51 : .expect("cannot drop self.state while on a &self method");
2277 51 :
2278 51 : // we now know we're done activating, let's see whether this task is the winner to transition into Broken
2279 51 : self.set_broken_no_wait(reason)
2280 51 : }
2281 :
2282 51 : pub(crate) fn set_broken_no_wait(&self, reason: impl Display) {
2283 51 : let reason = reason.to_string();
2284 51 : self.state.send_modify(|current_state| {
2285 51 : match *current_state {
2286 : TenantState::Activating(_) | TenantState::Loading | TenantState::Attaching => {
2287 0 : unreachable!("we ensured above that we're done with activation, and, there is no re-activation")
2288 : }
2289 : TenantState::Active => {
2290 2 : if cfg!(feature = "testing") {
2291 2 : warn!("Changing Active tenant to Broken state, reason: {}", reason);
2292 2 : *current_state = TenantState::broken_from_reason(reason);
2293 : } else {
2294 0 : unreachable!("not allowed to call set_broken on Active tenants in non-testing builds")
2295 : }
2296 : }
2297 : TenantState::Broken { .. } => {
2298 0 : warn!("Tenant is already in Broken state");
2299 : }
2300 : // This is the only "expected" path, any other path is a bug.
2301 : TenantState::Stopping { .. } => {
2302 49 : warn!(
2303 49 : "Marking Stopping tenant as Broken state, reason: {}",
2304 49 : reason
2305 49 : );
2306 49 : *current_state = TenantState::broken_from_reason(reason);
2307 : }
2308 : }
2309 51 : });
2310 51 : }
2311 :
2312 387 : pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TenantState> {
2313 387 : self.state.subscribe()
2314 387 : }
2315 :
2316 : /// The activate_now semaphore is initialized with zero units. As soon as
2317 : /// we add a unit, waiters will be able to acquire a unit and proceed.
2318 593 : pub(crate) fn activate_now(&self) {
2319 593 : self.activate_now_sem.add_permits(1);
2320 593 : }
2321 :
2322 1216 : pub(crate) async fn wait_to_become_active(
2323 1216 : &self,
2324 1216 : timeout: Duration,
2325 1216 : ) -> Result<(), GetActiveTenantError> {
2326 1216 : let mut receiver = self.state.subscribe();
2327 1672 : loop {
2328 1672 : let current_state = receiver.borrow_and_update().clone();
2329 1672 : match current_state {
2330 : TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
2331 : // in these states, there's a chance that we can reach ::Active
2332 460 : self.activate_now();
2333 597 : match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
2334 456 : Ok(r) => {
2335 456 : r.map_err(
2336 456 : |_e: tokio::sync::watch::error::RecvError|
2337 : // Tenant existed but was dropped: report it as non-existent
2338 456 : GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
2339 456 : )?
2340 : }
2341 : Err(TimeoutCancellableError::Cancelled) => {
2342 4 : return Err(GetActiveTenantError::Cancelled);
2343 : }
2344 : Err(TimeoutCancellableError::Timeout) => {
2345 0 : return Err(GetActiveTenantError::WaitForActiveTimeout {
2346 0 : latest_state: Some(self.current_state()),
2347 0 : wait_time: timeout,
2348 0 : });
2349 : }
2350 : }
2351 : }
2352 : TenantState::Active { .. } => {
2353 1211 : return Ok(());
2354 : }
2355 : TenantState::Broken { .. } | TenantState::Stopping { .. } => {
2356 : // There's no chance the tenant can transition back into ::Active
2357 1 : return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
2358 : }
2359 : }
2360 : }
2361 1216 : }
2362 :
2363 74 : pub(crate) fn get_attach_mode(&self) -> AttachmentMode {
2364 74 : self.tenant_conf.read().unwrap().location.attach_mode
2365 74 : }
2366 :
2367 : /// For API access: generate a LocationConfig equivalent to the one that would be used to
2368 : /// create a Tenant in the same state. Do not use this in hot paths: it's for relatively
2369 : /// rare external API calls, like a reconciliation at startup.
2370 4 : pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
2371 4 : let conf = self.tenant_conf.read().unwrap();
2372 :
2373 4 : let location_config_mode = match conf.location.attach_mode {
2374 4 : AttachmentMode::Single => models::LocationConfigMode::AttachedSingle,
2375 0 : AttachmentMode::Multi => models::LocationConfigMode::AttachedMulti,
2376 0 : AttachmentMode::Stale => models::LocationConfigMode::AttachedStale,
2377 : };
2378 :
2379 : // We have a pageserver TenantConf, we need the API-facing TenantConfig.
2380 4 : let tenant_config: models::TenantConfig = conf.tenant_conf.into();
2381 4 :
2382 4 : models::LocationConfig {
2383 4 : mode: location_config_mode,
2384 4 : generation: self.generation.into(),
2385 4 : secondary_conf: None,
2386 4 : shard_number: self.shard_identity.number.0,
2387 4 : shard_count: self.shard_identity.count.0,
2388 4 : shard_stripe_size: self.shard_identity.stripe_size.0,
2389 4 : tenant_conf: tenant_config,
2390 4 : }
2391 4 : }
2392 :
2393 48 : pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
2394 48 : &self.tenant_shard_id
2395 48 : }
2396 :
2397 8 : pub(crate) fn get_generation(&self) -> Generation {
2398 8 : self.generation
2399 8 : }
2400 : }
2401 :
2402 : /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id),
2403 : /// perform a topological sort, so that the parent of each timeline comes
2404 : /// before the children.
2405 : /// E extracts the ancestor from T
2406 : /// This allows for T to be different. It can be TimelineMetadata, can be Timeline itself, etc.
2407 1040 : fn tree_sort_timelines<T, E>(
2408 1040 : timelines: HashMap<TimelineId, T>,
2409 1040 : extractor: E,
2410 1040 : ) -> anyhow::Result<Vec<(TimelineId, T)>>
2411 1040 : where
2412 1040 : E: Fn(&T) -> Option<TimelineId>,
2413 1040 : {
2414 1040 : let mut result = Vec::with_capacity(timelines.len());
2415 1040 :
2416 1040 : let mut now = Vec::with_capacity(timelines.len());
2417 1040 : // (ancestor, children)
2418 1040 : let mut later: HashMap<TimelineId, Vec<(TimelineId, T)>> =
2419 1040 : HashMap::with_capacity(timelines.len());
2420 :
2421 1615 : for (timeline_id, value) in timelines {
2422 575 : if let Some(ancestor_id) = extractor(&value) {
2423 57 : let children = later.entry(ancestor_id).or_default();
2424 57 : children.push((timeline_id, value));
2425 518 : } else {
2426 518 : now.push((timeline_id, value));
2427 518 : }
2428 : }
2429 :
2430 1615 : while let Some((timeline_id, metadata)) = now.pop() {
2431 575 : result.push((timeline_id, metadata));
2432 : // All children of this can be loaded now
2433 575 : if let Some(mut children) = later.remove(&timeline_id) {
2434 46 : now.append(&mut children);
2435 529 : }
2436 : }
2437 :
2438 : // All timelines should be visited now. Unless there were timelines with missing ancestors.
2439 1040 : if !later.is_empty() {
2440 0 : for (missing_id, orphan_ids) in later {
2441 0 : for (orphan_id, _) in orphan_ids {
2442 0 : error!("could not load timeline {orphan_id} because its ancestor timeline {missing_id} could not be loaded");
2443 : }
2444 : }
2445 0 : bail!("could not load tenant because some timelines are missing ancestors");
2446 1040 : }
2447 1040 :
2448 1040 : Ok(result)
2449 1040 : }
2450 :
2451 : impl Tenant {
2452 90 : pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
2453 90 : self.tenant_conf.read().unwrap().tenant_conf
2454 90 : }
2455 :
2456 45 : pub fn effective_config(&self) -> TenantConf {
2457 45 : self.tenant_specific_overrides()
2458 45 : .merge(self.conf.default_tenant_conf)
2459 45 : }
2460 :
2461 5 : pub fn get_checkpoint_distance(&self) -> u64 {
2462 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2463 5 : tenant_conf
2464 5 : .checkpoint_distance
2465 5 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
2466 5 : }
2467 :
2468 5 : pub fn get_checkpoint_timeout(&self) -> Duration {
2469 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2470 5 : tenant_conf
2471 5 : .checkpoint_timeout
2472 5 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
2473 5 : }
2474 :
2475 5 : pub fn get_compaction_target_size(&self) -> u64 {
2476 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2477 5 : tenant_conf
2478 5 : .compaction_target_size
2479 5 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
2480 5 : }
2481 :
2482 1210 : pub fn get_compaction_period(&self) -> Duration {
2483 1210 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2484 1210 : tenant_conf
2485 1210 : .compaction_period
2486 1210 : .unwrap_or(self.conf.default_tenant_conf.compaction_period)
2487 1210 : }
2488 :
2489 5 : pub fn get_compaction_threshold(&self) -> usize {
2490 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2491 5 : tenant_conf
2492 5 : .compaction_threshold
2493 5 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
2494 5 : }
2495 :
2496 522 : pub fn get_gc_horizon(&self) -> u64 {
2497 522 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2498 522 : tenant_conf
2499 522 : .gc_horizon
2500 522 : .unwrap_or(self.conf.default_tenant_conf.gc_horizon)
2501 522 : }
2502 :
2503 1076 : pub fn get_gc_period(&self) -> Duration {
2504 1076 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2505 1076 : tenant_conf
2506 1076 : .gc_period
2507 1076 : .unwrap_or(self.conf.default_tenant_conf.gc_period)
2508 1076 : }
2509 :
2510 5 : pub fn get_image_creation_threshold(&self) -> usize {
2511 5 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2512 5 : tenant_conf
2513 5 : .image_creation_threshold
2514 5 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
2515 5 : }
2516 :
2517 383 : pub fn get_pitr_interval(&self) -> Duration {
2518 383 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2519 383 : tenant_conf
2520 383 : .pitr_interval
2521 383 : .unwrap_or(self.conf.default_tenant_conf.pitr_interval)
2522 383 : }
2523 :
2524 10025 : pub fn get_trace_read_requests(&self) -> bool {
2525 10025 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2526 10025 : tenant_conf
2527 10025 : .trace_read_requests
2528 10025 : .unwrap_or(self.conf.default_tenant_conf.trace_read_requests)
2529 10025 : }
2530 :
2531 35 : pub fn get_min_resident_size_override(&self) -> Option<u64> {
2532 35 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2533 35 : tenant_conf
2534 35 : .min_resident_size_override
2535 35 : .or(self.conf.default_tenant_conf.min_resident_size_override)
2536 35 : }
2537 :
2538 940 : pub fn get_heatmap_period(&self) -> Option<Duration> {
2539 940 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
2540 940 : let heatmap_period = tenant_conf
2541 940 : .heatmap_period
2542 940 : .unwrap_or(self.conf.default_tenant_conf.heatmap_period);
2543 940 : if heatmap_period.is_zero() {
2544 940 : None
2545 : } else {
2546 0 : Some(heatmap_period)
2547 : }
2548 940 : }
2549 :
2550 30 : pub fn set_new_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
2551 30 : self.tenant_conf.write().unwrap().tenant_conf = new_tenant_conf;
2552 30 : // Don't hold self.timelines.lock() during the notifies.
2553 30 : // There's no risk of deadlock right now, but there could be if we consolidate
2554 30 : // mutexes in struct Timeline in the future.
2555 30 : let timelines = self.list_timelines();
2556 62 : for timeline in timelines {
2557 32 : timeline.tenant_conf_updated();
2558 32 : }
2559 30 : }
2560 :
2561 28 : pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
2562 28 : *self.tenant_conf.write().unwrap() = new_conf;
2563 28 : // Don't hold self.timelines.lock() during the notifies.
2564 28 : // There's no risk of deadlock right now, but there could be if we consolidate
2565 28 : // mutexes in struct Timeline in the future.
2566 28 : let timelines = self.list_timelines();
2567 49 : for timeline in timelines {
2568 21 : timeline.tenant_conf_updated();
2569 21 : }
2570 28 : }
2571 :
2572 : /// Helper function to create a new Timeline struct.
2573 : ///
2574 : /// The returned Timeline is in Loading state. The caller is responsible for
2575 : /// initializing any on-disk state, and for inserting the Timeline to the 'timelines'
2576 : /// map.
2577 : ///
2578 : /// `validate_ancestor == false` is used when a timeline is created for deletion
2579 : /// and we might not have the ancestor present anymore which is fine for to be
2580 : /// deleted timelines.
2581 1568 : fn create_timeline_struct(
2582 1568 : &self,
2583 1568 : new_timeline_id: TimelineId,
2584 1568 : new_metadata: &TimelineMetadata,
2585 1568 : ancestor: Option<Arc<Timeline>>,
2586 1568 : resources: TimelineResources,
2587 1568 : cause: CreateTimelineCause,
2588 1568 : ) -> anyhow::Result<Arc<Timeline>> {
2589 1568 : let state = match cause {
2590 : CreateTimelineCause::Load => {
2591 1556 : let ancestor_id = new_metadata.ancestor_timeline();
2592 1556 : anyhow::ensure!(
2593 1556 : ancestor_id == ancestor.as_ref().map(|t| t.timeline_id),
2594 0 : "Timeline's {new_timeline_id} ancestor {ancestor_id:?} was not found"
2595 : );
2596 1556 : TimelineState::Loading
2597 : }
2598 12 : CreateTimelineCause::Delete => TimelineState::Stopping,
2599 : };
2600 :
2601 1568 : let pg_version = new_metadata.pg_version();
2602 1568 :
2603 1568 : let timeline = Timeline::new(
2604 1568 : self.conf,
2605 1568 : Arc::clone(&self.tenant_conf),
2606 1568 : new_metadata,
2607 1568 : ancestor,
2608 1568 : new_timeline_id,
2609 1568 : self.tenant_shard_id,
2610 1568 : self.generation,
2611 1568 : self.shard_identity,
2612 1568 : self.walredo_mgr.as_ref().map(Arc::clone),
2613 1568 : resources,
2614 1568 : pg_version,
2615 1568 : state,
2616 1568 : self.cancel.child_token(),
2617 1568 : );
2618 1568 :
2619 1568 : Ok(timeline)
2620 1568 : }
2621 :
2622 : // Allow too_many_arguments because a constructor's argument list naturally grows with the
2623 : // number of attributes in the struct: breaking these out into a builder wouldn't be helpful.
2624 : #[allow(clippy::too_many_arguments)]
2625 942 : fn new(
2626 942 : state: TenantState,
2627 942 : conf: &'static PageServerConf,
2628 942 : attached_conf: AttachedTenantConf,
2629 942 : shard_identity: ShardIdentity,
2630 942 : walredo_mgr: Option<Arc<WalRedoManager>>,
2631 942 : tenant_shard_id: TenantShardId,
2632 942 : remote_storage: Option<GenericRemoteStorage>,
2633 942 : deletion_queue_client: DeletionQueueClient,
2634 942 : ) -> Tenant {
2635 942 : let (state, mut rx) = watch::channel(state);
2636 942 :
2637 942 : tokio::spawn(async move {
2638 942 : // reflect tenant state in metrics:
2639 942 : // - global per tenant state: TENANT_STATE_METRIC
2640 942 : // - "set" of broken tenants: BROKEN_TENANTS_SET
2641 942 : //
2642 942 : // set of broken tenants should not have zero counts so that it remains accessible for
2643 942 : // alerting.
2644 942 :
2645 942 : let tid = tenant_shard_id.to_string();
2646 942 : let shard_id = tenant_shard_id.shard_slug().to_string();
2647 942 : let set_key = &[tid.as_str(), shard_id.as_str()][..];
2648 942 :
2649 2583 : fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
2650 2583 : ([state.into()], matches!(state, TenantState::Broken { .. }))
2651 2583 : }
2652 942 :
2653 942 : let mut tuple = inspect_state(&rx.borrow_and_update());
2654 942 :
2655 942 : let is_broken = tuple.1;
2656 942 : let mut counted_broken = if is_broken {
2657 : // add the id to the set right away, there should not be any updates on the channel
2658 : // after before tenant is removed, if ever
2659 0 : BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
2660 0 : true
2661 : } else {
2662 942 : false
2663 : };
2664 :
2665 2583 : loop {
2666 2583 : let labels = &tuple.0;
2667 2583 : let current = TENANT_STATE_METRIC.with_label_values(labels);
2668 2583 : current.inc();
2669 2583 :
2670 2583 : if rx.changed().await.is_err() {
2671 : // tenant has been dropped
2672 238 : current.dec();
2673 238 : drop(BROKEN_TENANTS_SET.remove_label_values(set_key));
2674 238 : break;
2675 1641 : }
2676 1641 :
2677 1641 : current.dec();
2678 1641 : tuple = inspect_state(&rx.borrow_and_update());
2679 1641 :
2680 1641 : let is_broken = tuple.1;
2681 1641 : if is_broken && !counted_broken {
2682 58 : counted_broken = true;
2683 58 : // insert the tenant_id (back) into the set while avoiding needless counter
2684 58 : // access
2685 58 : BROKEN_TENANTS_SET.with_label_values(set_key).set(1);
2686 1583 : }
2687 : }
2688 942 : });
2689 942 :
2690 942 : Tenant {
2691 942 : tenant_shard_id,
2692 942 : shard_identity,
2693 942 : generation: attached_conf.location.generation,
2694 942 : conf,
2695 942 : // using now here is good enough approximation to catch tenants with really long
2696 942 : // activation times.
2697 942 : constructed_at: Instant::now(),
2698 942 : tenant_conf: Arc::new(RwLock::new(attached_conf)),
2699 942 : timelines: Mutex::new(HashMap::new()),
2700 942 : timelines_creating: Mutex::new(HashSet::new()),
2701 942 : gc_cs: tokio::sync::Mutex::new(()),
2702 942 : walredo_mgr,
2703 942 : remote_storage,
2704 942 : deletion_queue_client,
2705 942 : state,
2706 942 : cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
2707 942 : cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
2708 942 : eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
2709 942 : activate_now_sem: tokio::sync::Semaphore::new(0),
2710 942 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
2711 942 : cancel: CancellationToken::default(),
2712 942 : gate: Gate::default(),
2713 942 : }
2714 942 : }
2715 :
2716 : /// Locate and load config
2717 231 : pub(super) fn load_tenant_config(
2718 231 : conf: &'static PageServerConf,
2719 231 : tenant_shard_id: &TenantShardId,
2720 231 : ) -> anyhow::Result<LocationConf> {
2721 231 : let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
2722 231 : let config_path = conf.tenant_location_config_path(tenant_shard_id);
2723 231 :
2724 231 : if config_path.exists() {
2725 : // New-style config takes precedence
2726 227 : let deserialized = Self::read_config(&config_path)?;
2727 227 : Ok(toml_edit::de::from_document::<LocationConf>(deserialized)?)
2728 4 : } else if legacy_config_path.exists() {
2729 : // Upgrade path: found an old-style configuration only
2730 0 : let deserialized = Self::read_config(&legacy_config_path)?;
2731 :
2732 0 : let mut tenant_conf = TenantConfOpt::default();
2733 0 : for (key, item) in deserialized.iter() {
2734 0 : match key {
2735 0 : "tenant_config" => {
2736 0 : tenant_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("Failed to parse config from file '{legacy_config_path}' as pageserver config"))?;
2737 : }
2738 0 : _ => bail!(
2739 0 : "config file {legacy_config_path} has unrecognized pageserver option '{key}'"
2740 0 : ),
2741 : }
2742 : }
2743 :
2744 : // Legacy configs are implicitly in attached state, and do not support sharding
2745 0 : Ok(LocationConf::attached_single(
2746 0 : tenant_conf,
2747 0 : Generation::none(),
2748 0 : &models::ShardParameters::default(),
2749 0 : ))
2750 : } else {
2751 : // FIXME If the config file is not found, assume that we're attaching
2752 : // a detached tenant and config is passed via attach command.
2753 : // https://github.com/neondatabase/neon/issues/1555
2754 : // OR: we're loading after incomplete deletion that managed to remove config.
2755 4 : info!(
2756 4 : "tenant config not found in {} or {}",
2757 4 : config_path, legacy_config_path
2758 4 : );
2759 4 : Ok(LocationConf::default())
2760 : }
2761 231 : }
2762 :
2763 227 : fn read_config(path: &Utf8Path) -> anyhow::Result<toml_edit::Document> {
2764 227 : info!("loading tenant configuration from {path}");
2765 :
2766 : // load and parse file
2767 227 : let config = fs::read_to_string(path)
2768 227 : .with_context(|| format!("Failed to load config from path '{path}'"))?;
2769 :
2770 227 : config
2771 227 : .parse::<toml_edit::Document>()
2772 227 : .with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
2773 227 : }
2774 :
2775 946 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
2776 : pub(super) async fn persist_tenant_config(
2777 : conf: &'static PageServerConf,
2778 : tenant_shard_id: &TenantShardId,
2779 : location_conf: &LocationConf,
2780 : ) -> anyhow::Result<()> {
2781 : let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
2782 : let config_path = conf.tenant_location_config_path(tenant_shard_id);
2783 :
2784 : Self::persist_tenant_config_at(
2785 : tenant_shard_id,
2786 : &config_path,
2787 : &legacy_config_path,
2788 : location_conf,
2789 : )
2790 : .await
2791 : }
2792 :
2793 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
2794 : pub(super) async fn persist_tenant_config_at(
2795 : tenant_shard_id: &TenantShardId,
2796 : config_path: &Utf8Path,
2797 : legacy_config_path: &Utf8Path,
2798 : location_conf: &LocationConf,
2799 : ) -> anyhow::Result<()> {
2800 : // Forward compat: write out an old-style configuration that old versions can read, in case we roll back
2801 : Self::persist_tenant_config_legacy(
2802 : tenant_shard_id,
2803 : legacy_config_path,
2804 : &location_conf.tenant_conf,
2805 : )
2806 : .await?;
2807 :
2808 : if let LocationMode::Attached(attach_conf) = &location_conf.mode {
2809 : // Once we use LocationMode, generations are mandatory. If we aren't using generations,
2810 : // then drop out after writing legacy-style config.
2811 : if attach_conf.generation.is_none() {
2812 0 : tracing::debug!("Running without generations, not writing new-style LocationConf");
2813 : return Ok(());
2814 : }
2815 : }
2816 :
2817 0 : debug!("persisting tenantconf to {config_path}");
2818 :
2819 : let mut conf_content = r#"# This file contains a specific per-tenant's config.
2820 : # It is read in case of pageserver restart.
2821 : "#
2822 : .to_string();
2823 :
2824 1 : fail::fail_point!("tenant-config-before-write", |_| {
2825 1 : anyhow::bail!("tenant-config-before-write");
2826 1 : });
2827 :
2828 : // Convert the config to a toml file.
2829 : conf_content += &toml_edit::ser::to_string_pretty(&location_conf)?;
2830 :
2831 : let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
2832 :
2833 : let tenant_shard_id = *tenant_shard_id;
2834 : let config_path = config_path.to_owned();
2835 945 : tokio::task::spawn_blocking(move || {
2836 945 : Handle::current().block_on(async move {
2837 945 : let conf_content = conf_content.as_bytes();
2838 945 : VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
2839 0 : .await
2840 945 : .with_context(|| {
2841 0 : format!("write tenant {tenant_shard_id} config to {config_path}")
2842 945 : })
2843 945 : })
2844 945 : })
2845 : .await??;
2846 :
2847 : Ok(())
2848 : }
2849 :
2850 946 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
2851 : async fn persist_tenant_config_legacy(
2852 : tenant_shard_id: &TenantShardId,
2853 : target_config_path: &Utf8Path,
2854 : tenant_conf: &TenantConfOpt,
2855 : ) -> anyhow::Result<()> {
2856 0 : debug!("persisting tenantconf to {target_config_path}");
2857 :
2858 : let mut conf_content = r#"# This file contains a specific per-tenant's config.
2859 : # It is read in case of pageserver restart.
2860 :
2861 : [tenant_config]
2862 : "#
2863 : .to_string();
2864 :
2865 : // Convert the config to a toml file.
2866 : conf_content += &toml_edit::ser::to_string(&tenant_conf)?;
2867 :
2868 : let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
2869 :
2870 : let tenant_shard_id = *tenant_shard_id;
2871 : let target_config_path = target_config_path.to_owned();
2872 946 : tokio::task::spawn_blocking(move || {
2873 946 : Handle::current().block_on(async move {
2874 946 : let conf_content = conf_content.as_bytes();
2875 946 : VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
2876 0 : .await
2877 946 : .with_context(|| {
2878 0 : format!("write tenant {tenant_shard_id} config to {target_config_path}")
2879 946 : })
2880 946 : })
2881 946 : })
2882 : .await??;
2883 : Ok(())
2884 : }
2885 :
2886 : //
2887 : // How garbage collection works:
2888 : //
2889 : // +--bar------------->
2890 : // /
2891 : // +----+-----foo---------------->
2892 : // /
2893 : // ----main--+-------------------------->
2894 : // \
2895 : // +-----baz-------->
2896 : //
2897 : //
2898 : // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's
2899 : // `gc_infos` are being refreshed
2900 : // 2. Scan collected timelines, and on each timeline, make note of the
2901 : // all the points where other timelines have been branched off.
2902 : // We will refrain from removing page versions at those LSNs.
2903 : // 3. For each timeline, scan all layer files on the timeline.
2904 : // Remove all files for which a newer file exists and which
2905 : // don't cover any branch point LSNs.
2906 : //
2907 : // TODO:
2908 : // - if a relation has a non-incremental persistent layer on a child branch, then we
2909 : // don't need to keep that in the parent anymore. But currently
2910 : // we do.
2911 302 : async fn gc_iteration_internal(
2912 302 : &self,
2913 302 : target_timeline_id: Option<TimelineId>,
2914 302 : horizon: u64,
2915 302 : pitr: Duration,
2916 302 : cancel: &CancellationToken,
2917 302 : ctx: &RequestContext,
2918 302 : ) -> anyhow::Result<GcResult> {
2919 302 : let mut totals: GcResult = Default::default();
2920 302 : let now = Instant::now();
2921 :
2922 302 : let gc_timelines = match self
2923 302 : .refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
2924 264392 : .await
2925 : {
2926 300 : Ok(result) => result,
2927 1 : Err(e) => {
2928 0 : if let Some(PageReconstructError::Cancelled) =
2929 1 : e.downcast_ref::<PageReconstructError>()
2930 : {
2931 : // Handle cancellation
2932 0 : totals.elapsed = now.elapsed();
2933 0 : return Ok(totals);
2934 : } else {
2935 : // Propagate other errors
2936 1 : return Err(e);
2937 : }
2938 : }
2939 : };
2940 :
2941 3 : failpoint_support::sleep_millis_async!("gc_iteration_internal_after_getting_gc_timelines");
2942 :
2943 : // If there is nothing to GC, we don't want any messages in the INFO log.
2944 300 : if !gc_timelines.is_empty() {
2945 296 : info!("{} timelines need GC", gc_timelines.len());
2946 : } else {
2947 0 : debug!("{} timelines need GC", gc_timelines.len());
2948 : }
2949 :
2950 : // Perform GC for each timeline.
2951 : //
2952 : // Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
2953 : // branch creation task, which requires the GC lock. A GC iteration can run concurrently
2954 : // with branch creation.
2955 : //
2956 : // See comments in [`Tenant::branch_timeline`] for more information about why branch
2957 : // creation task can run concurrently with timeline's GC iteration.
2958 720 : for timeline in gc_timelines {
2959 420 : if task_mgr::is_shutdown_requested() || cancel.is_cancelled() {
2960 : // We were requested to shut down. Stop and return with the progress we
2961 : // made.
2962 0 : break;
2963 420 : }
2964 420 : let result = timeline.gc().await?;
2965 420 : totals += result;
2966 : }
2967 :
2968 300 : totals.elapsed = now.elapsed();
2969 300 : Ok(totals)
2970 301 : }
2971 :
2972 : /// Refreshes the Timeline::gc_info for all timelines, returning the
2973 : /// vector of timelines which have [`Timeline::get_last_record_lsn`] past
2974 : /// [`Tenant::get_gc_horizon`].
2975 : ///
2976 : /// This is usually executed as part of periodic gc, but can now be triggered more often.
2977 82 : pub async fn refresh_gc_info(
2978 82 : &self,
2979 82 : cancel: &CancellationToken,
2980 82 : ctx: &RequestContext,
2981 82 : ) -> anyhow::Result<Vec<Arc<Timeline>>> {
2982 82 : // since this method can now be called at different rates than the configured gc loop, it
2983 82 : // might be that these configuration values get applied faster than what it was previously,
2984 82 : // since these were only read from the gc task.
2985 82 : let horizon = self.get_gc_horizon();
2986 82 : let pitr = self.get_pitr_interval();
2987 82 :
2988 82 : // refresh all timelines
2989 82 : let target_timeline_id = None;
2990 82 :
2991 82 : self.refresh_gc_info_internal(target_timeline_id, horizon, pitr, cancel, ctx)
2992 24 : .await
2993 82 : }
2994 :
2995 384 : async fn refresh_gc_info_internal(
2996 384 : &self,
2997 384 : target_timeline_id: Option<TimelineId>,
2998 384 : horizon: u64,
2999 384 : pitr: Duration,
3000 384 : cancel: &CancellationToken,
3001 384 : ctx: &RequestContext,
3002 384 : ) -> anyhow::Result<Vec<Arc<Timeline>>> {
3003 : // grab mutex to prevent new timelines from being created here.
3004 384 : let gc_cs = self.gc_cs.lock().await;
3005 :
3006 : // Scan all timelines. For each timeline, remember the timeline ID and
3007 : // the branch point where it was created.
3008 383 : let (all_branchpoints, timeline_ids): (BTreeSet<(TimelineId, Lsn)>, _) = {
3009 384 : let timelines = self.timelines.lock().unwrap();
3010 384 : let mut all_branchpoints = BTreeSet::new();
3011 383 : let timeline_ids = {
3012 384 : if let Some(target_timeline_id) = target_timeline_id.as_ref() {
3013 258 : if timelines.get(target_timeline_id).is_none() {
3014 1 : bail!("gc target timeline does not exist")
3015 257 : }
3016 126 : };
3017 :
3018 383 : timelines
3019 383 : .iter()
3020 791 : .map(|(timeline_id, timeline_entry)| {
3021 413 : if let Some(ancestor_timeline_id) =
3022 791 : &timeline_entry.get_ancestor_timeline_id()
3023 : {
3024 : // If target_timeline is specified, we only need to know branchpoints of its children
3025 413 : if let Some(timeline_id) = target_timeline_id {
3026 236 : if ancestor_timeline_id == &timeline_id {
3027 9 : all_branchpoints.insert((
3028 9 : *ancestor_timeline_id,
3029 9 : timeline_entry.get_ancestor_lsn(),
3030 9 : ));
3031 227 : }
3032 : }
3033 : // Collect branchpoints for all timelines
3034 177 : else {
3035 177 : all_branchpoints.insert((
3036 177 : *ancestor_timeline_id,
3037 177 : timeline_entry.get_ancestor_lsn(),
3038 177 : ));
3039 177 : }
3040 378 : }
3041 :
3042 791 : *timeline_id
3043 791 : })
3044 383 : .collect::<Vec<_>>()
3045 383 : };
3046 383 : (all_branchpoints, timeline_ids)
3047 383 : };
3048 383 :
3049 383 : // Ok, we now know all the branch points.
3050 383 : // Update the GC information for each timeline.
3051 383 : let mut gc_timelines = Vec::with_capacity(timeline_ids.len());
3052 1166 : for timeline_id in timeline_ids {
3053 : // Timeline is known to be local and loaded.
3054 784 : let timeline = self
3055 784 : .get_timeline(timeline_id, false)
3056 784 : .with_context(|| format!("Timeline {timeline_id} was not found"))?;
3057 :
3058 : // If target_timeline is specified, ignore all other timelines
3059 784 : if let Some(target_timeline_id) = target_timeline_id {
3060 493 : if timeline_id != target_timeline_id {
3061 236 : continue;
3062 257 : }
3063 291 : }
3064 :
3065 548 : if let Some(cutoff) = timeline.get_last_record_lsn().checked_sub(horizon) {
3066 481 : let branchpoints: Vec<Lsn> = all_branchpoints
3067 481 : .range((
3068 481 : Included((timeline_id, Lsn(0))),
3069 481 : Included((timeline_id, Lsn(u64::MAX))),
3070 481 : ))
3071 481 : .map(|&x| x.1)
3072 481 : .collect();
3073 481 : timeline
3074 481 : .update_gc_info(branchpoints, cutoff, pitr, cancel, ctx)
3075 264416 : .await?;
3076 :
3077 480 : gc_timelines.push(timeline);
3078 67 : }
3079 : }
3080 382 : drop(gc_cs);
3081 382 : Ok(gc_timelines)
3082 383 : }
3083 :
3084 : /// A substitute for `branch_timeline` for use in unit tests.
3085 : /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
3086 : /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
3087 : /// timeline background tasks are launched, except the flush loop.
3088 : #[cfg(test)]
3089 214 : async fn branch_timeline_test(
3090 214 : &self,
3091 214 : src_timeline: &Arc<Timeline>,
3092 214 : dst_id: TimelineId,
3093 214 : start_lsn: Option<Lsn>,
3094 214 : ctx: &RequestContext,
3095 214 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
3096 214 : let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
3097 214 : let tl = self
3098 214 : .branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
3099 217 : .await?;
3100 210 : tl.set_state(TimelineState::Active);
3101 210 : Ok(tl)
3102 214 : }
3103 :
3104 : /// Branch an existing timeline.
3105 : ///
3106 : /// The caller is responsible for activating the returned timeline.
3107 262 : async fn branch_timeline(
3108 262 : &self,
3109 262 : src_timeline: &Arc<Timeline>,
3110 262 : dst_id: TimelineId,
3111 262 : start_lsn: Option<Lsn>,
3112 262 : timeline_uninit_mark: TimelineUninitMark<'_>,
3113 262 : ctx: &RequestContext,
3114 262 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
3115 262 : self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
3116 6 : .await
3117 262 : }
3118 :
3119 476 : async fn branch_timeline_impl(
3120 476 : &self,
3121 476 : src_timeline: &Arc<Timeline>,
3122 476 : dst_id: TimelineId,
3123 476 : start_lsn: Option<Lsn>,
3124 476 : timeline_uninit_mark: TimelineUninitMark<'_>,
3125 476 : _ctx: &RequestContext,
3126 476 : ) -> Result<Arc<Timeline>, CreateTimelineError> {
3127 476 : let src_id = src_timeline.timeline_id;
3128 :
3129 : // We will validate our ancestor LSN in this function. Acquire the GC lock so that
3130 : // this check cannot race with GC, and the ancestor LSN is guaranteed to remain
3131 : // valid while we are creating the branch.
3132 476 : let _gc_cs = self.gc_cs.lock().await;
3133 :
3134 : // If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
3135 476 : let start_lsn = start_lsn.unwrap_or_else(|| {
3136 231 : let lsn = src_timeline.get_last_record_lsn();
3137 231 : info!("branching timeline {dst_id} from timeline {src_id} at last record LSN: {lsn}");
3138 231 : lsn
3139 476 : });
3140 476 :
3141 476 : // Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
3142 476 : // horizon on the source timeline
3143 476 : //
3144 476 : // We check it against both the planned GC cutoff stored in 'gc_info',
3145 476 : // and the 'latest_gc_cutoff' of the last GC that was performed. The
3146 476 : // planned GC cutoff in 'gc_info' is normally larger than
3147 476 : // 'latest_gc_cutoff_lsn', but beware of corner cases like if you just
3148 476 : // changed the GC settings for the tenant to make the PITR window
3149 476 : // larger, but some of the data was already removed by an earlier GC
3150 476 : // iteration.
3151 476 :
3152 476 : // check against last actual 'latest_gc_cutoff' first
3153 476 : let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
3154 476 : src_timeline
3155 476 : .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
3156 476 : .context(format!(
3157 476 : "invalid branch start lsn: less than latest GC cutoff {}",
3158 476 : *latest_gc_cutoff_lsn,
3159 476 : ))
3160 476 : .map_err(CreateTimelineError::AncestorLsn)?;
3161 :
3162 : // and then the planned GC cutoff
3163 : {
3164 466 : let gc_info = src_timeline.gc_info.read().unwrap();
3165 466 : let cutoff = min(gc_info.pitr_cutoff, gc_info.horizon_cutoff);
3166 466 : if start_lsn < cutoff {
3167 0 : return Err(CreateTimelineError::AncestorLsn(anyhow::anyhow!(
3168 0 : "invalid branch start lsn: less than planned GC cutoff {cutoff}"
3169 0 : )));
3170 466 : }
3171 466 : }
3172 466 :
3173 466 : //
3174 466 : // The branch point is valid, and we are still holding the 'gc_cs' lock
3175 466 : // so that GC cannot advance the GC cutoff until we are finished.
3176 466 : // Proceed with the branch creation.
3177 466 : //
3178 466 :
3179 466 : // Determine prev-LSN for the new timeline. We can only determine it if
3180 466 : // the timeline was branched at the current end of the source timeline.
3181 466 : let RecordLsn {
3182 466 : last: src_last,
3183 466 : prev: src_prev,
3184 466 : } = src_timeline.get_last_record_rlsn();
3185 466 : let dst_prev = if src_last == start_lsn {
3186 446 : Some(src_prev)
3187 : } else {
3188 20 : None
3189 : };
3190 :
3191 : // Create the metadata file, noting the ancestor of the new timeline.
3192 : // There is initially no data in it, but all the read-calls know to look
3193 : // into the ancestor.
3194 466 : let metadata = TimelineMetadata::new(
3195 466 : start_lsn,
3196 466 : dst_prev,
3197 466 : Some(src_id),
3198 466 : start_lsn,
3199 466 : *src_timeline.latest_gc_cutoff_lsn.read(), // FIXME: should we hold onto this guard longer?
3200 466 : src_timeline.initdb_lsn,
3201 466 : src_timeline.pg_version,
3202 466 : );
3203 :
3204 466 : let uninitialized_timeline = self
3205 466 : .prepare_new_timeline(
3206 466 : dst_id,
3207 466 : &metadata,
3208 466 : timeline_uninit_mark,
3209 466 : start_lsn + 1,
3210 466 : Some(Arc::clone(src_timeline)),
3211 466 : )
3212 217 : .await?;
3213 :
3214 466 : let new_timeline = uninitialized_timeline.finish_creation()?;
3215 :
3216 : // Root timeline gets its layers during creation and uploads them along with the metadata.
3217 : // A branch timeline though, when created, can get no writes for some time, hence won't get any layers created.
3218 : // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
3219 : // could get incorrect information and remove more layers, than needed.
3220 : // See also https://github.com/neondatabase/neon/issues/3865
3221 466 : if let Some(remote_client) = new_timeline.remote_client.as_ref() {
3222 466 : remote_client
3223 466 : .schedule_index_upload_for_metadata_update(&metadata)
3224 466 : .context("branch initial metadata upload")?;
3225 0 : }
3226 :
3227 466 : Ok(new_timeline)
3228 476 : }
3229 :
3230 : /// For unit tests, make this visible so that other modules can directly create timelines
3231 : #[cfg(test)]
3232 2 : #[tracing::instrument(fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), %timeline_id))]
3233 : pub(crate) async fn bootstrap_timeline_test(
3234 : &self,
3235 : timeline_id: TimelineId,
3236 : pg_version: u32,
3237 : load_existing_initdb: Option<TimelineId>,
3238 : ctx: &RequestContext,
3239 : ) -> anyhow::Result<Arc<Timeline>> {
3240 : let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
3241 : self.bootstrap_timeline(
3242 : timeline_id,
3243 : pg_version,
3244 : load_existing_initdb,
3245 : uninit_mark,
3246 : ctx,
3247 : )
3248 : .await
3249 : }
3250 :
3251 566 : async fn upload_initdb(
3252 566 : &self,
3253 566 : timelines_path: &Utf8PathBuf,
3254 566 : pgdata_path: &Utf8PathBuf,
3255 566 : timeline_id: &TimelineId,
3256 566 : ) -> anyhow::Result<()> {
3257 566 : let Some(storage) = &self.remote_storage else {
3258 : // No remote storage? No upload.
3259 0 : return Ok(());
3260 : };
3261 :
3262 566 : let temp_path = timelines_path.join(format!(
3263 566 : "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
3264 566 : ));
3265 :
3266 565 : scopeguard::defer! {
3267 565 : if let Err(e) = fs::remove_file(&temp_path) {
3268 0 : error!("Failed to remove temporary initdb archive '{temp_path}': {e}");
3269 565 : }
3270 : }
3271 :
3272 565 : let (pgdata_zstd, tar_zst_size) =
3273 4640603 : import_datadir::create_tar_zst(pgdata_path, &temp_path).await?;
3274 :
3275 565 : pausable_failpoint!("before-initdb-upload");
3276 :
3277 565 : backoff::retry(
3278 632 : || async {
3279 632 : self::remote_timeline_client::upload_initdb_dir(
3280 632 : storage,
3281 632 : &self.tenant_shard_id.tenant_id,
3282 632 : timeline_id,
3283 632 : pgdata_zstd.try_clone().await?,
3284 632 : tar_zst_size,
3285 632 : &self.cancel,
3286 : )
3287 27428 : .await
3288 632 : },
3289 565 : |_| false,
3290 565 : 3,
3291 565 : u32::MAX,
3292 565 : "persist_initdb_tar_zst",
3293 565 : &self.cancel,
3294 565 : )
3295 28056 : .await
3296 565 : .ok_or_else(|| anyhow::anyhow!("Cancelled"))
3297 565 : .and_then(|x| x)
3298 565 : }
3299 :
3300 : /// - run initdb to init temporary instance and get bootstrap data
3301 : /// - after initialization completes, tar up the temp dir and upload it to S3.
3302 : ///
3303 : /// The caller is responsible for activating the returned timeline.
3304 597 : async fn bootstrap_timeline(
3305 597 : &self,
3306 597 : timeline_id: TimelineId,
3307 597 : pg_version: u32,
3308 597 : load_existing_initdb: Option<TimelineId>,
3309 597 : timeline_uninit_mark: TimelineUninitMark<'_>,
3310 597 : ctx: &RequestContext,
3311 597 : ) -> anyhow::Result<Arc<Timeline>> {
3312 597 : // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
3313 597 : // temporary directory for basebackup files for the given timeline.
3314 597 :
3315 597 : let timelines_path = self.conf.timelines_path(&self.tenant_shard_id);
3316 597 : let pgdata_path = path_with_suffix_extension(
3317 597 : timelines_path.join(format!("basebackup-{timeline_id}")),
3318 597 : TEMP_FILE_SUFFIX,
3319 597 : );
3320 597 :
3321 597 : // an uninit mark was placed before, nothing else can access this timeline files
3322 597 : // current initdb was not run yet, so remove whatever was left from the previous runs
3323 597 : if pgdata_path.exists() {
3324 0 : fs::remove_dir_all(&pgdata_path).with_context(|| {
3325 0 : format!("Failed to remove already existing initdb directory: {pgdata_path}")
3326 0 : })?;
3327 597 : }
3328 : // this new directory is very temporary, set to remove it immediately after bootstrap, we don't need it
3329 595 : scopeguard::defer! {
3330 595 : if let Err(e) = fs::remove_dir_all(&pgdata_path) {
3331 : // this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
3332 0 : error!("Failed to remove temporary initdb directory '{pgdata_path}': {e}");
3333 595 : }
3334 : }
3335 597 : if let Some(existing_initdb_timeline_id) = load_existing_initdb {
3336 4 : let Some(storage) = &self.remote_storage else {
3337 0 : bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
3338 : };
3339 4 : if existing_initdb_timeline_id != timeline_id {
3340 0 : let source_path = &remote_initdb_archive_path(
3341 0 : &self.tenant_shard_id.tenant_id,
3342 0 : &existing_initdb_timeline_id,
3343 0 : );
3344 0 : let dest_path =
3345 0 : &remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
3346 0 : storage
3347 0 : .copy_object(source_path, dest_path)
3348 0 : .await
3349 0 : .context("copy initdb tar")?;
3350 4 : }
3351 4 : let (initdb_tar_zst_path, initdb_tar_zst) =
3352 4 : self::remote_timeline_client::download_initdb_tar_zst(
3353 4 : self.conf,
3354 4 : storage,
3355 4 : &self.tenant_shard_id,
3356 4 : &existing_initdb_timeline_id,
3357 4 : &self.cancel,
3358 4 : )
3359 1471 : .await
3360 4 : .context("download initdb tar")?;
3361 :
3362 4 : scopeguard::defer! {
3363 4 : if let Err(e) = fs::remove_file(&initdb_tar_zst_path) {
3364 0 : error!("Failed to remove temporary initdb archive '{initdb_tar_zst_path}': {e}");
3365 4 : }
3366 : }
3367 :
3368 4 : let buf_read =
3369 4 : BufReader::with_capacity(remote_timeline_client::BUFFER_SIZE, initdb_tar_zst);
3370 4 : import_datadir::extract_tar_zst(&pgdata_path, buf_read)
3371 21517 : .await
3372 4 : .context("extract initdb tar")?;
3373 : } else {
3374 : // Init temporarily repo to get bootstrap data, this creates a directory in the `pgdata_path` path
3375 1161 : run_initdb(self.conf, &pgdata_path, pg_version, &self.cancel).await?;
3376 :
3377 : // Upload the created data dir to S3
3378 591 : if self.tenant_shard_id().is_zero() {
3379 566 : self.upload_initdb(&timelines_path, &pgdata_path, &timeline_id)
3380 4669218 : .await?;
3381 25 : }
3382 : }
3383 594 : let pgdata_lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
3384 594 :
3385 594 : // Import the contents of the data directory at the initial checkpoint
3386 594 : // LSN, and any WAL after that.
3387 594 : // Initdb lsn will be equal to last_record_lsn which will be set after import.
3388 594 : // Because we know it upfront avoid having an option or dummy zero value by passing it to the metadata.
3389 594 : let new_metadata = TimelineMetadata::new(
3390 594 : Lsn(0),
3391 594 : None,
3392 594 : None,
3393 594 : Lsn(0),
3394 594 : pgdata_lsn,
3395 594 : pgdata_lsn,
3396 594 : pg_version,
3397 594 : );
3398 594 : let raw_timeline = self
3399 594 : .prepare_new_timeline(
3400 594 : timeline_id,
3401 594 : &new_metadata,
3402 594 : timeline_uninit_mark,
3403 594 : pgdata_lsn,
3404 594 : None,
3405 594 : )
3406 3 : .await?;
3407 :
3408 593 : let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
3409 593 : let unfinished_timeline = raw_timeline.raw_timeline()?;
3410 :
3411 593 : import_datadir::import_timeline_from_postgres_datadir(
3412 593 : unfinished_timeline,
3413 593 : &pgdata_path,
3414 593 : pgdata_lsn,
3415 593 : ctx,
3416 593 : )
3417 2966584 : .await
3418 593 : .with_context(|| {
3419 0 : format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
3420 593 : })?;
3421 :
3422 : // Flush the new layer files to disk, before we make the timeline as available to
3423 : // the outside world.
3424 : //
3425 : // Flush loop needs to be spawned in order to be able to flush.
3426 593 : unfinished_timeline.maybe_spawn_flush_loop();
3427 593 :
3428 593 : fail::fail_point!("before-checkpoint-new-timeline", |_| {
3429 2 : anyhow::bail!("failpoint before-checkpoint-new-timeline");
3430 593 : });
3431 :
3432 591 : unfinished_timeline
3433 591 : .freeze_and_flush()
3434 590 : .await
3435 590 : .with_context(|| {
3436 0 : format!(
3437 0 : "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
3438 0 : )
3439 590 : })?;
3440 :
3441 : // All done!
3442 590 : let timeline = raw_timeline.finish_creation()?;
3443 :
3444 589 : Ok(timeline)
3445 595 : }
3446 :
3447 : /// Call this before constructing a timeline, to build its required structures
3448 1144 : fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
3449 1144 : let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
3450 1144 : let remote_client = RemoteTimelineClient::new(
3451 1144 : remote_storage.clone(),
3452 1144 : self.deletion_queue_client.clone(),
3453 1144 : self.conf,
3454 1144 : self.tenant_shard_id,
3455 1144 : timeline_id,
3456 1144 : self.generation,
3457 1144 : );
3458 1144 : Some(remote_client)
3459 : } else {
3460 0 : None
3461 : };
3462 :
3463 1144 : TimelineResources {
3464 1144 : remote_client,
3465 1144 : deletion_queue_client: self.deletion_queue_client.clone(),
3466 1144 : }
3467 1144 : }
3468 :
3469 : /// Creates intermediate timeline structure and its files.
3470 : ///
3471 : /// An empty layer map is initialized, and new data and WAL can be imported starting
3472 : /// at 'disk_consistent_lsn'. After any initial data has been imported, call
3473 : /// `finish_creation` to insert the Timeline into the timelines map and to remove the
3474 : /// uninit mark file.
3475 1144 : async fn prepare_new_timeline<'a>(
3476 1144 : &'a self,
3477 1144 : new_timeline_id: TimelineId,
3478 1144 : new_metadata: &TimelineMetadata,
3479 1144 : uninit_mark: TimelineUninitMark<'a>,
3480 1144 : start_lsn: Lsn,
3481 1144 : ancestor: Option<Arc<Timeline>>,
3482 1144 : ) -> anyhow::Result<UninitializedTimeline> {
3483 1144 : let tenant_shard_id = self.tenant_shard_id;
3484 1144 :
3485 1144 : let resources = self.build_timeline_resources(new_timeline_id);
3486 1144 : if let Some(remote_client) = &resources.remote_client {
3487 1144 : remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
3488 0 : }
3489 :
3490 1144 : let timeline_struct = self
3491 1144 : .create_timeline_struct(
3492 1144 : new_timeline_id,
3493 1144 : new_metadata,
3494 1144 : ancestor,
3495 1144 : resources,
3496 1144 : CreateTimelineCause::Load,
3497 1144 : )
3498 1144 : .context("Failed to create timeline data structure")?;
3499 :
3500 1144 : timeline_struct.init_empty_layer_map(start_lsn);
3501 :
3502 1144 : if let Err(e) = self
3503 1144 : .create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
3504 328 : .await
3505 : {
3506 1 : error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
3507 1 : cleanup_timeline_directory(uninit_mark);
3508 1 : return Err(e);
3509 1143 : }
3510 :
3511 0 : debug!(
3512 0 : "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
3513 0 : );
3514 :
3515 1143 : Ok(UninitializedTimeline::new(
3516 1143 : self,
3517 1143 : new_timeline_id,
3518 1143 : Some((timeline_struct, uninit_mark)),
3519 1143 : ))
3520 1144 : }
3521 :
3522 1144 : async fn create_timeline_files(
3523 1144 : &self,
3524 1144 : timeline_path: &Utf8Path,
3525 1144 : new_timeline_id: &TimelineId,
3526 1144 : new_metadata: &TimelineMetadata,
3527 1144 : ) -> anyhow::Result<()> {
3528 1144 : crashsafe::create_dir(timeline_path).context("Failed to create timeline directory")?;
3529 :
3530 1144 : fail::fail_point!("after-timeline-uninit-mark-creation", |_| {
3531 1 : anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
3532 1144 : });
3533 :
3534 1143 : save_metadata(
3535 1143 : self.conf,
3536 1143 : &self.tenant_shard_id,
3537 1143 : new_timeline_id,
3538 1143 : new_metadata,
3539 1143 : )
3540 328 : .await
3541 1143 : .context("Failed to create timeline metadata")?;
3542 1143 : Ok(())
3543 1144 : }
3544 :
3545 : /// Attempts to create an uninit mark file for the timeline initialization.
3546 : /// Bails, if the timeline is already loaded into the memory (i.e. initialized before), or the uninit mark file already exists.
3547 : ///
3548 : /// This way, we need to hold the timelines lock only for small amount of time during the mark check/creation per timeline init.
3549 1195 : fn create_timeline_uninit_mark(
3550 1195 : &self,
3551 1195 : timeline_id: TimelineId,
3552 1195 : ) -> Result<TimelineUninitMark, TimelineExclusionError> {
3553 1195 : let tenant_shard_id = self.tenant_shard_id;
3554 1195 :
3555 1195 : let uninit_mark_path = self
3556 1195 : .conf
3557 1195 : .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
3558 1195 : let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
3559 :
3560 1195 : let uninit_mark = TimelineUninitMark::new(
3561 1195 : self,
3562 1195 : timeline_id,
3563 1195 : uninit_mark_path.clone(),
3564 1195 : timeline_path.clone(),
3565 1195 : )?;
3566 :
3567 : // At this stage, we have got exclusive access to in-memory state for this timeline ID
3568 : // for creation.
3569 : // A timeline directory should never exist on disk already:
3570 : // - a previous failed creation would have cleaned up after itself
3571 : // - a pageserver restart would clean up timeline directories that don't have valid remote state
3572 : //
3573 : // Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
3574 : // this error may indicate a bug in cleanup on failed creations.
3575 1165 : if timeline_path.exists() {
3576 0 : return Err(TimelineExclusionError::Other(anyhow::anyhow!(
3577 0 : "Timeline directory already exists! This is a bug."
3578 0 : )));
3579 1165 : }
3580 1165 :
3581 1165 : // Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
3582 1165 : // that during process runtime, colliding creations will be caught in-memory without getting
3583 1165 : // as far as failing to write a file.
3584 1165 : fs::OpenOptions::new()
3585 1165 : .write(true)
3586 1165 : .create_new(true)
3587 1165 : .open(&uninit_mark_path)
3588 1165 : .context("Failed to create uninit mark file")
3589 1165 : .and_then(|_| {
3590 1165 : crashsafe::fsync_file_and_parent(&uninit_mark_path)
3591 1165 : .context("Failed to fsync uninit mark file")
3592 1165 : })
3593 1165 : .with_context(|| {
3594 0 : format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
3595 1165 : })?;
3596 :
3597 1165 : Ok(uninit_mark)
3598 1195 : }
3599 :
3600 : /// Gathers inputs from all of the timelines to produce a sizing model input.
3601 : ///
3602 : /// Future is cancellation safe. Only one calculation can be running at once per tenant.
3603 76 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
3604 : pub async fn gather_size_inputs(
3605 : &self,
3606 : // `max_retention_period` overrides the cutoff that is used to calculate the size
3607 : // (only if it is shorter than the real cutoff).
3608 : max_retention_period: Option<u64>,
3609 : cause: LogicalSizeCalculationCause,
3610 : cancel: &CancellationToken,
3611 : ctx: &RequestContext,
3612 : ) -> anyhow::Result<size::ModelInputs> {
3613 : let logical_sizes_at_once = self
3614 : .conf
3615 : .concurrent_tenant_size_logical_size_queries
3616 : .inner();
3617 :
3618 : // TODO: Having a single mutex block concurrent reads is not great for performance.
3619 : //
3620 : // But the only case where we need to run multiple of these at once is when we
3621 : // request a size for a tenant manually via API, while another background calculation
3622 : // is in progress (which is not a common case).
3623 : //
3624 : // See more for on the issue #2748 condenced out of the initial PR review.
3625 : let mut shared_cache = self.cached_logical_sizes.lock().await;
3626 :
3627 : size::gather_inputs(
3628 : self,
3629 : logical_sizes_at_once,
3630 : max_retention_period,
3631 : &mut shared_cache,
3632 : cause,
3633 : cancel,
3634 : ctx,
3635 : )
3636 : .await
3637 : }
3638 :
3639 : /// Calculate synthetic tenant size and cache the result.
3640 : /// This is periodically called by background worker.
3641 : /// result is cached in tenant struct
3642 23 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
3643 : pub async fn calculate_synthetic_size(
3644 : &self,
3645 : cause: LogicalSizeCalculationCause,
3646 : cancel: &CancellationToken,
3647 : ctx: &RequestContext,
3648 : ) -> anyhow::Result<u64> {
3649 : let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
3650 :
3651 : let size = inputs.calculate()?;
3652 :
3653 : self.set_cached_synthetic_size(size);
3654 :
3655 : Ok(size)
3656 : }
3657 :
3658 : /// Cache given synthetic size and update the metric value
3659 23 : pub fn set_cached_synthetic_size(&self, size: u64) {
3660 23 : self.cached_synthetic_tenant_size
3661 23 : .store(size, Ordering::Relaxed);
3662 :
3663 : // Only shard zero should be calculating synthetic sizes
3664 23 : debug_assert!(self.shard_identity.is_zero());
3665 :
3666 23 : TENANT_SYNTHETIC_SIZE_METRIC
3667 23 : .get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
3668 23 : .unwrap()
3669 23 : .set(size);
3670 23 : }
3671 :
3672 23 : pub fn cached_synthetic_size(&self) -> u64 {
3673 23 : self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
3674 23 : }
3675 :
3676 : /// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
3677 : ///
3678 : /// This function can take a long time: callers should wrap it in a timeout if calling
3679 : /// from an external API handler.
3680 : ///
3681 : /// Cancel-safety: cancelling this function may leave I/O running, but such I/O is
3682 : /// still bounded by tenant/timeline shutdown.
3683 0 : #[tracing::instrument(skip_all)]
3684 : pub(crate) async fn flush_remote(&self) -> anyhow::Result<()> {
3685 : let timelines = self.timelines.lock().unwrap().clone();
3686 :
3687 1 : async fn flush_timeline(_gate: GateGuard, timeline: Arc<Timeline>) -> anyhow::Result<()> {
3688 1 : tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
3689 1 : timeline.freeze_and_flush().await?;
3690 1 : tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
3691 1 : if let Some(client) = &timeline.remote_client {
3692 1 : client.wait_completion().await?;
3693 0 : }
3694 :
3695 1 : Ok(())
3696 1 : }
3697 :
3698 : // We do not use a JoinSet for these tasks, because we don't want them to be
3699 : // aborted when this function's future is cancelled: they should stay alive
3700 : // holding their GateGuard until they complete, to ensure their I/Os complete
3701 : // before Timeline shutdown completes.
3702 : let mut results = FuturesUnordered::new();
3703 :
3704 : for (_timeline_id, timeline) in timelines {
3705 : // Run each timeline's flush in a task holding the timeline's gate: this
3706 : // means that if this function's future is cancelled, the Timeline shutdown
3707 : // will still wait for any I/O in here to complete.
3708 : let gate = match timeline.gate.enter() {
3709 : Ok(g) => g,
3710 : Err(_) => continue,
3711 : };
3712 2 : let jh = tokio::task::spawn(async move { flush_timeline(gate, timeline).await });
3713 : results.push(jh);
3714 : }
3715 :
3716 : while let Some(r) = results.next().await {
3717 : if let Err(e) = r {
3718 : if !e.is_cancelled() && !e.is_panic() {
3719 0 : tracing::error!("unexpected join error: {e:?}");
3720 : }
3721 : }
3722 : }
3723 :
3724 : // The flushes we did above were just writes, but the Tenant might have had
3725 : // pending deletions as well from recent compaction/gc: we want to flush those
3726 : // as well. This requires flushing the global delete queue. This is cheap
3727 : // because it's typically a no-op.
3728 : match self.deletion_queue_client.flush_execute().await {
3729 : Ok(_) => {}
3730 : Err(DeletionQueueError::ShuttingDown) => {}
3731 : }
3732 :
3733 : Ok(())
3734 : }
3735 : }
3736 :
3737 2 : fn remove_timeline_and_uninit_mark(
3738 2 : timeline_dir: &Utf8Path,
3739 2 : uninit_mark: &Utf8Path,
3740 2 : ) -> anyhow::Result<()> {
3741 2 : fs::remove_dir_all(timeline_dir)
3742 2 : .or_else(|e| {
3743 0 : if e.kind() == std::io::ErrorKind::NotFound {
3744 : // we can leave the uninit mark without a timeline dir,
3745 : // just remove the mark then
3746 0 : Ok(())
3747 : } else {
3748 0 : Err(e)
3749 : }
3750 2 : })
3751 2 : .with_context(|| {
3752 0 : format!("Failed to remove unit marked timeline directory {timeline_dir}")
3753 2 : })?;
3754 2 : fs::remove_file(uninit_mark)
3755 2 : .with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
3756 :
3757 2 : Ok(())
3758 2 : }
3759 :
3760 : /// Create the cluster temporarily in 'initdbpath' directory inside the repository
3761 : /// to get bootstrap data for timeline initialization.
3762 593 : async fn run_initdb(
3763 593 : conf: &'static PageServerConf,
3764 593 : initdb_target_dir: &Utf8Path,
3765 593 : pg_version: u32,
3766 593 : cancel: &CancellationToken,
3767 593 : ) -> Result<(), InitdbError> {
3768 593 : let initdb_bin_path = conf
3769 593 : .pg_bin_dir(pg_version)
3770 593 : .map_err(InitdbError::Other)?
3771 593 : .join("initdb");
3772 593 : let initdb_lib_dir = conf.pg_lib_dir(pg_version).map_err(InitdbError::Other)?;
3773 593 : info!(
3774 593 : "running {} in {}, libdir: {}",
3775 593 : initdb_bin_path, initdb_target_dir, initdb_lib_dir,
3776 593 : );
3777 :
3778 593 : let _permit = INIT_DB_SEMAPHORE.acquire().await;
3779 :
3780 593 : let initdb_command = tokio::process::Command::new(&initdb_bin_path)
3781 593 : .args(["-D", initdb_target_dir.as_ref()])
3782 593 : .args(["-U", &conf.superuser])
3783 593 : .args(["-E", "utf8"])
3784 593 : .arg("--no-instructions")
3785 593 : .arg("--no-sync")
3786 593 : .env_clear()
3787 593 : .env("LD_LIBRARY_PATH", &initdb_lib_dir)
3788 593 : .env("DYLD_LIBRARY_PATH", &initdb_lib_dir)
3789 593 : .stdin(std::process::Stdio::null())
3790 593 : // stdout invocation produces the same output every time, we don't need it
3791 593 : .stdout(std::process::Stdio::null())
3792 593 : // we would be interested in the stderr output, if there was any
3793 593 : .stderr(std::process::Stdio::piped())
3794 593 : .spawn()?;
3795 :
3796 : // Ideally we'd select here with the cancellation token, but the problem is that
3797 : // we can't safely terminate initdb: it launches processes of its own, and killing
3798 : // initdb doesn't kill them. After we return from this function, we want the target
3799 : // directory to be able to be cleaned up.
3800 : // See https://github.com/neondatabase/neon/issues/6385
3801 1161 : let initdb_output = initdb_command.wait_with_output().await?;
3802 593 : if !initdb_output.status.success() {
3803 0 : return Err(InitdbError::Failed(
3804 0 : initdb_output.status,
3805 0 : initdb_output.stderr,
3806 0 : ));
3807 593 : }
3808 593 :
3809 593 : // This isn't true cancellation support, see above. Still return an error to
3810 593 : // excercise the cancellation code path.
3811 593 : if cancel.is_cancelled() {
3812 2 : return Err(InitdbError::Cancelled);
3813 591 : }
3814 591 :
3815 591 : Ok(())
3816 593 : }
3817 :
3818 : impl Drop for Tenant {
3819 314 : fn drop(&mut self) {
3820 314 : remove_tenant_metrics(&self.tenant_shard_id);
3821 314 : }
3822 : }
3823 : /// Dump contents of a layer file to stdout.
3824 0 : pub async fn dump_layerfile_from_path(
3825 0 : path: &Utf8Path,
3826 0 : verbose: bool,
3827 0 : ctx: &RequestContext,
3828 0 : ) -> anyhow::Result<()> {
3829 : use std::os::unix::fs::FileExt;
3830 :
3831 : // All layer files start with a two-byte "magic" value, to identify the kind of
3832 : // file.
3833 0 : let file = File::open(path)?;
3834 0 : let mut header_buf = [0u8; 2];
3835 0 : file.read_exact_at(&mut header_buf, 0)?;
3836 :
3837 0 : match u16::from_be_bytes(header_buf) {
3838 : crate::IMAGE_FILE_MAGIC => {
3839 0 : ImageLayer::new_for_path(path, file)?
3840 0 : .dump(verbose, ctx)
3841 0 : .await?
3842 : }
3843 : crate::DELTA_FILE_MAGIC => {
3844 0 : DeltaLayer::new_for_path(path, file)?
3845 0 : .dump(verbose, ctx)
3846 0 : .await?
3847 : }
3848 0 : magic => bail!("unrecognized magic identifier: {:?}", magic),
3849 : }
3850 :
3851 0 : Ok(())
3852 0 : }
3853 :
3854 : #[cfg(test)]
3855 : pub(crate) mod harness {
3856 : use bytes::{Bytes, BytesMut};
3857 : use camino::Utf8PathBuf;
3858 : use once_cell::sync::OnceCell;
3859 : use pageserver_api::models::ShardParameters;
3860 : use pageserver_api::shard::ShardIndex;
3861 : use std::fs;
3862 : use std::sync::Arc;
3863 : use utils::logging;
3864 : use utils::lsn::Lsn;
3865 :
3866 : use crate::deletion_queue::mock::MockDeletionQueue;
3867 : use crate::{
3868 : config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
3869 : };
3870 :
3871 : use super::*;
3872 : use crate::tenant::config::{TenantConf, TenantConfOpt};
3873 : use hex_literal::hex;
3874 : use utils::id::{TenantId, TimelineId};
3875 :
3876 : pub const TIMELINE_ID: TimelineId =
3877 : TimelineId::from_array(hex!("11223344556677881122334455667788"));
3878 : pub const NEW_TIMELINE_ID: TimelineId =
3879 : TimelineId::from_array(hex!("AA223344556677881122334455667788"));
3880 :
3881 : /// Convenience function to create a page image with given string as the only content
3882 : #[allow(non_snake_case)]
3883 1708258 : pub fn TEST_IMG(s: &str) -> Bytes {
3884 1708258 : let mut buf = BytesMut::new();
3885 1708258 : buf.extend_from_slice(s.as_bytes());
3886 1708258 : buf.resize(64, 0);
3887 1708258 :
3888 1708258 : buf.freeze()
3889 1708258 : }
3890 :
3891 : impl From<TenantConf> for TenantConfOpt {
3892 84 : fn from(tenant_conf: TenantConf) -> Self {
3893 84 : Self {
3894 84 : checkpoint_distance: Some(tenant_conf.checkpoint_distance),
3895 84 : checkpoint_timeout: Some(tenant_conf.checkpoint_timeout),
3896 84 : compaction_target_size: Some(tenant_conf.compaction_target_size),
3897 84 : compaction_period: Some(tenant_conf.compaction_period),
3898 84 : compaction_threshold: Some(tenant_conf.compaction_threshold),
3899 84 : gc_horizon: Some(tenant_conf.gc_horizon),
3900 84 : gc_period: Some(tenant_conf.gc_period),
3901 84 : image_creation_threshold: Some(tenant_conf.image_creation_threshold),
3902 84 : pitr_interval: Some(tenant_conf.pitr_interval),
3903 84 : walreceiver_connect_timeout: Some(tenant_conf.walreceiver_connect_timeout),
3904 84 : lagging_wal_timeout: Some(tenant_conf.lagging_wal_timeout),
3905 84 : max_lsn_wal_lag: Some(tenant_conf.max_lsn_wal_lag),
3906 84 : trace_read_requests: Some(tenant_conf.trace_read_requests),
3907 84 : eviction_policy: Some(tenant_conf.eviction_policy),
3908 84 : min_resident_size_override: tenant_conf.min_resident_size_override,
3909 84 : evictions_low_residence_duration_metric_threshold: Some(
3910 84 : tenant_conf.evictions_low_residence_duration_metric_threshold,
3911 84 : ),
3912 84 : gc_feedback: Some(tenant_conf.gc_feedback),
3913 84 : heatmap_period: Some(tenant_conf.heatmap_period),
3914 84 : lazy_slru_download: Some(tenant_conf.lazy_slru_download),
3915 84 : }
3916 84 : }
3917 : }
3918 :
3919 : enum LoadMode {
3920 : Local,
3921 : Remote,
3922 : }
3923 :
3924 : pub struct TenantHarness {
3925 : pub conf: &'static PageServerConf,
3926 : pub tenant_conf: TenantConf,
3927 : pub tenant_shard_id: TenantShardId,
3928 : pub generation: Generation,
3929 : pub shard: ShardIndex,
3930 : pub remote_storage: GenericRemoteStorage,
3931 : pub remote_fs_dir: Utf8PathBuf,
3932 : pub deletion_queue: MockDeletionQueue,
3933 : }
3934 :
3935 : static LOG_HANDLE: OnceCell<()> = OnceCell::new();
3936 :
3937 88 : pub(crate) fn setup_logging() {
3938 88 : LOG_HANDLE.get_or_init(|| {
3939 88 : logging::init(
3940 88 : logging::LogFormat::Test,
3941 88 : // enable it in case the tests exercise code paths that use
3942 88 : // debug_assert_current_span_has_tenant_and_timeline_id
3943 88 : logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
3944 88 : logging::Output::Stdout,
3945 88 : )
3946 88 : .expect("Failed to init test logging")
3947 88 : });
3948 88 : }
3949 :
3950 : impl TenantHarness {
3951 82 : pub fn create(test_name: &'static str) -> anyhow::Result<Self> {
3952 82 : setup_logging();
3953 82 :
3954 82 : let repo_dir = PageServerConf::test_repo_dir(test_name);
3955 82 : let _ = fs::remove_dir_all(&repo_dir);
3956 82 : fs::create_dir_all(&repo_dir)?;
3957 :
3958 82 : let conf = PageServerConf::dummy_conf(repo_dir);
3959 82 : // Make a static copy of the config. This can never be free'd, but that's
3960 82 : // OK in a test.
3961 82 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
3962 82 :
3963 82 : // Disable automatic GC and compaction to make the unit tests more deterministic.
3964 82 : // The tests perform them manually if needed.
3965 82 : let tenant_conf = TenantConf {
3966 82 : gc_period: Duration::ZERO,
3967 82 : compaction_period: Duration::ZERO,
3968 82 : ..TenantConf::default()
3969 82 : };
3970 82 :
3971 82 : let tenant_id = TenantId::generate();
3972 82 : let tenant_shard_id = TenantShardId::unsharded(tenant_id);
3973 82 : fs::create_dir_all(conf.tenant_path(&tenant_shard_id))?;
3974 82 : fs::create_dir_all(conf.timelines_path(&tenant_shard_id))?;
3975 :
3976 : use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
3977 82 : let remote_fs_dir = conf.workdir.join("localfs");
3978 82 : std::fs::create_dir_all(&remote_fs_dir).unwrap();
3979 82 : let config = RemoteStorageConfig {
3980 82 : storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
3981 82 : };
3982 82 : let remote_storage = GenericRemoteStorage::from_config(&config).unwrap();
3983 82 : let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()));
3984 82 :
3985 82 : Ok(Self {
3986 82 : conf,
3987 82 : tenant_conf,
3988 82 : tenant_shard_id,
3989 82 : generation: Generation::new(0xdeadbeef),
3990 82 : shard: ShardIndex::unsharded(),
3991 82 : remote_storage,
3992 82 : remote_fs_dir,
3993 82 : deletion_queue,
3994 82 : })
3995 82 : }
3996 :
3997 10 : pub fn span(&self) -> tracing::Span {
3998 10 : info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
3999 10 : }
4000 :
4001 80 : pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
4002 80 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
4003 80 : (
4004 80 : self.try_load(&ctx)
4005 107 : .await
4006 80 : .expect("failed to load test tenant"),
4007 80 : ctx,
4008 80 : )
4009 80 : }
4010 :
4011 82 : fn remote_empty(&self) -> bool {
4012 82 : let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
4013 82 : let remote_tenant_dir = self
4014 82 : .remote_fs_dir
4015 82 : .join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
4016 82 : if std::fs::metadata(&remote_tenant_dir).is_err() {
4017 78 : return true;
4018 4 : }
4019 4 :
4020 4 : match std::fs::read_dir(remote_tenant_dir)
4021 4 : .unwrap()
4022 4 : .flatten()
4023 4 : .next()
4024 : {
4025 4 : Some(entry) => {
4026 4 : tracing::debug!(
4027 0 : "remote_empty: not empty, found file {}",
4028 0 : entry.file_name().to_string_lossy(),
4029 0 : );
4030 4 : false
4031 : }
4032 0 : None => true,
4033 : }
4034 82 : }
4035 :
4036 84 : async fn do_try_load(
4037 84 : &self,
4038 84 : ctx: &RequestContext,
4039 84 : mode: LoadMode,
4040 84 : ) -> anyhow::Result<Arc<Tenant>> {
4041 84 : let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
4042 84 :
4043 84 : let tenant = Arc::new(Tenant::new(
4044 84 : TenantState::Loading,
4045 84 : self.conf,
4046 84 : AttachedTenantConf::try_from(LocationConf::attached_single(
4047 84 : TenantConfOpt::from(self.tenant_conf),
4048 84 : self.generation,
4049 84 : &ShardParameters::default(),
4050 84 : ))
4051 84 : .unwrap(),
4052 84 : // This is a legacy/test code path: sharding isn't supported here.
4053 84 : ShardIdentity::unsharded(),
4054 84 : Some(walredo_mgr),
4055 84 : self.tenant_shard_id,
4056 84 : Some(self.remote_storage.clone()),
4057 84 : self.deletion_queue.new_client(),
4058 84 : ));
4059 84 :
4060 84 : match mode {
4061 : LoadMode::Local => {
4062 80 : tenant
4063 80 : .load_local(ctx)
4064 80 : .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
4065 80 : .await?;
4066 : }
4067 : LoadMode::Remote => {
4068 4 : let preload = tenant
4069 4 : .preload(&self.remote_storage, CancellationToken::new())
4070 4 : .instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
4071 15 : .await?;
4072 4 : tenant
4073 4 : .attach(Some(preload), SpawnMode::Normal, ctx)
4074 4 : .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
4075 16 : .await?;
4076 : }
4077 : }
4078 :
4079 82 : tenant.state.send_replace(TenantState::Active);
4080 82 : for timeline in tenant.timelines.lock().unwrap().values() {
4081 6 : timeline.set_state(TimelineState::Active);
4082 6 : }
4083 82 : Ok(tenant)
4084 84 : }
4085 :
4086 : /// For tests that specifically want to exercise the local load path, which does
4087 : /// not use remote storage.
4088 2 : pub async fn try_load_local(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
4089 2 : self.do_try_load(ctx, LoadMode::Local).await
4090 2 : }
4091 :
4092 : /// The 'load' in this function is either a local load or a normal attachment,
4093 82 : pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
4094 : // If we have nothing in remote storage, must use load_local instead of attach: attach
4095 : // will error out if there are no timelines.
4096 : //
4097 : // See https://github.com/neondatabase/neon/issues/5456 for how we will eliminate
4098 : // this weird state of a Tenant which exists but doesn't have any timelines.
4099 82 : let mode = match self.remote_empty() {
4100 78 : true => LoadMode::Local,
4101 4 : false => LoadMode::Remote,
4102 : };
4103 :
4104 109 : self.do_try_load(ctx, mode).await
4105 82 : }
4106 :
4107 6 : pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
4108 6 : self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
4109 6 : }
4110 : }
4111 :
4112 : // Mock WAL redo manager that doesn't do much
4113 : pub(crate) struct TestRedoManager;
4114 :
4115 : impl TestRedoManager {
4116 : /// # Cancel-Safety
4117 : ///
4118 : /// This method is cancellation-safe.
4119 0 : pub async fn request_redo(
4120 0 : &self,
4121 0 : key: Key,
4122 0 : lsn: Lsn,
4123 0 : base_img: Option<(Lsn, Bytes)>,
4124 0 : records: Vec<(Lsn, NeonWalRecord)>,
4125 0 : _pg_version: u32,
4126 0 : ) -> anyhow::Result<Bytes> {
4127 0 : let s = format!(
4128 0 : "redo for {} to get to {}, with {} and {} records",
4129 0 : key,
4130 0 : lsn,
4131 0 : if base_img.is_some() {
4132 0 : "base image"
4133 : } else {
4134 0 : "no base image"
4135 : },
4136 0 : records.len()
4137 0 : );
4138 0 : println!("{s}");
4139 0 :
4140 0 : Ok(TEST_IMG(&s))
4141 0 : }
4142 : }
4143 : }
4144 :
4145 : #[cfg(test)]
4146 : mod tests {
4147 : use super::*;
4148 : use crate::keyspace::KeySpaceAccum;
4149 : use crate::repository::{Key, Value};
4150 : use crate::tenant::harness::*;
4151 : use crate::DEFAULT_PG_VERSION;
4152 : use crate::METADATA_FILE_NAME;
4153 : use bytes::BytesMut;
4154 : use hex_literal::hex;
4155 : use once_cell::sync::Lazy;
4156 : use rand::{thread_rng, Rng};
4157 : use tokio_util::sync::CancellationToken;
4158 :
4159 : static TEST_KEY: Lazy<Key> =
4160 18 : Lazy::new(|| Key::from_slice(&hex!("010000000033333333444444445500000001")));
4161 :
4162 2 : #[tokio::test]
4163 2 : async fn test_basic() -> anyhow::Result<()> {
4164 2 : let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
4165 2 : let tline = tenant
4166 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4167 6 : .await?;
4168 :
4169 2 : let writer = tline.writer().await;
4170 2 : writer
4171 2 : .put(
4172 2 : *TEST_KEY,
4173 2 : Lsn(0x10),
4174 2 : &Value::Image(TEST_IMG("foo at 0x10")),
4175 2 : &ctx,
4176 2 : )
4177 1 : .await?;
4178 2 : writer.finish_write(Lsn(0x10));
4179 2 : drop(writer);
4180 :
4181 2 : let writer = tline.writer().await;
4182 2 : writer
4183 2 : .put(
4184 2 : *TEST_KEY,
4185 2 : Lsn(0x20),
4186 2 : &Value::Image(TEST_IMG("foo at 0x20")),
4187 2 : &ctx,
4188 2 : )
4189 0 : .await?;
4190 2 : writer.finish_write(Lsn(0x20));
4191 2 : drop(writer);
4192 :
4193 2 : assert_eq!(
4194 2 : tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
4195 2 : TEST_IMG("foo at 0x10")
4196 : );
4197 2 : assert_eq!(
4198 2 : tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
4199 2 : TEST_IMG("foo at 0x10")
4200 : );
4201 2 : assert_eq!(
4202 2 : tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
4203 2 : TEST_IMG("foo at 0x20")
4204 : );
4205 :
4206 2 : Ok(())
4207 : }
4208 :
4209 2 : #[tokio::test]
4210 2 : async fn no_duplicate_timelines() -> anyhow::Result<()> {
4211 2 : let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
4212 2 : .load()
4213 2 : .await;
4214 2 : let _ = tenant
4215 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4216 8 : .await?;
4217 :
4218 2 : match tenant
4219 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4220 0 : .await
4221 : {
4222 0 : Ok(_) => panic!("duplicate timeline creation should fail"),
4223 2 : Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
4224 : }
4225 :
4226 2 : Ok(())
4227 : }
4228 :
4229 : /// Convenience function to create a page image with given string as the only content
4230 10 : pub fn test_value(s: &str) -> Value {
4231 10 : let mut buf = BytesMut::new();
4232 10 : buf.extend_from_slice(s.as_bytes());
4233 10 : Value::Image(buf.freeze())
4234 10 : }
4235 :
4236 : ///
4237 : /// Test branch creation
4238 : ///
4239 2 : #[tokio::test]
4240 2 : async fn test_branch() -> anyhow::Result<()> {
4241 : use std::str::from_utf8;
4242 :
4243 2 : let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
4244 2 : let tline = tenant
4245 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4246 7 : .await?;
4247 2 : let writer = tline.writer().await;
4248 :
4249 : #[allow(non_snake_case)]
4250 2 : let TEST_KEY_A: Key = Key::from_hex("110000000033333333444444445500000001").unwrap();
4251 2 : #[allow(non_snake_case)]
4252 2 : let TEST_KEY_B: Key = Key::from_hex("110000000033333333444444445500000002").unwrap();
4253 2 :
4254 2 : // Insert a value on the timeline
4255 2 : writer
4256 2 : .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
4257 1 : .await?;
4258 2 : writer
4259 2 : .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
4260 0 : .await?;
4261 2 : writer.finish_write(Lsn(0x20));
4262 2 :
4263 2 : writer
4264 2 : .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
4265 0 : .await?;
4266 2 : writer.finish_write(Lsn(0x30));
4267 2 : writer
4268 2 : .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
4269 0 : .await?;
4270 2 : writer.finish_write(Lsn(0x40));
4271 2 :
4272 2 : //assert_current_logical_size(&tline, Lsn(0x40));
4273 2 :
4274 2 : // Branch the history, modify relation differently on the new timeline
4275 2 : tenant
4276 2 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
4277 2 : .await?;
4278 2 : let newtline = tenant
4279 2 : .get_timeline(NEW_TIMELINE_ID, true)
4280 2 : .expect("Should have a local timeline");
4281 2 : let new_writer = newtline.writer().await;
4282 2 : new_writer
4283 2 : .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
4284 1 : .await?;
4285 2 : new_writer.finish_write(Lsn(0x40));
4286 :
4287 : // Check page contents on both branches
4288 2 : assert_eq!(
4289 2 : from_utf8(&tline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
4290 : "foo at 0x40"
4291 : );
4292 2 : assert_eq!(
4293 2 : from_utf8(&newtline.get(TEST_KEY_A, Lsn(0x40), &ctx).await?)?,
4294 : "bar at 0x40"
4295 : );
4296 2 : assert_eq!(
4297 2 : from_utf8(&newtline.get(TEST_KEY_B, Lsn(0x40), &ctx).await?)?,
4298 : "foobar at 0x20"
4299 : );
4300 :
4301 : //assert_current_logical_size(&tline, Lsn(0x40));
4302 :
4303 2 : Ok(())
4304 : }
4305 :
4306 20 : async fn make_some_layers(
4307 20 : tline: &Timeline,
4308 20 : start_lsn: Lsn,
4309 20 : ctx: &RequestContext,
4310 20 : ) -> anyhow::Result<()> {
4311 20 : let mut lsn = start_lsn;
4312 : #[allow(non_snake_case)]
4313 : {
4314 20 : let writer = tline.writer().await;
4315 : // Create a relation on the timeline
4316 20 : writer
4317 20 : .put(
4318 20 : *TEST_KEY,
4319 20 : lsn,
4320 20 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4321 20 : ctx,
4322 20 : )
4323 10 : .await?;
4324 20 : writer.finish_write(lsn);
4325 20 : lsn += 0x10;
4326 20 : writer
4327 20 : .put(
4328 20 : *TEST_KEY,
4329 20 : lsn,
4330 20 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4331 20 : ctx,
4332 20 : )
4333 0 : .await?;
4334 20 : writer.finish_write(lsn);
4335 20 : lsn += 0x10;
4336 20 : }
4337 20 : tline.freeze_and_flush().await?;
4338 : {
4339 20 : let writer = tline.writer().await;
4340 20 : writer
4341 20 : .put(
4342 20 : *TEST_KEY,
4343 20 : lsn,
4344 20 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4345 20 : ctx,
4346 20 : )
4347 10 : .await?;
4348 20 : writer.finish_write(lsn);
4349 20 : lsn += 0x10;
4350 20 : writer
4351 20 : .put(
4352 20 : *TEST_KEY,
4353 20 : lsn,
4354 20 : &Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
4355 20 : ctx,
4356 20 : )
4357 0 : .await?;
4358 20 : writer.finish_write(lsn);
4359 20 : }
4360 20 : tline.freeze_and_flush().await
4361 20 : }
4362 :
4363 2 : #[tokio::test]
4364 2 : async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
4365 2 : let (tenant, ctx) =
4366 2 : TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
4367 2 : .load()
4368 2 : .await;
4369 2 : let tline = tenant
4370 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4371 7 : .await?;
4372 6 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4373 :
4374 : // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
4375 : // FIXME: this doesn't actually remove any layer currently, given how the flushing
4376 : // and compaction works. But it does set the 'cutoff' point so that the cross check
4377 : // below should fail.
4378 2 : tenant
4379 2 : .gc_iteration(
4380 2 : Some(TIMELINE_ID),
4381 2 : 0x10,
4382 2 : Duration::ZERO,
4383 2 : &CancellationToken::new(),
4384 2 : &ctx,
4385 2 : )
4386 0 : .await?;
4387 :
4388 : // try to branch at lsn 25, should fail because we already garbage collected the data
4389 2 : match tenant
4390 2 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
4391 0 : .await
4392 : {
4393 0 : Ok(_) => panic!("branching should have failed"),
4394 2 : Err(err) => {
4395 2 : let CreateTimelineError::AncestorLsn(err) = err else {
4396 0 : panic!("wrong error type")
4397 : };
4398 2 : assert!(err.to_string().contains("invalid branch start lsn"));
4399 2 : assert!(err
4400 2 : .source()
4401 2 : .unwrap()
4402 2 : .to_string()
4403 2 : .contains("we might've already garbage collected needed data"))
4404 : }
4405 : }
4406 :
4407 2 : Ok(())
4408 : }
4409 :
4410 2 : #[tokio::test]
4411 2 : async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
4412 2 : let (tenant, ctx) =
4413 2 : TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?
4414 2 : .load()
4415 2 : .await;
4416 :
4417 2 : let tline = tenant
4418 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)
4419 7 : .await?;
4420 : // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
4421 2 : match tenant
4422 2 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
4423 0 : .await
4424 : {
4425 0 : Ok(_) => panic!("branching should have failed"),
4426 2 : Err(err) => {
4427 2 : let CreateTimelineError::AncestorLsn(err) = err else {
4428 0 : panic!("wrong error type");
4429 : };
4430 2 : assert!(&err.to_string().contains("invalid branch start lsn"));
4431 2 : assert!(&err
4432 2 : .source()
4433 2 : .unwrap()
4434 2 : .to_string()
4435 2 : .contains("is earlier than latest GC horizon"));
4436 : }
4437 : }
4438 :
4439 2 : Ok(())
4440 : }
4441 :
4442 : /*
4443 : // FIXME: This currently fails to error out. Calling GC doesn't currently
4444 : // remove the old value, we'd need to work a little harder
4445 : #[tokio::test]
4446 : async fn test_prohibit_get_for_garbage_collected_data() -> anyhow::Result<()> {
4447 : let repo =
4448 : RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
4449 : .load();
4450 :
4451 : let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
4452 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4453 :
4454 : repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
4455 : let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
4456 : assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
4457 : match tline.get(*TEST_KEY, Lsn(0x25)) {
4458 : Ok(_) => panic!("request for page should have failed"),
4459 : Err(err) => assert!(err.to_string().contains("not found at")),
4460 : }
4461 : Ok(())
4462 : }
4463 : */
4464 :
4465 2 : #[tokio::test]
4466 2 : async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
4467 2 : let (tenant, ctx) =
4468 2 : TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
4469 2 : .load()
4470 2 : .await;
4471 2 : let tline = tenant
4472 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4473 8 : .await?;
4474 6 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4475 :
4476 2 : tenant
4477 2 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4478 2 : .await?;
4479 2 : let newtline = tenant
4480 2 : .get_timeline(NEW_TIMELINE_ID, true)
4481 2 : .expect("Should have a local timeline");
4482 2 :
4483 6 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4484 :
4485 2 : tline.set_broken("test".to_owned());
4486 2 :
4487 2 : tenant
4488 2 : .gc_iteration(
4489 2 : Some(TIMELINE_ID),
4490 2 : 0x10,
4491 2 : Duration::ZERO,
4492 2 : &CancellationToken::new(),
4493 2 : &ctx,
4494 2 : )
4495 0 : .await?;
4496 :
4497 : // The branchpoints should contain all timelines, even ones marked
4498 : // as Broken.
4499 : {
4500 2 : let branchpoints = &tline.gc_info.read().unwrap().retain_lsns;
4501 2 : assert_eq!(branchpoints.len(), 1);
4502 2 : assert_eq!(branchpoints[0], Lsn(0x40));
4503 : }
4504 :
4505 : // You can read the key from the child branch even though the parent is
4506 : // Broken, as long as you don't need to access data from the parent.
4507 2 : assert_eq!(
4508 4 : newtline.get(*TEST_KEY, Lsn(0x70), &ctx).await?,
4509 2 : TEST_IMG(&format!("foo at {}", Lsn(0x70)))
4510 : );
4511 :
4512 : // This needs to traverse to the parent, and fails.
4513 2 : let err = newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await.unwrap_err();
4514 2 : assert!(err
4515 2 : .to_string()
4516 2 : .contains("will not become active. Current state: Broken"));
4517 :
4518 2 : Ok(())
4519 : }
4520 :
4521 2 : #[tokio::test]
4522 2 : async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
4523 2 : let (tenant, ctx) =
4524 2 : TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
4525 2 : .load()
4526 2 : .await;
4527 2 : let tline = tenant
4528 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4529 7 : .await?;
4530 6 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4531 :
4532 2 : tenant
4533 2 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4534 2 : .await?;
4535 2 : let newtline = tenant
4536 2 : .get_timeline(NEW_TIMELINE_ID, true)
4537 2 : .expect("Should have a local timeline");
4538 2 : // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
4539 2 : tenant
4540 2 : .gc_iteration(
4541 2 : Some(TIMELINE_ID),
4542 2 : 0x10,
4543 2 : Duration::ZERO,
4544 2 : &CancellationToken::new(),
4545 2 : &ctx,
4546 2 : )
4547 0 : .await?;
4548 4 : assert!(newtline.get(*TEST_KEY, Lsn(0x25), &ctx).await.is_ok());
4549 :
4550 2 : Ok(())
4551 : }
4552 2 : #[tokio::test]
4553 2 : async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
4554 2 : let (tenant, ctx) =
4555 2 : TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
4556 2 : .load()
4557 2 : .await;
4558 2 : let tline = tenant
4559 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4560 6 : .await?;
4561 6 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4562 :
4563 2 : tenant
4564 2 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4565 2 : .await?;
4566 2 : let newtline = tenant
4567 2 : .get_timeline(NEW_TIMELINE_ID, true)
4568 2 : .expect("Should have a local timeline");
4569 2 :
4570 6 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4571 :
4572 : // run gc on parent
4573 2 : tenant
4574 2 : .gc_iteration(
4575 2 : Some(TIMELINE_ID),
4576 2 : 0x10,
4577 2 : Duration::ZERO,
4578 2 : &CancellationToken::new(),
4579 2 : &ctx,
4580 2 : )
4581 0 : .await?;
4582 :
4583 : // Check that the data is still accessible on the branch.
4584 2 : assert_eq!(
4585 7 : newtline.get(*TEST_KEY, Lsn(0x50), &ctx).await?,
4586 2 : TEST_IMG(&format!("foo at {}", Lsn(0x40)))
4587 : );
4588 :
4589 2 : Ok(())
4590 : }
4591 :
4592 2 : #[tokio::test]
4593 2 : async fn timeline_load() -> anyhow::Result<()> {
4594 : const TEST_NAME: &str = "timeline_load";
4595 2 : let harness = TenantHarness::create(TEST_NAME)?;
4596 : {
4597 2 : let (tenant, ctx) = harness.load().await;
4598 2 : let tline = tenant
4599 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
4600 7 : .await?;
4601 6 : make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
4602 : // so that all uploads finish & we can call harness.load() below again
4603 2 : tenant
4604 2 : .shutdown(Default::default(), true)
4605 2 : .instrument(harness.span())
4606 2 : .await
4607 2 : .ok()
4608 2 : .unwrap();
4609 : }
4610 :
4611 12 : let (tenant, _ctx) = harness.load().await;
4612 2 : tenant
4613 2 : .get_timeline(TIMELINE_ID, true)
4614 2 : .expect("cannot load timeline");
4615 2 :
4616 2 : Ok(())
4617 : }
4618 :
4619 2 : #[tokio::test]
4620 2 : async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
4621 : const TEST_NAME: &str = "timeline_load_with_ancestor";
4622 2 : let harness = TenantHarness::create(TEST_NAME)?;
4623 : // create two timelines
4624 : {
4625 2 : let (tenant, ctx) = harness.load().await;
4626 2 : let tline = tenant
4627 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4628 6 : .await?;
4629 :
4630 6 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4631 :
4632 2 : let child_tline = tenant
4633 2 : .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
4634 2 : .await?;
4635 2 : child_tline.set_state(TimelineState::Active);
4636 2 :
4637 2 : let newtline = tenant
4638 2 : .get_timeline(NEW_TIMELINE_ID, true)
4639 2 : .expect("Should have a local timeline");
4640 2 :
4641 6 : make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
4642 :
4643 : // so that all uploads finish & we can call harness.load() below again
4644 2 : tenant
4645 2 : .shutdown(Default::default(), true)
4646 2 : .instrument(harness.span())
4647 4 : .await
4648 2 : .ok()
4649 2 : .unwrap();
4650 : }
4651 :
4652 : // check that both of them are initially unloaded
4653 19 : let (tenant, _ctx) = harness.load().await;
4654 :
4655 : // check that both, child and ancestor are loaded
4656 2 : let _child_tline = tenant
4657 2 : .get_timeline(NEW_TIMELINE_ID, true)
4658 2 : .expect("cannot get child timeline loaded");
4659 2 :
4660 2 : let _ancestor_tline = tenant
4661 2 : .get_timeline(TIMELINE_ID, true)
4662 2 : .expect("cannot get ancestor timeline loaded");
4663 2 :
4664 2 : Ok(())
4665 : }
4666 :
4667 2 : #[tokio::test]
4668 2 : async fn delta_layer_dumping() -> anyhow::Result<()> {
4669 : use storage_layer::AsLayerDesc;
4670 2 : let (tenant, ctx) = TenantHarness::create("test_layer_dumping")?.load().await;
4671 2 : let tline = tenant
4672 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4673 7 : .await?;
4674 6 : make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
4675 :
4676 2 : let layer_map = tline.layers.read().await;
4677 2 : let level0_deltas = layer_map
4678 2 : .layer_map()
4679 2 : .get_level0_deltas()?
4680 2 : .into_iter()
4681 4 : .map(|desc| layer_map.get_from_desc(&desc))
4682 2 : .collect::<Vec<_>>();
4683 :
4684 2 : assert!(!level0_deltas.is_empty());
4685 :
4686 6 : for delta in level0_deltas {
4687 : // Ensure we are dumping a delta layer here
4688 4 : assert!(delta.layer_desc().is_delta);
4689 8 : delta.dump(true, &ctx).await.unwrap();
4690 : }
4691 :
4692 2 : Ok(())
4693 : }
4694 :
4695 2 : #[tokio::test]
4696 2 : async fn corrupt_local_metadata() -> anyhow::Result<()> {
4697 : const TEST_NAME: &str = "corrupt_metadata";
4698 2 : let harness = TenantHarness::create(TEST_NAME)?;
4699 2 : let (tenant, ctx) = harness.load().await;
4700 :
4701 2 : let tline = tenant
4702 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4703 8 : .await?;
4704 2 : drop(tline);
4705 2 : // so that all uploads finish & we can call harness.try_load() below again
4706 2 : tenant
4707 2 : .shutdown(Default::default(), true)
4708 2 : .instrument(harness.span())
4709 2 : .await
4710 2 : .ok()
4711 2 : .unwrap();
4712 2 : drop(tenant);
4713 2 :
4714 2 : // Corrupt local metadata
4715 2 : let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
4716 2 : assert!(metadata_path.is_file());
4717 2 : let mut metadata_bytes = std::fs::read(&metadata_path)?;
4718 2 : assert_eq!(metadata_bytes.len(), 512);
4719 2 : metadata_bytes[8] ^= 1;
4720 2 : std::fs::write(metadata_path, metadata_bytes)?;
4721 :
4722 2 : let err = harness.try_load_local(&ctx).await.expect_err("should fail");
4723 2 : // get all the stack with all .context, not only the last one
4724 2 : let message = format!("{err:#}");
4725 2 : let expected = "failed to load metadata";
4726 2 : assert!(
4727 2 : message.contains(expected),
4728 0 : "message '{message}' expected to contain {expected}"
4729 : );
4730 :
4731 2 : let mut found_error_message = false;
4732 2 : let mut err_source = err.source();
4733 2 : while let Some(source) = err_source {
4734 2 : if source.to_string().contains("metadata checksum mismatch") {
4735 2 : found_error_message = true;
4736 2 : break;
4737 0 : }
4738 0 : err_source = source.source();
4739 : }
4740 2 : assert!(
4741 2 : found_error_message,
4742 0 : "didn't find the corrupted metadata error in {}",
4743 : message
4744 : );
4745 :
4746 2 : Ok(())
4747 : }
4748 :
4749 2 : #[tokio::test]
4750 2 : async fn test_images() -> anyhow::Result<()> {
4751 2 : let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
4752 2 : let tline = tenant
4753 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4754 7 : .await?;
4755 :
4756 2 : let writer = tline.writer().await;
4757 2 : writer
4758 2 : .put(
4759 2 : *TEST_KEY,
4760 2 : Lsn(0x10),
4761 2 : &Value::Image(TEST_IMG("foo at 0x10")),
4762 2 : &ctx,
4763 2 : )
4764 1 : .await?;
4765 2 : writer.finish_write(Lsn(0x10));
4766 2 : drop(writer);
4767 2 :
4768 2 : tline.freeze_and_flush().await?;
4769 2 : tline
4770 2 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4771 0 : .await?;
4772 :
4773 2 : let writer = tline.writer().await;
4774 2 : writer
4775 2 : .put(
4776 2 : *TEST_KEY,
4777 2 : Lsn(0x20),
4778 2 : &Value::Image(TEST_IMG("foo at 0x20")),
4779 2 : &ctx,
4780 2 : )
4781 1 : .await?;
4782 2 : writer.finish_write(Lsn(0x20));
4783 2 : drop(writer);
4784 2 :
4785 2 : tline.freeze_and_flush().await?;
4786 2 : tline
4787 2 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4788 0 : .await?;
4789 :
4790 2 : let writer = tline.writer().await;
4791 2 : writer
4792 2 : .put(
4793 2 : *TEST_KEY,
4794 2 : Lsn(0x30),
4795 2 : &Value::Image(TEST_IMG("foo at 0x30")),
4796 2 : &ctx,
4797 2 : )
4798 1 : .await?;
4799 2 : writer.finish_write(Lsn(0x30));
4800 2 : drop(writer);
4801 2 :
4802 2 : tline.freeze_and_flush().await?;
4803 2 : tline
4804 2 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4805 0 : .await?;
4806 :
4807 2 : let writer = tline.writer().await;
4808 2 : writer
4809 2 : .put(
4810 2 : *TEST_KEY,
4811 2 : Lsn(0x40),
4812 2 : &Value::Image(TEST_IMG("foo at 0x40")),
4813 2 : &ctx,
4814 2 : )
4815 1 : .await?;
4816 2 : writer.finish_write(Lsn(0x40));
4817 2 : drop(writer);
4818 2 :
4819 2 : tline.freeze_and_flush().await?;
4820 2 : tline
4821 2 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4822 0 : .await?;
4823 :
4824 2 : assert_eq!(
4825 4 : tline.get(*TEST_KEY, Lsn(0x10), &ctx).await?,
4826 2 : TEST_IMG("foo at 0x10")
4827 : );
4828 2 : assert_eq!(
4829 3 : tline.get(*TEST_KEY, Lsn(0x1f), &ctx).await?,
4830 2 : TEST_IMG("foo at 0x10")
4831 : );
4832 2 : assert_eq!(
4833 2 : tline.get(*TEST_KEY, Lsn(0x20), &ctx).await?,
4834 2 : TEST_IMG("foo at 0x20")
4835 : );
4836 2 : assert_eq!(
4837 4 : tline.get(*TEST_KEY, Lsn(0x30), &ctx).await?,
4838 2 : TEST_IMG("foo at 0x30")
4839 : );
4840 2 : assert_eq!(
4841 4 : tline.get(*TEST_KEY, Lsn(0x40), &ctx).await?,
4842 2 : TEST_IMG("foo at 0x40")
4843 : );
4844 :
4845 2 : Ok(())
4846 : }
4847 :
4848 : //
4849 : // Insert 1000 key-value pairs with increasing keys, flush, compact, GC.
4850 : // Repeat 50 times.
4851 : //
4852 2 : #[tokio::test]
4853 2 : async fn test_bulk_insert() -> anyhow::Result<()> {
4854 2 : let harness = TenantHarness::create("test_bulk_insert")?;
4855 2 : let (tenant, ctx) = harness.load().await;
4856 2 : let tline = tenant
4857 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
4858 8 : .await?;
4859 :
4860 2 : let mut lsn = Lsn(0x10);
4861 2 :
4862 2 : let mut keyspace = KeySpaceAccum::new();
4863 2 :
4864 2 : let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
4865 2 : let mut blknum = 0;
4866 102 : for _ in 0..50 {
4867 1000100 : for _ in 0..10000 {
4868 1000000 : test_key.field6 = blknum;
4869 1000000 : let writer = tline.writer().await;
4870 1000000 : writer
4871 1000000 : .put(
4872 1000000 : test_key,
4873 1000000 : lsn,
4874 1000000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4875 1000000 : &ctx,
4876 1000000 : )
4877 15854 : .await?;
4878 1000000 : writer.finish_write(lsn);
4879 1000000 : drop(writer);
4880 1000000 :
4881 1000000 : keyspace.add_key(test_key);
4882 1000000 :
4883 1000000 : lsn = Lsn(lsn.0 + 0x10);
4884 1000000 : blknum += 1;
4885 : }
4886 :
4887 100 : let cutoff = tline.get_last_record_lsn();
4888 100 :
4889 100 : tline
4890 100 : .update_gc_info(
4891 100 : Vec::new(),
4892 100 : cutoff,
4893 100 : Duration::ZERO,
4894 100 : &CancellationToken::new(),
4895 100 : &ctx,
4896 100 : )
4897 0 : .await?;
4898 100 : tline.freeze_and_flush().await?;
4899 100 : tline
4900 100 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4901 18642 : .await?;
4902 100 : tline.gc().await?;
4903 : }
4904 :
4905 2 : Ok(())
4906 : }
4907 :
4908 2 : #[tokio::test]
4909 2 : async fn test_random_updates() -> anyhow::Result<()> {
4910 2 : let harness = TenantHarness::create("test_random_updates")?;
4911 2 : let (tenant, ctx) = harness.load().await;
4912 2 : let tline = tenant
4913 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
4914 6 : .await?;
4915 :
4916 : const NUM_KEYS: usize = 1000;
4917 :
4918 2 : let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
4919 2 :
4920 2 : let mut keyspace = KeySpaceAccum::new();
4921 2 :
4922 2 : // Track when each page was last modified. Used to assert that
4923 2 : // a read sees the latest page version.
4924 2 : let mut updated = [Lsn(0); NUM_KEYS];
4925 2 :
4926 2 : let mut lsn = Lsn(0x10);
4927 : #[allow(clippy::needless_range_loop)]
4928 2002 : for blknum in 0..NUM_KEYS {
4929 2000 : lsn = Lsn(lsn.0 + 0x10);
4930 2000 : test_key.field6 = blknum as u32;
4931 2000 : let writer = tline.writer().await;
4932 2000 : writer
4933 2000 : .put(
4934 2000 : test_key,
4935 2000 : lsn,
4936 2000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4937 2000 : &ctx,
4938 2000 : )
4939 33 : .await?;
4940 2000 : writer.finish_write(lsn);
4941 2000 : updated[blknum] = lsn;
4942 2000 : drop(writer);
4943 2000 :
4944 2000 : keyspace.add_key(test_key);
4945 : }
4946 :
4947 102 : for _ in 0..50 {
4948 100100 : for _ in 0..NUM_KEYS {
4949 100000 : lsn = Lsn(lsn.0 + 0x10);
4950 100000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
4951 100000 : test_key.field6 = blknum as u32;
4952 100000 : let writer = tline.writer().await;
4953 100000 : writer
4954 100000 : .put(
4955 100000 : test_key,
4956 100000 : lsn,
4957 100000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
4958 100000 : &ctx,
4959 100000 : )
4960 1604 : .await?;
4961 100000 : writer.finish_write(lsn);
4962 100000 : drop(writer);
4963 100000 : updated[blknum] = lsn;
4964 : }
4965 :
4966 : // Read all the blocks
4967 100000 : for (blknum, last_lsn) in updated.iter().enumerate() {
4968 100000 : test_key.field6 = blknum as u32;
4969 100000 : assert_eq!(
4970 100000 : tline.get(test_key, lsn, &ctx).await?,
4971 100000 : TEST_IMG(&format!("{} at {}", blknum, last_lsn))
4972 : );
4973 : }
4974 :
4975 : // Perform a cycle of flush, compact, and GC
4976 100 : let cutoff = tline.get_last_record_lsn();
4977 100 : tline
4978 100 : .update_gc_info(
4979 100 : Vec::new(),
4980 100 : cutoff,
4981 100 : Duration::ZERO,
4982 100 : &CancellationToken::new(),
4983 100 : &ctx,
4984 100 : )
4985 0 : .await?;
4986 100 : tline.freeze_and_flush().await?;
4987 100 : tline
4988 100 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
4989 2279 : .await?;
4990 100 : tline.gc().await?;
4991 : }
4992 :
4993 2 : Ok(())
4994 : }
4995 :
4996 2 : #[tokio::test]
4997 2 : async fn test_traverse_branches() -> anyhow::Result<()> {
4998 2 : let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
4999 2 : .load()
5000 2 : .await;
5001 2 : let mut tline = tenant
5002 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
5003 7 : .await?;
5004 :
5005 : const NUM_KEYS: usize = 1000;
5006 :
5007 2 : let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
5008 2 :
5009 2 : let mut keyspace = KeySpaceAccum::new();
5010 2 :
5011 2 : // Track when each page was last modified. Used to assert that
5012 2 : // a read sees the latest page version.
5013 2 : let mut updated = [Lsn(0); NUM_KEYS];
5014 2 :
5015 2 : let mut lsn = Lsn(0x10);
5016 : #[allow(clippy::needless_range_loop)]
5017 2002 : for blknum in 0..NUM_KEYS {
5018 2000 : lsn = Lsn(lsn.0 + 0x10);
5019 2000 : test_key.field6 = blknum as u32;
5020 2000 : let writer = tline.writer().await;
5021 2000 : writer
5022 2000 : .put(
5023 2000 : test_key,
5024 2000 : lsn,
5025 2000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
5026 2000 : &ctx,
5027 2000 : )
5028 33 : .await?;
5029 2000 : writer.finish_write(lsn);
5030 2000 : updated[blknum] = lsn;
5031 2000 : drop(writer);
5032 2000 :
5033 2000 : keyspace.add_key(test_key);
5034 : }
5035 :
5036 102 : for _ in 0..50 {
5037 100 : let new_tline_id = TimelineId::generate();
5038 100 : tenant
5039 100 : .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
5040 101 : .await?;
5041 100 : tline = tenant
5042 100 : .get_timeline(new_tline_id, true)
5043 100 : .expect("Should have the branched timeline");
5044 :
5045 100100 : for _ in 0..NUM_KEYS {
5046 100000 : lsn = Lsn(lsn.0 + 0x10);
5047 100000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
5048 100000 : test_key.field6 = blknum as u32;
5049 100000 : let writer = tline.writer().await;
5050 100000 : writer
5051 100000 : .put(
5052 100000 : test_key,
5053 100000 : lsn,
5054 100000 : &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
5055 100000 : &ctx,
5056 100000 : )
5057 1640 : .await?;
5058 100000 : println!("updating {} at {}", blknum, lsn);
5059 100000 : writer.finish_write(lsn);
5060 100000 : drop(writer);
5061 100000 : updated[blknum] = lsn;
5062 : }
5063 :
5064 : // Read all the blocks
5065 100000 : for (blknum, last_lsn) in updated.iter().enumerate() {
5066 100000 : test_key.field6 = blknum as u32;
5067 100000 : assert_eq!(
5068 100000 : tline.get(test_key, lsn, &ctx).await?,
5069 100000 : TEST_IMG(&format!("{} at {}", blknum, last_lsn))
5070 : );
5071 : }
5072 :
5073 : // Perform a cycle of flush, compact, and GC
5074 100 : let cutoff = tline.get_last_record_lsn();
5075 100 : tline
5076 100 : .update_gc_info(
5077 100 : Vec::new(),
5078 100 : cutoff,
5079 100 : Duration::ZERO,
5080 100 : &CancellationToken::new(),
5081 100 : &ctx,
5082 100 : )
5083 0 : .await?;
5084 102 : tline.freeze_and_flush().await?;
5085 100 : tline
5086 100 : .compact(&CancellationToken::new(), EnumSet::empty(), &ctx)
5087 13317 : .await?;
5088 100 : tline.gc().await?;
5089 : }
5090 :
5091 2 : Ok(())
5092 : }
5093 :
5094 2 : #[tokio::test]
5095 2 : async fn test_traverse_ancestors() -> anyhow::Result<()> {
5096 2 : let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
5097 2 : .load()
5098 2 : .await;
5099 2 : let mut tline = tenant
5100 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
5101 7 : .await?;
5102 :
5103 : const NUM_KEYS: usize = 100;
5104 : const NUM_TLINES: usize = 50;
5105 :
5106 2 : let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
5107 2 : // Track page mutation lsns across different timelines.
5108 2 : let mut updated = [[Lsn(0); NUM_KEYS]; NUM_TLINES];
5109 2 :
5110 2 : let mut lsn = Lsn(0x10);
5111 :
5112 : #[allow(clippy::needless_range_loop)]
5113 102 : for idx in 0..NUM_TLINES {
5114 100 : let new_tline_id = TimelineId::generate();
5115 100 : tenant
5116 100 : .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
5117 106 : .await?;
5118 100 : tline = tenant
5119 100 : .get_timeline(new_tline_id, true)
5120 100 : .expect("Should have the branched timeline");
5121 :
5122 10100 : for _ in 0..NUM_KEYS {
5123 10000 : lsn = Lsn(lsn.0 + 0x10);
5124 10000 : let blknum = thread_rng().gen_range(0..NUM_KEYS);
5125 10000 : test_key.field6 = blknum as u32;
5126 10000 : let writer = tline.writer().await;
5127 10000 : writer
5128 10000 : .put(
5129 10000 : test_key,
5130 10000 : lsn,
5131 10000 : &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
5132 10000 : &ctx,
5133 10000 : )
5134 174 : .await?;
5135 10000 : println!("updating [{}][{}] at {}", idx, blknum, lsn);
5136 10000 : writer.finish_write(lsn);
5137 10000 : drop(writer);
5138 10000 : updated[idx][blknum] = lsn;
5139 : }
5140 : }
5141 :
5142 : // Read pages from leaf timeline across all ancestors.
5143 100 : for (idx, lsns) in updated.iter().enumerate() {
5144 10000 : for (blknum, lsn) in lsns.iter().enumerate() {
5145 : // Skip empty mutations.
5146 10000 : if lsn.0 == 0 {
5147 3642 : continue;
5148 6358 : }
5149 6358 : println!("checking [{idx}][{blknum}] at {lsn}");
5150 6358 : test_key.field6 = blknum as u32;
5151 6358 : assert_eq!(
5152 6358 : tline.get(test_key, *lsn, &ctx).await?,
5153 6358 : TEST_IMG(&format!("{idx} {blknum} at {lsn}"))
5154 : );
5155 : }
5156 : }
5157 2 : Ok(())
5158 : }
5159 :
5160 2 : #[tokio::test]
5161 2 : async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
5162 2 : let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
5163 2 : .load()
5164 2 : .await;
5165 :
5166 2 : let initdb_lsn = Lsn(0x20);
5167 2 : let utline = tenant
5168 2 : .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
5169 3 : .await?;
5170 2 : let tline = utline.raw_timeline().unwrap();
5171 2 :
5172 2 : // Spawn flush loop now so that we can set the `expect_initdb_optimization`
5173 2 : tline.maybe_spawn_flush_loop();
5174 2 :
5175 2 : // Make sure the timeline has the minimum set of required keys for operation.
5176 2 : // The only operation you can always do on an empty timeline is to `put` new data.
5177 2 : // Except if you `put` at `initdb_lsn`.
5178 2 : // In that case, there's an optimization to directly create image layers instead of delta layers.
5179 2 : // It uses `repartition()`, which assumes some keys to be present.
5180 2 : // Let's make sure the test timeline can handle that case.
5181 2 : {
5182 2 : let mut state = tline.flush_loop_state.lock().unwrap();
5183 2 : assert_eq!(
5184 2 : timeline::FlushLoopState::Running {
5185 2 : expect_initdb_optimization: false,
5186 2 : initdb_optimization_count: 0,
5187 2 : },
5188 2 : *state
5189 2 : );
5190 2 : *state = timeline::FlushLoopState::Running {
5191 2 : expect_initdb_optimization: true,
5192 2 : initdb_optimization_count: 0,
5193 2 : };
5194 2 : }
5195 2 :
5196 2 : // Make writes at the initdb_lsn. When we flush it below, it should be handled by the optimization.
5197 2 : // As explained above, the optimization requires some keys to be present.
5198 2 : // As per `create_empty_timeline` documentation, use init_empty to set them.
5199 2 : // This is what `create_test_timeline` does, by the way.
5200 2 : let mut modification = tline.begin_modification(initdb_lsn);
5201 2 : modification
5202 2 : .init_empty_test_timeline()
5203 2 : .context("init_empty_test_timeline")?;
5204 2 : modification
5205 2 : .commit(&ctx)
5206 1 : .await
5207 2 : .context("commit init_empty_test_timeline modification")?;
5208 :
5209 : // Do the flush. The flush code will check the expectations that we set above.
5210 2 : tline.freeze_and_flush().await?;
5211 :
5212 : // assert freeze_and_flush exercised the initdb optimization
5213 : {
5214 2 : let state = tline.flush_loop_state.lock().unwrap();
5215 : let timeline::FlushLoopState::Running {
5216 2 : expect_initdb_optimization,
5217 2 : initdb_optimization_count,
5218 2 : } = *state
5219 : else {
5220 0 : panic!("unexpected state: {:?}", *state);
5221 : };
5222 2 : assert!(expect_initdb_optimization);
5223 2 : assert!(initdb_optimization_count > 0);
5224 : }
5225 2 : Ok(())
5226 : }
5227 :
5228 2 : #[tokio::test]
5229 2 : async fn test_uninit_mark_crash() -> anyhow::Result<()> {
5230 2 : let name = "test_uninit_mark_crash";
5231 2 : let harness = TenantHarness::create(name)?;
5232 : {
5233 2 : let (tenant, ctx) = harness.load().await;
5234 2 : let tline = tenant
5235 2 : .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
5236 3 : .await?;
5237 : // Keeps uninit mark in place
5238 2 : let raw_tline = tline.raw_timeline().unwrap();
5239 2 : raw_tline
5240 2 : .shutdown()
5241 2 : .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id, shard_id=%raw_tline.tenant_shard_id.shard_slug(), timeline_id=%TIMELINE_ID))
5242 0 : .await;
5243 2 : std::mem::forget(tline);
5244 : }
5245 :
5246 2 : let (tenant, _) = harness.load().await;
5247 2 : match tenant.get_timeline(TIMELINE_ID, false) {
5248 0 : Ok(_) => panic!("timeline should've been removed during load"),
5249 2 : Err(e) => {
5250 2 : assert_eq!(
5251 2 : e,
5252 2 : GetTimelineError::NotFound {
5253 2 : tenant_id: tenant.tenant_shard_id,
5254 2 : timeline_id: TIMELINE_ID,
5255 2 : }
5256 2 : )
5257 : }
5258 : }
5259 :
5260 2 : assert!(!harness
5261 2 : .conf
5262 2 : .timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
5263 2 : .exists());
5264 :
5265 2 : assert!(!harness
5266 2 : .conf
5267 2 : .timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
5268 2 : .exists());
5269 :
5270 2 : Ok(())
5271 : }
5272 : }
|