LCOV - code coverage report
Current view: top level - pageserver/src/tenant - timeline.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 60.3 % 3092 1863
Test Date: 2024-02-29 11:57:12 Functions: 44.4 % 446 198

            Line data    Source code
       1              : mod compaction;
       2              : pub mod delete;
       3              : mod eviction_task;
       4              : mod init;
       5              : pub mod layer_manager;
       6              : pub(crate) mod logical_size;
       7              : pub mod span;
       8              : pub mod uninit;
       9              : mod walreceiver;
      10              : 
      11              : use anyhow::{anyhow, bail, ensure, Context, Result};
      12              : use bytes::Bytes;
      13              : use camino::{Utf8Path, Utf8PathBuf};
      14              : use enumset::EnumSet;
      15              : use fail::fail_point;
      16              : use futures::stream::StreamExt;
      17              : use itertools::Itertools;
      18              : use once_cell::sync::Lazy;
      19              : use pageserver_api::{
      20              :     keyspace::KeySpaceAccum,
      21              :     models::{
      22              :         CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
      23              :         EvictionPolicy, LayerMapInfo, TimelineState,
      24              :     },
      25              :     reltag::BlockNumber,
      26              :     shard::{ShardIdentity, TenantShardId},
      27              : };
      28              : use rand::Rng;
      29              : use serde_with::serde_as;
      30              : use std::ops::{Deref, Range};
      31              : use std::pin::pin;
      32              : use std::sync::atomic::Ordering as AtomicOrdering;
      33              : use std::sync::{Arc, Mutex, RwLock, Weak};
      34              : use std::time::{Duration, Instant, SystemTime};
      35              : use std::{
      36              :     array,
      37              :     collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
      38              :     sync::atomic::AtomicU64,
      39              : };
      40              : use std::{
      41              :     cmp::{max, min, Ordering},
      42              :     ops::ControlFlow,
      43              : };
      44              : use storage_broker::BrokerClientChannel;
      45              : use tokio::{
      46              :     runtime::Handle,
      47              :     sync::{oneshot, watch},
      48              : };
      49              : use tokio_util::sync::CancellationToken;
      50              : use tracing::*;
      51              : use utils::sync::gate::{Gate, GateGuard};
      52              : 
      53              : use crate::pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind};
      54              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      55              : use crate::tenant::{
      56              :     layer_map::{LayerMap, SearchResult},
      57              :     metadata::TimelineMetadata,
      58              :     par_fsync,
      59              : };
      60              : use crate::{
      61              :     context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
      62              :     disk_usage_eviction_task::DiskUsageEvictionInfo,
      63              :     pgdatadir_mapping::CollectKeySpaceError,
      64              : };
      65              : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
      66              : use crate::{
      67              :     disk_usage_eviction_task::finite_f32,
      68              :     tenant::storage_layer::{
      69              :         AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
      70              :         LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
      71              :         ValueReconstructState, ValuesReconstructState,
      72              :     },
      73              : };
      74              : use crate::{
      75              :     disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
      76              : };
      77              : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
      78              : 
      79              : use crate::config::PageServerConf;
      80              : use crate::keyspace::{KeyPartitioning, KeySpace};
      81              : use crate::metrics::{
      82              :     TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
      83              : };
      84              : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
      85              : use crate::tenant::config::TenantConfOpt;
      86              : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
      87              : use pageserver_api::reltag::RelTag;
      88              : use pageserver_api::shard::ShardIndex;
      89              : 
      90              : use postgres_connection::PgConnectionConfig;
      91              : use postgres_ffi::to_pg_timestamp;
      92              : use utils::{
      93              :     completion,
      94              :     generation::Generation,
      95              :     id::TimelineId,
      96              :     lsn::{AtomicLsn, Lsn, RecordLsn},
      97              :     seqwait::SeqWait,
      98              :     simple_rcu::{Rcu, RcuReadGuard},
      99              : };
     100              : 
     101              : use crate::page_cache;
     102              : use crate::repository::GcResult;
     103              : use crate::repository::{Key, Value};
     104              : use crate::task_mgr;
     105              : use crate::task_mgr::TaskKind;
     106              : use crate::ZERO_PAGE;
     107              : 
     108              : use self::delete::DeleteTimelineFlow;
     109              : pub(super) use self::eviction_task::EvictionTaskTenantState;
     110              : use self::eviction_task::EvictionTaskTimelineState;
     111              : use self::layer_manager::LayerManager;
     112              : use self::logical_size::LogicalSize;
     113              : use self::walreceiver::{WalReceiver, WalReceiverConf};
     114              : 
     115              : use super::remote_timeline_client::RemoteTimelineClient;
     116              : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
     117              : use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
     118              : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
     119              : use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
     120              : 
     121            2 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     122              : pub(super) enum FlushLoopState {
     123              :     NotStarted,
     124              :     Running {
     125              :         #[cfg(test)]
     126              :         expect_initdb_optimization: bool,
     127              :         #[cfg(test)]
     128              :         initdb_optimization_count: usize,
     129              :     },
     130              :     Exited,
     131              : }
     132              : 
     133              : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
     134            0 : #[derive(Debug, Clone, PartialEq, Eq)]
     135              : pub(crate) struct Hole {
     136              :     key_range: Range<Key>,
     137              :     coverage_size: usize,
     138              : }
     139              : 
     140              : impl Ord for Hole {
     141            0 :     fn cmp(&self, other: &Self) -> Ordering {
     142            0 :         other.coverage_size.cmp(&self.coverage_size) // inverse order
     143            0 :     }
     144              : }
     145              : 
     146              : impl PartialOrd for Hole {
     147            0 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     148            0 :         Some(self.cmp(other))
     149            0 :     }
     150              : }
     151              : 
     152              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     153              : /// Can be removed after all refactors are done.
     154           30 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
     155           30 :     drop(rlock)
     156           30 : }
     157              : 
     158              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     159              : /// Can be removed after all refactors are done.
     160          512 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
     161          512 :     drop(rlock)
     162          512 : }
     163              : 
     164              : /// The outward-facing resources required to build a Timeline
     165              : pub struct TimelineResources {
     166              :     pub remote_client: Option<RemoteTimelineClient>,
     167              :     pub deletion_queue_client: DeletionQueueClient,
     168              :     pub timeline_get_throttle: Arc<
     169              :         crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
     170              :     >,
     171              : }
     172              : 
     173              : pub(crate) struct AuxFilesState {
     174              :     pub(crate) dir: Option<AuxFilesDirectory>,
     175              :     pub(crate) n_deltas: usize,
     176              : }
     177              : 
     178              : pub struct Timeline {
     179              :     conf: &'static PageServerConf,
     180              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     181              : 
     182              :     myself: Weak<Self>,
     183              : 
     184              :     pub(crate) tenant_shard_id: TenantShardId,
     185              :     pub timeline_id: TimelineId,
     186              : 
     187              :     /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
     188              :     /// Never changes for the lifetime of this [`Timeline`] object.
     189              :     ///
     190              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     191              :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     192              :     pub(crate) generation: Generation,
     193              : 
     194              :     /// The detailed sharding information from our parent Tenant.  This enables us to map keys
     195              :     /// to shards, and is constant through the lifetime of this Timeline.
     196              :     shard_identity: ShardIdentity,
     197              : 
     198              :     pub pg_version: u32,
     199              : 
     200              :     /// The tuple has two elements.
     201              :     /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
     202              :     /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
     203              :     ///
     204              :     /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
     205              :     /// We describe these rectangles through the `PersistentLayerDesc` struct.
     206              :     ///
     207              :     /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
     208              :     /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
     209              :     /// `PersistentLayerDesc`'s.
     210              :     ///
     211              :     /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
     212              :     /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
     213              :     /// runtime, e.g., during page reconstruction.
     214              :     ///
     215              :     /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
     216              :     /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
     217              :     pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
     218              : 
     219              :     last_freeze_at: AtomicLsn,
     220              :     // Atomic would be more appropriate here.
     221              :     last_freeze_ts: RwLock<Instant>,
     222              : 
     223              :     // WAL redo manager. `None` only for broken tenants.
     224              :     walredo_mgr: Option<Arc<super::WalRedoManager>>,
     225              : 
     226              :     /// Remote storage client.
     227              :     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
     228              :     pub remote_client: Option<Arc<RemoteTimelineClient>>,
     229              : 
     230              :     // What page versions do we hold in the repository? If we get a
     231              :     // request > last_record_lsn, we need to wait until we receive all
     232              :     // the WAL up to the request. The SeqWait provides functions for
     233              :     // that. TODO: If we get a request for an old LSN, such that the
     234              :     // versions have already been garbage collected away, we should
     235              :     // throw an error, but we don't track that currently.
     236              :     //
     237              :     // last_record_lsn.load().last points to the end of last processed WAL record.
     238              :     //
     239              :     // We also remember the starting point of the previous record in
     240              :     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     241              :     // first WAL record when the node is started up. But here, we just
     242              :     // keep track of it.
     243              :     last_record_lsn: SeqWait<RecordLsn, Lsn>,
     244              : 
     245              :     // All WAL records have been processed and stored durably on files on
     246              :     // local disk, up to this LSN. On crash and restart, we need to re-process
     247              :     // the WAL starting from this point.
     248              :     //
     249              :     // Some later WAL records might have been processed and also flushed to disk
     250              :     // already, so don't be surprised to see some, but there's no guarantee on
     251              :     // them yet.
     252              :     disk_consistent_lsn: AtomicLsn,
     253              : 
     254              :     // Parent timeline that this timeline was branched from, and the LSN
     255              :     // of the branch point.
     256              :     ancestor_timeline: Option<Arc<Timeline>>,
     257              :     ancestor_lsn: Lsn,
     258              : 
     259              :     pub(super) metrics: TimelineMetrics,
     260              : 
     261              :     // `Timeline` doesn't write these metrics itself, but it manages the lifetime.  Code
     262              :     // in `crate::page_service` writes these metrics.
     263              :     pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
     264              : 
     265              :     directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
     266              : 
     267              :     /// Ensures layers aren't frozen by checkpointer between
     268              :     /// [`Timeline::get_layer_for_write`] and layer reads.
     269              :     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     270              :     /// Must always be acquired before the layer map/individual layer lock
     271              :     /// to avoid deadlock.
     272              :     write_lock: tokio::sync::Mutex<()>,
     273              : 
     274              :     /// Used to avoid multiple `flush_loop` tasks running
     275              :     pub(super) flush_loop_state: Mutex<FlushLoopState>,
     276              : 
     277              :     /// layer_flush_start_tx can be used to wake up the layer-flushing task.
     278              :     /// The value is a counter, incremented every time a new flush cycle is requested.
     279              :     /// The flush cycle counter is sent back on the layer_flush_done channel when
     280              :     /// the flush finishes. You can use that to wait for the flush to finish.
     281              :     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     282              :     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
     283              :     layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
     284              : 
     285              :     // Needed to ensure that we can't create a branch at a point that was already garbage collected
     286              :     pub latest_gc_cutoff_lsn: Rcu<Lsn>,
     287              : 
     288              :     // List of child timelines and their branch points. This is needed to avoid
     289              :     // garbage collecting data that is still needed by the child timelines.
     290              :     pub gc_info: std::sync::RwLock<GcInfo>,
     291              : 
     292              :     // It may change across major versions so for simplicity
     293              :     // keep it after running initdb for a timeline.
     294              :     // It is needed in checks when we want to error on some operations
     295              :     // when they are requested for pre-initdb lsn.
     296              :     // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
     297              :     // though let's keep them both for better error visibility.
     298              :     pub initdb_lsn: Lsn,
     299              : 
     300              :     /// When did we last calculate the partitioning?
     301              :     partitioning: tokio::sync::Mutex<(KeyPartitioning, Lsn)>,
     302              : 
     303              :     /// Configuration: how often should the partitioning be recalculated.
     304              :     repartition_threshold: u64,
     305              : 
     306              :     /// Current logical size of the "datadir", at the last LSN.
     307              :     current_logical_size: LogicalSize,
     308              : 
     309              :     /// Information about the last processed message by the WAL receiver,
     310              :     /// or None if WAL receiver has not received anything for this timeline
     311              :     /// yet.
     312              :     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
     313              :     pub walreceiver: Mutex<Option<WalReceiver>>,
     314              : 
     315              :     /// Relation size cache
     316              :     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
     317              : 
     318              :     download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
     319              : 
     320              :     state: watch::Sender<TimelineState>,
     321              : 
     322              :     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     323              :     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     324              :     pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
     325              : 
     326              :     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
     327              : 
     328              :     /// Load or creation time information about the disk_consistent_lsn and when the loading
     329              :     /// happened. Used for consumption metrics.
     330              :     pub(crate) loaded_at: (Lsn, SystemTime),
     331              : 
     332              :     /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
     333              :     pub(crate) gate: Gate,
     334              : 
     335              :     /// Cancellation token scoped to this timeline: anything doing long-running work relating
     336              :     /// to the timeline should drop out when this token fires.
     337              :     pub(crate) cancel: CancellationToken,
     338              : 
     339              :     /// Make sure we only have one running compaction at a time in tests.
     340              :     ///
     341              :     /// Must only be taken in two places:
     342              :     /// - [`Timeline::compact`] (this file)
     343              :     /// - [`delete::delete_local_timeline_directory`]
     344              :     ///
     345              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     346              :     compaction_lock: tokio::sync::Mutex<()>,
     347              : 
     348              :     /// Make sure we only have one running gc at a time.
     349              :     ///
     350              :     /// Must only be taken in two places:
     351              :     /// - [`Timeline::gc`] (this file)
     352              :     /// - [`delete::delete_local_timeline_directory`]
     353              :     ///
     354              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     355              :     gc_lock: tokio::sync::Mutex<()>,
     356              : 
     357              :     /// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
     358              :     timeline_get_throttle: Arc<
     359              :         crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
     360              :     >,
     361              : 
     362              :     /// Keep aux directory cache to avoid it's reconstruction on each update
     363              :     pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
     364              : }
     365              : 
     366              : pub struct WalReceiverInfo {
     367              :     pub wal_source_connconf: PgConnectionConfig,
     368              :     pub last_received_msg_lsn: Lsn,
     369              :     pub last_received_msg_ts: u128,
     370              : }
     371              : 
     372              : ///
     373              : /// Information about how much history needs to be retained, needed by
     374              : /// Garbage Collection.
     375              : ///
     376              : pub struct GcInfo {
     377              :     /// Specific LSNs that are needed.
     378              :     ///
     379              :     /// Currently, this includes all points where child branches have
     380              :     /// been forked off from. In the future, could also include
     381              :     /// explicit user-defined snapshot points.
     382              :     pub retain_lsns: Vec<Lsn>,
     383              : 
     384              :     /// In addition to 'retain_lsns', keep everything newer than this
     385              :     /// point.
     386              :     ///
     387              :     /// This is calculated by subtracting 'gc_horizon' setting from
     388              :     /// last-record LSN
     389              :     ///
     390              :     /// FIXME: is this inclusive or exclusive?
     391              :     pub horizon_cutoff: Lsn,
     392              : 
     393              :     /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
     394              :     /// point.
     395              :     ///
     396              :     /// This is calculated by finding a number such that a record is needed for PITR
     397              :     /// if only if its LSN is larger than 'pitr_cutoff'.
     398              :     pub pitr_cutoff: Lsn,
     399              : }
     400              : 
     401              : /// An error happened in a get() operation.
     402            2 : #[derive(thiserror::Error, Debug)]
     403              : pub(crate) enum PageReconstructError {
     404              :     #[error(transparent)]
     405              :     Other(#[from] anyhow::Error),
     406              : 
     407              :     #[error("Ancestor LSN wait error: {0}")]
     408              :     AncestorLsnTimeout(#[from] WaitLsnError),
     409              : 
     410              :     #[error("timeline shutting down")]
     411              :     Cancelled,
     412              : 
     413              :     /// The ancestor of this is being stopped
     414              :     #[error("ancestor timeline {0} is being stopped")]
     415              :     AncestorStopping(TimelineId),
     416              : 
     417              :     /// An error happened replaying WAL records
     418              :     #[error(transparent)]
     419              :     WalRedo(anyhow::Error),
     420              : }
     421              : 
     422              : impl PageReconstructError {
     423              :     /// Returns true if this error indicates a tenant/timeline shutdown alike situation
     424            0 :     pub(crate) fn is_stopping(&self) -> bool {
     425            0 :         use PageReconstructError::*;
     426            0 :         match self {
     427            0 :             Other(_) => false,
     428            0 :             AncestorLsnTimeout(_) => false,
     429            0 :             Cancelled | AncestorStopping(_) => true,
     430            0 :             WalRedo(_) => false,
     431              :         }
     432            0 :     }
     433              : }
     434              : 
     435            0 : #[derive(thiserror::Error, Debug)]
     436              : enum CreateImageLayersError {
     437              :     #[error("timeline shutting down")]
     438              :     Cancelled,
     439              : 
     440              :     #[error(transparent)]
     441              :     GetVectoredError(GetVectoredError),
     442              : 
     443              :     #[error(transparent)]
     444              :     PageReconstructError(PageReconstructError),
     445              : 
     446              :     #[error(transparent)]
     447              :     Other(#[from] anyhow::Error),
     448              : }
     449              : 
     450            0 : #[derive(thiserror::Error, Debug)]
     451              : enum FlushLayerError {
     452              :     /// Timeline cancellation token was cancelled
     453              :     #[error("timeline shutting down")]
     454              :     Cancelled,
     455              : 
     456              :     #[error(transparent)]
     457              :     CreateImageLayersError(CreateImageLayersError),
     458              : 
     459              :     #[error(transparent)]
     460              :     Other(#[from] anyhow::Error),
     461              : }
     462              : 
     463            0 : #[derive(thiserror::Error, Debug)]
     464              : pub(crate) enum GetVectoredError {
     465              :     #[error("timeline shutting down")]
     466              :     Cancelled,
     467              : 
     468              :     #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
     469              :     Oversized(u64),
     470              : 
     471              :     #[error("Requested at invalid LSN: {0}")]
     472              :     InvalidLsn(Lsn),
     473              : 
     474              :     #[error("Requested key {0} not found")]
     475              :     MissingKey(Key),
     476              : 
     477              :     #[error(transparent)]
     478              :     GetReadyAncestorError(GetReadyAncestorError),
     479              : 
     480              :     #[error(transparent)]
     481              :     Other(#[from] anyhow::Error),
     482              : }
     483              : 
     484            0 : #[derive(thiserror::Error, Debug)]
     485              : pub(crate) enum GetReadyAncestorError {
     486              :     #[error("ancestor timeline {0} is being stopped")]
     487              :     AncestorStopping(TimelineId),
     488              : 
     489              :     #[error("Ancestor LSN wait error: {0}")]
     490              :     AncestorLsnTimeout(#[from] WaitLsnError),
     491              : 
     492              :     #[error("Cancelled")]
     493              :     Cancelled,
     494              : 
     495              :     #[error(transparent)]
     496              :     Other(#[from] anyhow::Error),
     497              : }
     498              : 
     499            0 : #[derive(Clone, Copy)]
     500              : pub enum LogicalSizeCalculationCause {
     501              :     Initial,
     502              :     ConsumptionMetricsSyntheticSize,
     503              :     EvictionTaskImitation,
     504              :     TenantSizeHandler,
     505              : }
     506              : 
     507              : pub enum GetLogicalSizePriority {
     508              :     User,
     509              :     Background,
     510              : }
     511              : 
     512            0 : #[derive(enumset::EnumSetType)]
     513              : pub(crate) enum CompactFlags {
     514              :     ForceRepartition,
     515              :     ForceImageLayerCreation,
     516              : }
     517              : 
     518              : impl std::fmt::Debug for Timeline {
     519            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     520            0 :         write!(f, "Timeline<{}>", self.timeline_id)
     521            0 :     }
     522              : }
     523              : 
     524            0 : #[derive(thiserror::Error, Debug)]
     525              : pub(crate) enum WaitLsnError {
     526              :     // Called on a timeline which is shutting down
     527              :     #[error("Shutdown")]
     528              :     Shutdown,
     529              : 
     530              :     // Called on an timeline not in active state or shutting down
     531              :     #[error("Bad state (not active)")]
     532              :     BadState,
     533              : 
     534              :     // Timeout expired while waiting for LSN to catch up with goal.
     535              :     #[error("{0}")]
     536              :     Timeout(String),
     537              : }
     538              : 
     539              : // The impls below achieve cancellation mapping for errors.
     540              : // Perhaps there's a way of achieving this with less cruft.
     541              : 
     542              : impl From<CreateImageLayersError> for CompactionError {
     543            0 :     fn from(e: CreateImageLayersError) -> Self {
     544            0 :         match e {
     545            0 :             CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
     546            0 :             _ => CompactionError::Other(e.into()),
     547              :         }
     548            0 :     }
     549              : }
     550              : 
     551              : impl From<CreateImageLayersError> for FlushLayerError {
     552            0 :     fn from(e: CreateImageLayersError) -> Self {
     553            0 :         match e {
     554            0 :             CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
     555            0 :             any => FlushLayerError::CreateImageLayersError(any),
     556              :         }
     557            0 :     }
     558              : }
     559              : 
     560              : impl From<PageReconstructError> for CreateImageLayersError {
     561            0 :     fn from(e: PageReconstructError) -> Self {
     562            0 :         match e {
     563            0 :             PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
     564            0 :             _ => CreateImageLayersError::PageReconstructError(e),
     565              :         }
     566            0 :     }
     567              : }
     568              : 
     569              : impl From<GetVectoredError> for CreateImageLayersError {
     570            0 :     fn from(e: GetVectoredError) -> Self {
     571            0 :         match e {
     572            0 :             GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
     573            0 :             _ => CreateImageLayersError::GetVectoredError(e),
     574              :         }
     575            0 :     }
     576              : }
     577              : 
     578              : impl From<GetReadyAncestorError> for PageReconstructError {
     579            2 :     fn from(e: GetReadyAncestorError) -> Self {
     580            2 :         use GetReadyAncestorError::*;
     581            2 :         match e {
     582            0 :             AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
     583            0 :             AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
     584            0 :             Cancelled => PageReconstructError::Cancelled,
     585            2 :             Other(other) => PageReconstructError::Other(other),
     586              :         }
     587            2 :     }
     588              : }
     589              : 
     590              : #[derive(
     591              :     Eq,
     592            4 :     PartialEq,
     593            0 :     Debug,
     594              :     Copy,
     595            0 :     Clone,
     596          119 :     strum_macros::EnumString,
     597            0 :     strum_macros::Display,
     598            0 :     serde_with::DeserializeFromStr,
     599            0 :     serde_with::SerializeDisplay,
     600              : )]
     601              : #[strum(serialize_all = "kebab-case")]
     602              : pub enum GetVectoredImpl {
     603              :     Sequential,
     604              :     Vectored,
     605              : }
     606              : 
     607              : /// Public interface functions
     608              : impl Timeline {
     609              :     /// Get the LSN where this branch was created
     610            6 :     pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
     611            6 :         self.ancestor_lsn
     612            6 :     }
     613              : 
     614              :     /// Get the ancestor's timeline id
     615           14 :     pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
     616           14 :         self.ancestor_timeline
     617           14 :             .as_ref()
     618           14 :             .map(|ancestor| ancestor.timeline_id)
     619           14 :     }
     620              : 
     621              :     /// Lock and get timeline's GC cutoff
     622          622 :     pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
     623          622 :         self.latest_gc_cutoff_lsn.read()
     624          622 :     }
     625              : 
     626              :     /// Look up given page version.
     627              :     ///
     628              :     /// If a remote layer file is needed, it is downloaded as part of this
     629              :     /// call.
     630              :     ///
     631              :     /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
     632              :     /// abstraction above this needs to store suitable metadata to track what
     633              :     /// data exists with what keys, in separate metadata entries. If a
     634              :     /// non-existent key is requested, we may incorrectly return a value from
     635              :     /// an ancestor branch, for example, or waste a lot of cycles chasing the
     636              :     /// non-existing key.
     637              :     ///
     638              :     /// # Cancel-Safety
     639              :     ///
     640              :     /// This method is cancellation-safe.
     641       502524 :     pub(crate) async fn get(
     642       502524 :         &self,
     643       502524 :         key: Key,
     644       502524 :         lsn: Lsn,
     645       502524 :         ctx: &RequestContext,
     646       502524 :     ) -> Result<Bytes, PageReconstructError> {
     647       502524 :         if !lsn.is_valid() {
     648            0 :             return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
     649       502524 :         }
     650       502524 : 
     651       502524 :         self.timeline_get_throttle.throttle(ctx, 1).await;
     652              : 
     653              :         // This check is debug-only because of the cost of hashing, and because it's a double-check: we
     654              :         // already checked the key against the shard_identity when looking up the Timeline from
     655              :         // page_service.
     656       502524 :         debug_assert!(!self.shard_identity.is_key_disposable(&key));
     657              : 
     658              :         // XXX: structured stats collection for layer eviction here.
     659            0 :         trace!(
     660            0 :             "get page request for {}@{} from task kind {:?}",
     661            0 :             key,
     662            0 :             lsn,
     663            0 :             ctx.task_kind()
     664            0 :         );
     665              : 
     666              :         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
     667              :         // The cached image can be returned directly if there is no WAL between the cached image
     668              :         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
     669              :         // for redo.
     670       502524 :         let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
     671            0 :             Some((cached_lsn, cached_img)) => {
     672            0 :                 match cached_lsn.cmp(&lsn) {
     673            0 :                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
     674              :                     Ordering::Equal => {
     675            0 :                         MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
     676            0 :                         return Ok(cached_img); // exact LSN match, return the image
     677              :                     }
     678              :                     Ordering::Greater => {
     679            0 :                         unreachable!("the returned lsn should never be after the requested lsn")
     680              :                     }
     681              :                 }
     682            0 :                 Some((cached_lsn, cached_img))
     683              :             }
     684       502524 :             None => None,
     685              :         };
     686              : 
     687       502524 :         let mut reconstruct_state = ValueReconstructState {
     688       502524 :             records: Vec::new(),
     689       502524 :             img: cached_page_img,
     690       502524 :         };
     691       502524 : 
     692       502524 :         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
     693       502524 :         let path = self
     694       502524 :             .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
     695        36291 :             .await?;
     696       502416 :         timer.stop_and_record();
     697       502416 : 
     698       502416 :         let start = Instant::now();
     699       502416 :         let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
     700       502416 :         let elapsed = start.elapsed();
     701       502416 :         crate::metrics::RECONSTRUCT_TIME
     702       502416 :             .for_result(&res)
     703       502416 :             .observe(elapsed.as_secs_f64());
     704       502416 : 
     705       502416 :         if cfg!(feature = "testing") && res.is_err() {
     706              :             // it can only be walredo issue
     707              :             use std::fmt::Write;
     708              : 
     709            0 :             let mut msg = String::new();
     710            0 : 
     711            0 :             path.into_iter().for_each(|(res, cont_lsn, layer)| {
     712            0 :                 writeln!(
     713            0 :                     msg,
     714            0 :                     "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
     715            0 :                     layer(),
     716            0 :                 )
     717            0 :                 .expect("string grows")
     718            0 :             });
     719              : 
     720              :             // this is to rule out or provide evidence that we could in some cases read a duplicate
     721              :             // walrecord
     722            0 :             tracing::info!("walredo failed, path:\n{msg}");
     723       502416 :         }
     724              : 
     725       502416 :         res
     726       502524 :     }
     727              : 
     728              :     pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
     729              : 
     730              :     /// Look up multiple page versions at a given LSN
     731              :     ///
     732              :     /// This naive implementation will be replaced with a more efficient one
     733              :     /// which actually vectorizes the read path.
     734          444 :     pub(crate) async fn get_vectored(
     735          444 :         &self,
     736          444 :         keyspace: KeySpace,
     737          444 :         lsn: Lsn,
     738          444 :         ctx: &RequestContext,
     739          444 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     740          444 :         if !lsn.is_valid() {
     741            0 :             return Err(GetVectoredError::InvalidLsn(lsn));
     742          444 :         }
     743          444 : 
     744          444 :         let key_count = keyspace.total_size().try_into().unwrap();
     745          444 :         if key_count > Timeline::MAX_GET_VECTORED_KEYS {
     746            0 :             return Err(GetVectoredError::Oversized(key_count));
     747          444 :         }
     748          444 : 
     749          444 :         self.timeline_get_throttle
     750          444 :             .throttle(ctx, key_count as usize)
     751            0 :             .await;
     752              : 
     753          888 :         for range in &keyspace.ranges {
     754          444 :             let mut key = range.start;
     755         1036 :             while key != range.end {
     756          592 :                 assert!(!self.shard_identity.is_key_disposable(&key));
     757          592 :                 key = key.next();
     758              :             }
     759              :         }
     760              : 
     761            0 :         trace!(
     762            0 :             "get vectored request for {:?}@{} from task kind {:?} will use {} implementation",
     763            0 :             keyspace,
     764            0 :             lsn,
     765            0 :             ctx.task_kind(),
     766            0 :             self.conf.get_vectored_impl
     767            0 :         );
     768              : 
     769          444 :         let _timer = crate::metrics::GET_VECTORED_LATENCY
     770          444 :             .for_task_kind(ctx.task_kind())
     771          444 :             .map(|t| t.start_timer());
     772          444 : 
     773          444 :         match self.conf.get_vectored_impl {
     774              :             GetVectoredImpl::Sequential => {
     775          444 :                 self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
     776              :             }
     777              :             GetVectoredImpl::Vectored => {
     778            0 :                 let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
     779              : 
     780            0 :                 if self.conf.validate_vectored_get {
     781            0 :                     self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
     782            0 :                         .await;
     783            0 :                 }
     784              : 
     785            0 :                 vectored_res
     786              :             }
     787              :         }
     788          444 :     }
     789              : 
     790          454 :     pub(super) async fn get_vectored_sequential_impl(
     791          454 :         &self,
     792          454 :         keyspace: KeySpace,
     793          454 :         lsn: Lsn,
     794          454 :         ctx: &RequestContext,
     795          454 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     796          454 :         let mut values = BTreeMap::new();
     797          908 :         for range in keyspace.ranges {
     798          454 :             let mut key = range.start;
     799         1366 :             while key != range.end {
     800          912 :                 let block = self.get(key, lsn, ctx).await;
     801              : 
     802              :                 use PageReconstructError::*;
     803            0 :                 match block {
     804              :                     Err(Cancelled | AncestorStopping(_)) => {
     805            0 :                         return Err(GetVectoredError::Cancelled)
     806              :                     }
     807            0 :                     Err(Other(err)) if err.to_string().contains("could not find data for key") => {
     808            0 :                         return Err(GetVectoredError::MissingKey(key))
     809              :                     }
     810          912 :                     _ => {
     811          912 :                         values.insert(key, block);
     812          912 :                         key = key.next();
     813          912 :                     }
     814              :                 }
     815              :             }
     816              :         }
     817              : 
     818          454 :         Ok(values)
     819          454 :     }
     820              : 
     821           10 :     pub(super) async fn get_vectored_impl(
     822           10 :         &self,
     823           10 :         keyspace: KeySpace,
     824           10 :         lsn: Lsn,
     825           10 :         ctx: &RequestContext,
     826           10 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     827           10 :         let mut reconstruct_state = ValuesReconstructState::new();
     828           10 : 
     829           10 :         self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
     830           25 :             .await?;
     831              : 
     832           10 :         let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
     833          330 :         for (key, res) in reconstruct_state.keys {
     834          320 :             match res {
     835            0 :                 Err(err) => {
     836            0 :                     results.insert(key, Err(err));
     837            0 :                 }
     838          320 :                 Ok(state) => {
     839          320 :                     let state = ValueReconstructState::from(state);
     840              : 
     841          320 :                     let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
     842          320 :                     results.insert(key, reconstruct_res);
     843              :                 }
     844              :             }
     845              :         }
     846              : 
     847           10 :         Ok(results)
     848           10 :     }
     849              : 
     850           10 :     pub(super) async fn validate_get_vectored_impl(
     851           10 :         &self,
     852           10 :         vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
     853           10 :         keyspace: KeySpace,
     854           10 :         lsn: Lsn,
     855           10 :         ctx: &RequestContext,
     856           10 :     ) {
     857           10 :         let sequential_res = self
     858           10 :             .get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
     859           20 :             .await;
     860              : 
     861            0 :         fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
     862            0 :             use GetVectoredError::*;
     863            0 :             match (lhs, rhs) {
     864            0 :                 (Cancelled, Cancelled) => true,
     865            0 :                 (_, Cancelled) => true,
     866            0 :                 (Oversized(l), Oversized(r)) => l == r,
     867            0 :                 (InvalidLsn(l), InvalidLsn(r)) => l == r,
     868            0 :                 (MissingKey(l), MissingKey(r)) => l == r,
     869            0 :                 (GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
     870            0 :                 (Other(_), Other(_)) => true,
     871            0 :                 _ => false,
     872              :             }
     873            0 :         }
     874              : 
     875           10 :         match (&sequential_res, vectored_res) {
     876            0 :             (Err(seq_err), Ok(_)) => {
     877            0 :                 panic!(concat!("Sequential get failed with {}, but vectored get did not",
     878            0 :                                " - keyspace={:?} lsn={}"),
     879            0 :                        seq_err, keyspace, lsn) },
     880            0 :             (Ok(_), Err(vec_err)) => {
     881            0 :                 panic!(concat!("Vectored get failed with {}, but sequential get did not",
     882            0 :                                " - keyspace={:?} lsn={}"),
     883            0 :                        vec_err, keyspace, lsn) },
     884            0 :             (Err(seq_err), Err(vec_err)) => {
     885            0 :                 assert!(errors_match(seq_err, vec_err),
     886            0 :                         "Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
     887           10 :             (Ok(seq_values), Ok(vec_values)) => {
     888          320 :                 seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
     889          320 :                     assert_eq!(seq_key, vec_key);
     890          320 :                     match (seq_res, vec_res) {
     891          320 :                         (Ok(seq_blob), Ok(vec_blob)) => {
     892          320 :                             assert_eq!(seq_blob, vec_blob,
     893            0 :                                        "Image mismatch for key {seq_key} - keyspace={keyspace:?} lsn={lsn}");
     894              :                         },
     895            0 :                         (Err(err), Ok(_)) => {
     896            0 :                             panic!(
     897            0 :                                 concat!("Sequential get failed with {} for key {}, but vectored get did not",
     898            0 :                                         " - keyspace={:?} lsn={}"),
     899            0 :                                 err, seq_key, keyspace, lsn) },
     900            0 :                         (Ok(_), Err(err)) => {
     901            0 :                             panic!(
     902            0 :                                 concat!("Vectored get failed with {} for key {}, but sequential get did not",
     903            0 :                                         " - keyspace={:?} lsn={}"),
     904            0 :                                 err, seq_key, keyspace, lsn) },
     905            0 :                         (Err(_), Err(_)) => {}
     906              :                     }
     907          320 :                 })
     908              :             }
     909              :         }
     910           10 :     }
     911              : 
     912              :     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     913      2900534 :     pub(crate) fn get_last_record_lsn(&self) -> Lsn {
     914      2900534 :         self.last_record_lsn.load().last
     915      2900534 :     }
     916              : 
     917            0 :     pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
     918            0 :         self.last_record_lsn.load().prev
     919            0 :     }
     920              : 
     921              :     /// Atomically get both last and prev.
     922          210 :     pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
     923          210 :         self.last_record_lsn.load()
     924          210 :     }
     925              : 
     926          698 :     pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
     927          698 :         self.disk_consistent_lsn.load()
     928          698 :     }
     929              : 
     930              :     /// remote_consistent_lsn from the perspective of the tenant's current generation,
     931              :     /// not validated with control plane yet.
     932              :     /// See [`Self::get_remote_consistent_lsn_visible`].
     933            0 :     pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     934            0 :         if let Some(remote_client) = &self.remote_client {
     935            0 :             remote_client.remote_consistent_lsn_projected()
     936              :         } else {
     937            0 :             None
     938              :         }
     939            0 :     }
     940              : 
     941              :     /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
     942              :     /// i.e. a value of remote_consistent_lsn_projected which has undergone
     943              :     /// generation validation in the deletion queue.
     944            0 :     pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
     945            0 :         if let Some(remote_client) = &self.remote_client {
     946            0 :             remote_client.remote_consistent_lsn_visible()
     947              :         } else {
     948            0 :             None
     949              :         }
     950            0 :     }
     951              : 
     952              :     /// The sum of the file size of all historic layers in the layer map.
     953              :     /// This method makes no distinction between local and remote layers.
     954              :     /// Hence, the result **does not represent local filesystem usage**.
     955            0 :     pub(crate) async fn layer_size_sum(&self) -> u64 {
     956            0 :         let guard = self.layers.read().await;
     957            0 :         let layer_map = guard.layer_map();
     958            0 :         let mut size = 0;
     959            0 :         for l in layer_map.iter_historic_layers() {
     960            0 :             size += l.file_size();
     961            0 :         }
     962            0 :         size
     963            0 :     }
     964              : 
     965            0 :     pub(crate) fn resident_physical_size(&self) -> u64 {
     966            0 :         self.metrics.resident_physical_size_get()
     967            0 :     }
     968              : 
     969            0 :     pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
     970            0 :         array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
     971            0 :     }
     972              : 
     973              :     ///
     974              :     /// Wait until WAL has been received and processed up to this LSN.
     975              :     ///
     976              :     /// You should call this before any of the other get_* or list_* functions. Calling
     977              :     /// those functions with an LSN that has been processed yet is an error.
     978              :     ///
     979       226767 :     pub(crate) async fn wait_lsn(
     980       226767 :         &self,
     981       226767 :         lsn: Lsn,
     982       226767 :         _ctx: &RequestContext, /* Prepare for use by cancellation */
     983       226767 :     ) -> Result<(), WaitLsnError> {
     984       226767 :         if self.cancel.is_cancelled() {
     985            0 :             return Err(WaitLsnError::Shutdown);
     986       226767 :         } else if !self.is_active() {
     987            0 :             return Err(WaitLsnError::BadState);
     988       226767 :         }
     989              : 
     990              :         // This should never be called from the WAL receiver, because that could lead
     991              :         // to a deadlock.
     992              :         debug_assert!(
     993       226767 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
     994            0 :             "wait_lsn cannot be called in WAL receiver"
     995              :         );
     996              :         debug_assert!(
     997       226767 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
     998            0 :             "wait_lsn cannot be called in WAL receiver"
     999              :         );
    1000              :         debug_assert!(
    1001       226767 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
    1002            0 :             "wait_lsn cannot be called in WAL receiver"
    1003              :         );
    1004              : 
    1005       226767 :         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
    1006       226767 : 
    1007       226767 :         match self
    1008       226767 :             .last_record_lsn
    1009       226767 :             .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
    1010            0 :             .await
    1011              :         {
    1012       226767 :             Ok(()) => Ok(()),
    1013            0 :             Err(e) => {
    1014            0 :                 use utils::seqwait::SeqWaitError::*;
    1015            0 :                 match e {
    1016            0 :                     Shutdown => Err(WaitLsnError::Shutdown),
    1017              :                     Timeout => {
    1018              :                         // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
    1019            0 :                         drop(_timer);
    1020            0 :                         let walreceiver_status = self.walreceiver_status();
    1021            0 :                         Err(WaitLsnError::Timeout(format!(
    1022            0 :                         "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
    1023            0 :                         lsn,
    1024            0 :                         self.get_last_record_lsn(),
    1025            0 :                         self.get_disk_consistent_lsn(),
    1026            0 :                         walreceiver_status,
    1027            0 :                     )))
    1028              :                     }
    1029              :                 }
    1030              :             }
    1031              :         }
    1032       226767 :     }
    1033              : 
    1034            0 :     pub(crate) fn walreceiver_status(&self) -> String {
    1035            0 :         match &*self.walreceiver.lock().unwrap() {
    1036            0 :             None => "stopping or stopped".to_string(),
    1037            0 :             Some(walreceiver) => match walreceiver.status() {
    1038            0 :                 Some(status) => status.to_human_readable_string(),
    1039            0 :                 None => "Not active".to_string(),
    1040              :             },
    1041              :         }
    1042            0 :     }
    1043              : 
    1044              :     /// Check that it is valid to request operations with that lsn.
    1045          214 :     pub(crate) fn check_lsn_is_in_scope(
    1046          214 :         &self,
    1047          214 :         lsn: Lsn,
    1048          214 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
    1049          214 :     ) -> anyhow::Result<()> {
    1050          214 :         ensure!(
    1051          214 :             lsn >= **latest_gc_cutoff_lsn,
    1052            4 :             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
    1053            4 :             lsn,
    1054            4 :             **latest_gc_cutoff_lsn,
    1055              :         );
    1056          210 :         Ok(())
    1057          214 :     }
    1058              : 
    1059              :     /// Flush to disk all data that was written with the put_* functions
    1060         1060 :     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
    1061              :     pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
    1062              :         self.freeze_inmem_layer(false).await;
    1063              :         self.flush_frozen_layers_and_wait().await
    1064              :     }
    1065              : 
    1066              :     /// Outermost timeline compaction operation; downloads needed layers.
    1067          408 :     pub(crate) async fn compact(
    1068          408 :         self: &Arc<Self>,
    1069          408 :         cancel: &CancellationToken,
    1070          408 :         flags: EnumSet<CompactFlags>,
    1071          408 :         ctx: &RequestContext,
    1072          408 :     ) -> Result<(), CompactionError> {
    1073          408 :         // most likely the cancellation token is from background task, but in tests it could be the
    1074          408 :         // request task as well.
    1075          408 : 
    1076          408 :         let prepare = async move {
    1077          408 :             let guard = self.compaction_lock.lock().await;
    1078              : 
    1079          408 :             let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
    1080          408 :                 BackgroundLoopKind::Compaction,
    1081          408 :                 ctx,
    1082          408 :             )
    1083            0 :             .await;
    1084              : 
    1085          408 :             (guard, permit)
    1086          408 :         };
    1087              : 
    1088              :         // this wait probably never needs any "long time spent" logging, because we already nag if
    1089              :         // compaction task goes over it's period (20s) which is quite often in production.
    1090          408 :         let (_guard, _permit) = tokio::select! {
    1091          408 :             tuple = prepare => { tuple },
    1092              :             _ = self.cancel.cancelled() => return Ok(()),
    1093              :             _ = cancel.cancelled() => return Ok(()),
    1094              :         };
    1095              : 
    1096          408 :         let last_record_lsn = self.get_last_record_lsn();
    1097          408 : 
    1098          408 :         // Last record Lsn could be zero in case the timeline was just created
    1099          408 :         if !last_record_lsn.is_valid() {
    1100            0 :             warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
    1101            0 :             return Ok(());
    1102          408 :         }
    1103          408 : 
    1104          408 :         match self.get_compaction_algorithm() {
    1105            0 :             CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
    1106        52874 :             CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
    1107              :         }
    1108          408 :     }
    1109              : 
    1110              :     /// TODO: cancellation
    1111          408 :     async fn compact_legacy(
    1112          408 :         self: &Arc<Self>,
    1113          408 :         _cancel: &CancellationToken,
    1114          408 :         flags: EnumSet<CompactFlags>,
    1115          408 :         ctx: &RequestContext,
    1116          408 :     ) -> Result<(), CompactionError> {
    1117          408 :         // High level strategy for compaction / image creation:
    1118          408 :         //
    1119          408 :         // 1. First, calculate the desired "partitioning" of the
    1120          408 :         // currently in-use key space. The goal is to partition the
    1121          408 :         // key space into roughly fixed-size chunks, but also take into
    1122          408 :         // account any existing image layers, and try to align the
    1123          408 :         // chunk boundaries with the existing image layers to avoid
    1124          408 :         // too much churn. Also try to align chunk boundaries with
    1125          408 :         // relation boundaries.  In principle, we don't know about
    1126          408 :         // relation boundaries here, we just deal with key-value
    1127          408 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
    1128          408 :         // map relations into key-value pairs. But in practice we know
    1129          408 :         // that 'field6' is the block number, and the fields 1-5
    1130          408 :         // identify a relation. This is just an optimization,
    1131          408 :         // though.
    1132          408 :         //
    1133          408 :         // 2. Once we know the partitioning, for each partition,
    1134          408 :         // decide if it's time to create a new image layer. The
    1135          408 :         // criteria is: there has been too much "churn" since the last
    1136          408 :         // image layer? The "churn" is fuzzy concept, it's a
    1137          408 :         // combination of too many delta files, or too much WAL in
    1138          408 :         // total in the delta file. Or perhaps: if creating an image
    1139          408 :         // file would allow to delete some older files.
    1140          408 :         //
    1141          408 :         // 3. After that, we compact all level0 delta files if there
    1142          408 :         // are too many of them.  While compacting, we also garbage
    1143          408 :         // collect any page versions that are no longer needed because
    1144          408 :         // of the new image layers we created in step 2.
    1145          408 :         //
    1146          408 :         // TODO: This high level strategy hasn't been implemented yet.
    1147          408 :         // Below are functions compact_level0() and create_image_layers()
    1148          408 :         // but they are a bit ad hoc and don't quite work like it's explained
    1149          408 :         // above. Rewrite it.
    1150          408 : 
    1151          408 :         // Is the timeline being deleted?
    1152          408 :         if self.is_stopping() {
    1153            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1154            0 :             return Err(CompactionError::ShuttingDown);
    1155          408 :         }
    1156          408 : 
    1157          408 :         let target_file_size = self.get_checkpoint_distance();
    1158          408 : 
    1159          408 :         // Define partitioning schema if needed
    1160          408 : 
    1161          408 :         // FIXME: the match should only cover repartitioning, not the next steps
    1162          408 :         match self
    1163          408 :             .repartition(
    1164          408 :                 self.get_last_record_lsn(),
    1165          408 :                 self.get_compaction_target_size(),
    1166          408 :                 flags,
    1167          408 :                 ctx,
    1168          408 :             )
    1169        13313 :             .await
    1170              :         {
    1171          408 :             Ok((partitioning, lsn)) => {
    1172          408 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
    1173          408 :                 let image_ctx = RequestContextBuilder::extend(ctx)
    1174          408 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
    1175          408 :                     .build();
    1176          408 : 
    1177          408 :                 // 2. Compact
    1178          408 :                 let timer = self.metrics.compact_time_histo.start_timer();
    1179        39561 :                 self.compact_level0(target_file_size, ctx).await?;
    1180          408 :                 timer.stop_and_record();
    1181              : 
    1182              :                 // 3. Create new image layers for partitions that have been modified
    1183              :                 // "enough".
    1184          408 :                 let layers = self
    1185          408 :                     .create_image_layers(
    1186          408 :                         &partitioning,
    1187          408 :                         lsn,
    1188          408 :                         flags.contains(CompactFlags::ForceImageLayerCreation),
    1189          408 :                         &image_ctx,
    1190          408 :                     )
    1191            0 :                     .await
    1192          408 :                     .map_err(anyhow::Error::from)?;
    1193          408 :                 if let Some(remote_client) = &self.remote_client {
    1194          408 :                     for layer in layers {
    1195            0 :                         remote_client.schedule_layer_file_upload(layer)?;
    1196              :                     }
    1197            0 :                 }
    1198              : 
    1199          408 :                 if let Some(remote_client) = &self.remote_client {
    1200              :                     // should any new image layer been created, not uploading index_part will
    1201              :                     // result in a mismatch between remote_physical_size and layermap calculated
    1202              :                     // size, which will fail some tests, but should not be an issue otherwise.
    1203          408 :                     remote_client.schedule_index_upload_for_file_changes()?;
    1204            0 :                 }
    1205              :             }
    1206            0 :             Err(err) => {
    1207            0 :                 // no partitioning? This is normal, if the timeline was just created
    1208            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
    1209            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
    1210            0 :                 // error but continue.
    1211            0 :                 //
    1212            0 :                 // Suppress error when it's due to cancellation
    1213            0 :                 if !self.cancel.is_cancelled() {
    1214            0 :                     error!("could not compact, repartitioning keyspace failed: {err:?}");
    1215            0 :                 }
    1216              :             }
    1217              :         };
    1218              : 
    1219          408 :         Ok(())
    1220          408 :     }
    1221              : 
    1222              :     /// Mutate the timeline with a [`TimelineWriter`].
    1223      2957012 :     pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
    1224      2957012 :         TimelineWriter {
    1225      2957012 :             tl: self,
    1226      2957012 :             _write_guard: self.write_lock.lock().await,
    1227              :         }
    1228      2957012 :     }
    1229              : 
    1230              :     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
    1231              :     /// the in-memory layer, and initiate flushing it if so.
    1232              :     ///
    1233              :     /// Also flush after a period of time without new data -- it helps
    1234              :     /// safekeepers to regard pageserver as caught up and suspend activity.
    1235            0 :     pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
    1236            0 :         let last_lsn = self.get_last_record_lsn();
    1237            0 :         let open_layer_size = {
    1238            0 :             let guard = self.layers.read().await;
    1239            0 :             let layers = guard.layer_map();
    1240            0 :             let Some(open_layer) = layers.open_layer.as_ref() else {
    1241            0 :                 return Ok(());
    1242              :             };
    1243            0 :             open_layer.size().await?
    1244              :         };
    1245            0 :         let last_freeze_at = self.last_freeze_at.load();
    1246            0 :         let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
    1247            0 :         let distance = last_lsn.widening_sub(last_freeze_at);
    1248            0 :         // Rolling the open layer can be triggered by:
    1249            0 :         // 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
    1250            0 :         //    the safekeepers need to store.  For sharded tenants, we multiply by shard count to
    1251            0 :         //    account for how writes are distributed across shards: we expect each node to consume
    1252            0 :         //    1/count of the LSN on average.
    1253            0 :         // 2. The size of the currently open layer.
    1254            0 :         // 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
    1255            0 :         //    up and suspend activity.
    1256            0 :         if (distance
    1257            0 :             >= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128)
    1258            0 :             || open_layer_size > self.get_checkpoint_distance()
    1259            0 :             || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
    1260              :         {
    1261            0 :             info!(
    1262            0 :                 "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
    1263            0 :                 distance,
    1264            0 :                 open_layer_size,
    1265            0 :                 last_freeze_ts.elapsed()
    1266            0 :             );
    1267              : 
    1268            0 :             self.freeze_inmem_layer(true).await;
    1269            0 :             self.last_freeze_at.store(last_lsn);
    1270            0 :             *(self.last_freeze_ts.write().unwrap()) = Instant::now();
    1271            0 : 
    1272            0 :             // Wake up the layer flusher
    1273            0 :             self.flush_frozen_layers();
    1274            0 :         }
    1275            0 :         Ok(())
    1276            0 :     }
    1277              : 
    1278            0 :     pub(crate) fn activate(
    1279            0 :         self: &Arc<Self>,
    1280            0 :         broker_client: BrokerClientChannel,
    1281            0 :         background_jobs_can_start: Option<&completion::Barrier>,
    1282            0 :         ctx: &RequestContext,
    1283            0 :     ) {
    1284            0 :         if self.tenant_shard_id.is_zero() {
    1285            0 :             // Logical size is only maintained accurately on shard zero.
    1286            0 :             self.spawn_initial_logical_size_computation_task(ctx);
    1287            0 :         }
    1288            0 :         self.launch_wal_receiver(ctx, broker_client);
    1289            0 :         self.set_state(TimelineState::Active);
    1290            0 :         self.launch_eviction_task(background_jobs_can_start);
    1291            0 :     }
    1292              : 
    1293              :     /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
    1294              :     /// also to remote storage.  This method can easily take multiple seconds for a busy timeline.
    1295              :     ///
    1296              :     /// While we are flushing, we continue to accept read I/O.
    1297            6 :     pub(crate) async fn flush_and_shutdown(&self) {
    1298            6 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1299              : 
    1300              :         // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
    1301              :         // trying to flush
    1302            0 :         tracing::debug!("Waiting for WalReceiverManager...");
    1303            6 :         task_mgr::shutdown_tasks(
    1304            6 :             Some(TaskKind::WalReceiverManager),
    1305            6 :             Some(self.tenant_shard_id),
    1306            6 :             Some(self.timeline_id),
    1307            6 :         )
    1308            0 :         .await;
    1309              : 
    1310              :         // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
    1311            6 :         self.last_record_lsn.shutdown();
    1312            6 : 
    1313            6 :         // now all writers to InMemory layer are gone, do the final flush if requested
    1314            6 :         match self.freeze_and_flush().await {
    1315              :             Ok(_) => {
    1316              :                 // drain the upload queue
    1317            6 :                 if let Some(client) = self.remote_client.as_ref() {
    1318              :                     // if we did not wait for completion here, it might be our shutdown process
    1319              :                     // didn't wait for remote uploads to complete at all, as new tasks can forever
    1320              :                     // be spawned.
    1321              :                     //
    1322              :                     // what is problematic is the shutting down of RemoteTimelineClient, because
    1323              :                     // obviously it does not make sense to stop while we wait for it, but what
    1324              :                     // about corner cases like s3 suddenly hanging up?
    1325            6 :                     if let Err(e) = client.shutdown().await {
    1326              :                         // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1327              :                         // we have some extra WAL replay to do next time the timeline starts.
    1328            0 :                         warn!("failed to flush to remote storage: {e:#}");
    1329            6 :                     }
    1330            0 :                 }
    1331              :             }
    1332            0 :             Err(e) => {
    1333              :                 // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1334              :                 // we have some extra WAL replay to do next time the timeline starts.
    1335            0 :                 warn!("failed to freeze and flush: {e:#}");
    1336              :             }
    1337              :         }
    1338              : 
    1339            6 :         self.shutdown().await;
    1340            6 :     }
    1341              : 
    1342              :     /// Shut down immediately, without waiting for any open layers to flush to disk.  This is a subset of
    1343              :     /// the graceful [`Timeline::flush_and_shutdown`] function.
    1344            8 :     pub(crate) async fn shutdown(&self) {
    1345            8 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1346              : 
    1347              :         // Signal any subscribers to our cancellation token to drop out
    1348            0 :         tracing::debug!("Cancelling CancellationToken");
    1349            8 :         self.cancel.cancel();
    1350            8 : 
    1351            8 :         // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
    1352            8 :         // while doing so.
    1353            8 :         self.last_record_lsn.shutdown();
    1354            8 : 
    1355            8 :         // Shut down the layer flush task before the remote client, as one depends on the other
    1356            8 :         task_mgr::shutdown_tasks(
    1357            8 :             Some(TaskKind::LayerFlushTask),
    1358            8 :             Some(self.tenant_shard_id),
    1359            8 :             Some(self.timeline_id),
    1360            8 :         )
    1361            6 :         .await;
    1362              : 
    1363              :         // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
    1364              :         // case our caller wants to use that for a deletion
    1365            8 :         if let Some(remote_client) = self.remote_client.as_ref() {
    1366            8 :             match remote_client.stop() {
    1367            8 :                 Ok(()) => {}
    1368            0 :                 Err(StopError::QueueUninitialized) => {
    1369            0 :                     // Shutting down during initialization is legal
    1370            0 :                 }
    1371              :             }
    1372            0 :         }
    1373              : 
    1374            0 :         tracing::debug!("Waiting for tasks...");
    1375              : 
    1376            8 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
    1377              : 
    1378              :         // Finally wait until any gate-holders are complete
    1379            8 :         self.gate.close().await;
    1380            8 :     }
    1381              : 
    1382          298 :     pub(crate) fn set_state(&self, new_state: TimelineState) {
    1383          298 :         match (self.current_state(), new_state) {
    1384          298 :             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
    1385            2 :                 info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
    1386              :             }
    1387            0 :             (st, TimelineState::Loading) => {
    1388            0 :                 error!("ignoring transition from {st:?} into Loading state");
    1389              :             }
    1390            0 :             (TimelineState::Broken { .. }, new_state) => {
    1391            0 :                 error!("Ignoring state update {new_state:?} for broken timeline");
    1392              :             }
    1393              :             (TimelineState::Stopping, TimelineState::Active) => {
    1394            0 :                 error!("Not activating a Stopping timeline");
    1395              :             }
    1396          296 :             (_, new_state) => {
    1397          296 :                 self.state.send_replace(new_state);
    1398          296 :             }
    1399              :         }
    1400          298 :     }
    1401              : 
    1402            2 :     pub(crate) fn set_broken(&self, reason: String) {
    1403            2 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
    1404            2 :         let broken_state = TimelineState::Broken {
    1405            2 :             reason,
    1406            2 :             backtrace: backtrace_str,
    1407            2 :         };
    1408            2 :         self.set_state(broken_state);
    1409            2 : 
    1410            2 :         // Although the Broken state is not equivalent to shutdown() (shutdown will be called
    1411            2 :         // later when this tenant is detach or the process shuts down), firing the cancellation token
    1412            2 :         // here avoids the need for other tasks to watch for the Broken state explicitly.
    1413            2 :         self.cancel.cancel();
    1414            2 :     }
    1415              : 
    1416       228097 :     pub(crate) fn current_state(&self) -> TimelineState {
    1417       228097 :         self.state.borrow().clone()
    1418       228097 :     }
    1419              : 
    1420            6 :     pub(crate) fn is_broken(&self) -> bool {
    1421            6 :         matches!(&*self.state.borrow(), TimelineState::Broken { .. })
    1422            6 :     }
    1423              : 
    1424       226983 :     pub(crate) fn is_active(&self) -> bool {
    1425       226983 :         self.current_state() == TimelineState::Active
    1426       226983 :     }
    1427              : 
    1428          816 :     pub(crate) fn is_stopping(&self) -> bool {
    1429          816 :         self.current_state() == TimelineState::Stopping
    1430          816 :     }
    1431              : 
    1432            0 :     pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
    1433            0 :         self.state.subscribe()
    1434            0 :     }
    1435              : 
    1436       226769 :     pub(crate) async fn wait_to_become_active(
    1437       226769 :         &self,
    1438       226769 :         _ctx: &RequestContext, // Prepare for use by cancellation
    1439       226769 :     ) -> Result<(), TimelineState> {
    1440       226769 :         let mut receiver = self.state.subscribe();
    1441       226769 :         loop {
    1442       226769 :             let current_state = receiver.borrow().clone();
    1443       226769 :             match current_state {
    1444              :                 TimelineState::Loading => {
    1445            0 :                     receiver
    1446            0 :                         .changed()
    1447            0 :                         .await
    1448            0 :                         .expect("holding a reference to self");
    1449              :                 }
    1450              :                 TimelineState::Active { .. } => {
    1451       226767 :                     return Ok(());
    1452              :                 }
    1453              :                 TimelineState::Broken { .. } | TimelineState::Stopping => {
    1454              :                     // There's no chance the timeline can transition back into ::Active
    1455            2 :                     return Err(current_state);
    1456              :                 }
    1457              :             }
    1458              :         }
    1459       226769 :     }
    1460              : 
    1461            0 :     pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
    1462            0 :         let guard = self.layers.read().await;
    1463            0 :         let layer_map = guard.layer_map();
    1464            0 :         let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
    1465            0 :         if let Some(open_layer) = &layer_map.open_layer {
    1466            0 :             in_memory_layers.push(open_layer.info());
    1467            0 :         }
    1468            0 :         for frozen_layer in &layer_map.frozen_layers {
    1469            0 :             in_memory_layers.push(frozen_layer.info());
    1470            0 :         }
    1471              : 
    1472            0 :         let mut historic_layers = Vec::new();
    1473            0 :         for historic_layer in layer_map.iter_historic_layers() {
    1474            0 :             let historic_layer = guard.get_from_desc(&historic_layer);
    1475            0 :             historic_layers.push(historic_layer.info(reset));
    1476            0 :         }
    1477              : 
    1478            0 :         LayerMapInfo {
    1479            0 :             in_memory_layers,
    1480            0 :             historic_layers,
    1481            0 :         }
    1482            0 :     }
    1483              : 
    1484            0 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    1485              :     pub(crate) async fn download_layer(
    1486              :         &self,
    1487              :         layer_file_name: &str,
    1488              :     ) -> anyhow::Result<Option<bool>> {
    1489              :         let Some(layer) = self.find_layer(layer_file_name).await else {
    1490              :             return Ok(None);
    1491              :         };
    1492              : 
    1493              :         if self.remote_client.is_none() {
    1494              :             return Ok(Some(false));
    1495              :         }
    1496              : 
    1497              :         layer.download().await?;
    1498              : 
    1499              :         Ok(Some(true))
    1500              :     }
    1501              : 
    1502              :     /// Evict just one layer.
    1503              :     ///
    1504              :     /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
    1505            0 :     pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1506            0 :         let _gate = self
    1507            0 :             .gate
    1508            0 :             .enter()
    1509            0 :             .map_err(|_| anyhow::anyhow!("Shutting down"))?;
    1510              : 
    1511            0 :         let Some(local_layer) = self.find_layer(layer_file_name).await else {
    1512            0 :             return Ok(None);
    1513              :         };
    1514              : 
    1515            0 :         match local_layer.evict_and_wait().await {
    1516            0 :             Ok(()) => Ok(Some(true)),
    1517            0 :             Err(EvictionError::NotFound) => Ok(Some(false)),
    1518            0 :             Err(EvictionError::Downloaded) => Ok(Some(false)),
    1519              :         }
    1520            0 :     }
    1521              : }
    1522              : 
    1523              : /// Number of times we will compute partition within a checkpoint distance.
    1524              : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
    1525              : 
    1526              : // Private functions
    1527              : impl Timeline {
    1528            0 :     pub(crate) fn get_lazy_slru_download(&self) -> bool {
    1529            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1530            0 :         tenant_conf
    1531            0 :             .lazy_slru_download
    1532            0 :             .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
    1533            0 :     }
    1534              : 
    1535          704 :     fn get_checkpoint_distance(&self) -> u64 {
    1536          704 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1537          704 :         tenant_conf
    1538          704 :             .checkpoint_distance
    1539          704 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    1540          704 :     }
    1541              : 
    1542            0 :     fn get_checkpoint_timeout(&self) -> Duration {
    1543            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1544            0 :         tenant_conf
    1545            0 :             .checkpoint_timeout
    1546            0 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    1547            0 :     }
    1548              : 
    1549          482 :     fn get_compaction_target_size(&self) -> u64 {
    1550          482 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1551          482 :         tenant_conf
    1552          482 :             .compaction_target_size
    1553          482 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    1554          482 :     }
    1555              : 
    1556          408 :     fn get_compaction_threshold(&self) -> usize {
    1557          408 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1558          408 :         tenant_conf
    1559          408 :             .compaction_threshold
    1560          408 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    1561          408 :     }
    1562              : 
    1563          408 :     fn get_image_creation_threshold(&self) -> usize {
    1564          408 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1565          408 :         tenant_conf
    1566          408 :             .image_creation_threshold
    1567          408 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    1568          408 :     }
    1569              : 
    1570          408 :     fn get_compaction_algorithm(&self) -> CompactionAlgorithm {
    1571          408 :         let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
    1572          408 :         tenant_conf
    1573          408 :             .compaction_algorithm
    1574          408 :             .unwrap_or(self.conf.default_tenant_conf.compaction_algorithm)
    1575          408 :     }
    1576              : 
    1577            0 :     fn get_eviction_policy(&self) -> EvictionPolicy {
    1578            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1579            0 :         tenant_conf
    1580            0 :             .eviction_policy
    1581            0 :             .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
    1582            0 :     }
    1583              : 
    1584          296 :     fn get_evictions_low_residence_duration_metric_threshold(
    1585          296 :         tenant_conf: &TenantConfOpt,
    1586          296 :         default_tenant_conf: &TenantConf,
    1587          296 :     ) -> Duration {
    1588          296 :         tenant_conf
    1589          296 :             .evictions_low_residence_duration_metric_threshold
    1590          296 :             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
    1591          296 :     }
    1592              : 
    1593            0 :     pub(super) fn tenant_conf_updated(&self) {
    1594            0 :         // NB: Most tenant conf options are read by background loops, so,
    1595            0 :         // changes will automatically be picked up.
    1596            0 : 
    1597            0 :         // The threshold is embedded in the metric. So, we need to update it.
    1598            0 :         {
    1599            0 :             let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
    1600            0 :                 &self.tenant_conf.read().unwrap().tenant_conf,
    1601            0 :                 &self.conf.default_tenant_conf,
    1602            0 :             );
    1603            0 : 
    1604            0 :             let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
    1605            0 :             let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
    1606            0 : 
    1607            0 :             let timeline_id_str = self.timeline_id.to_string();
    1608            0 :             self.metrics
    1609            0 :                 .evictions_with_low_residence_duration
    1610            0 :                 .write()
    1611            0 :                 .unwrap()
    1612            0 :                 .change_threshold(
    1613            0 :                     &tenant_id_str,
    1614            0 :                     &shard_id_str,
    1615            0 :                     &timeline_id_str,
    1616            0 :                     new_threshold,
    1617            0 :                 );
    1618            0 :         }
    1619            0 :     }
    1620              : 
    1621              :     /// Open a Timeline handle.
    1622              :     ///
    1623              :     /// Loads the metadata for the timeline into memory, but not the layer map.
    1624              :     #[allow(clippy::too_many_arguments)]
    1625          296 :     pub(super) fn new(
    1626          296 :         conf: &'static PageServerConf,
    1627          296 :         tenant_conf: Arc<RwLock<AttachedTenantConf>>,
    1628          296 :         metadata: &TimelineMetadata,
    1629          296 :         ancestor: Option<Arc<Timeline>>,
    1630          296 :         timeline_id: TimelineId,
    1631          296 :         tenant_shard_id: TenantShardId,
    1632          296 :         generation: Generation,
    1633          296 :         shard_identity: ShardIdentity,
    1634          296 :         walredo_mgr: Option<Arc<super::WalRedoManager>>,
    1635          296 :         resources: TimelineResources,
    1636          296 :         pg_version: u32,
    1637          296 :         state: TimelineState,
    1638          296 :         cancel: CancellationToken,
    1639          296 :     ) -> Arc<Self> {
    1640          296 :         let disk_consistent_lsn = metadata.disk_consistent_lsn();
    1641          296 :         let (state, _) = watch::channel(state);
    1642          296 : 
    1643          296 :         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
    1644          296 :         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
    1645          296 : 
    1646          296 :         let tenant_conf_guard = tenant_conf.read().unwrap();
    1647          296 : 
    1648          296 :         let evictions_low_residence_duration_metric_threshold =
    1649          296 :             Self::get_evictions_low_residence_duration_metric_threshold(
    1650          296 :                 &tenant_conf_guard.tenant_conf,
    1651          296 :                 &conf.default_tenant_conf,
    1652          296 :             );
    1653          296 :         drop(tenant_conf_guard);
    1654          296 : 
    1655          296 :         Arc::new_cyclic(|myself| {
    1656          296 :             let mut result = Timeline {
    1657          296 :                 conf,
    1658          296 :                 tenant_conf,
    1659          296 :                 myself: myself.clone(),
    1660          296 :                 timeline_id,
    1661          296 :                 tenant_shard_id,
    1662          296 :                 generation,
    1663          296 :                 shard_identity,
    1664          296 :                 pg_version,
    1665          296 :                 layers: Default::default(),
    1666          296 : 
    1667          296 :                 walredo_mgr,
    1668          296 :                 walreceiver: Mutex::new(None),
    1669          296 : 
    1670          296 :                 remote_client: resources.remote_client.map(Arc::new),
    1671          296 : 
    1672          296 :                 // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
    1673          296 :                 last_record_lsn: SeqWait::new(RecordLsn {
    1674          296 :                     last: disk_consistent_lsn,
    1675          296 :                     prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
    1676          296 :                 }),
    1677          296 :                 disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
    1678          296 : 
    1679          296 :                 last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
    1680          296 :                 last_freeze_ts: RwLock::new(Instant::now()),
    1681          296 : 
    1682          296 :                 loaded_at: (disk_consistent_lsn, SystemTime::now()),
    1683          296 : 
    1684          296 :                 ancestor_timeline: ancestor,
    1685          296 :                 ancestor_lsn: metadata.ancestor_lsn(),
    1686          296 : 
    1687          296 :                 metrics: TimelineMetrics::new(
    1688          296 :                     &tenant_shard_id,
    1689          296 :                     &timeline_id,
    1690          296 :                     crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
    1691          296 :                         "mtime",
    1692          296 :                         evictions_low_residence_duration_metric_threshold,
    1693          296 :                     ),
    1694          296 :                 ),
    1695          296 : 
    1696          296 :                 query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
    1697          296 :                     &tenant_shard_id,
    1698          296 :                     &timeline_id,
    1699          296 :                 ),
    1700          296 : 
    1701         2072 :                 directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
    1702          296 : 
    1703          296 :                 flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
    1704          296 : 
    1705          296 :                 layer_flush_start_tx,
    1706          296 :                 layer_flush_done_tx,
    1707          296 : 
    1708          296 :                 write_lock: tokio::sync::Mutex::new(()),
    1709          296 : 
    1710          296 :                 gc_info: std::sync::RwLock::new(GcInfo {
    1711          296 :                     retain_lsns: Vec::new(),
    1712          296 :                     horizon_cutoff: Lsn(0),
    1713          296 :                     pitr_cutoff: Lsn(0),
    1714          296 :                 }),
    1715          296 : 
    1716          296 :                 latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
    1717          296 :                 initdb_lsn: metadata.initdb_lsn(),
    1718          296 : 
    1719          296 :                 current_logical_size: if disk_consistent_lsn.is_valid() {
    1720              :                     // we're creating timeline data with some layer files existing locally,
    1721              :                     // need to recalculate timeline's logical size based on data in the layers.
    1722          216 :                     LogicalSize::deferred_initial(disk_consistent_lsn)
    1723              :                 } else {
    1724              :                     // we're creating timeline data without any layers existing locally,
    1725              :                     // initial logical size is 0.
    1726           80 :                     LogicalSize::empty_initial()
    1727              :                 },
    1728          296 :                 partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
    1729          296 :                 repartition_threshold: 0,
    1730          296 : 
    1731          296 :                 last_received_wal: Mutex::new(None),
    1732          296 :                 rel_size_cache: RwLock::new(HashMap::new()),
    1733          296 : 
    1734          296 :                 download_all_remote_layers_task_info: RwLock::new(None),
    1735          296 : 
    1736          296 :                 state,
    1737          296 : 
    1738          296 :                 eviction_task_timeline_state: tokio::sync::Mutex::new(
    1739          296 :                     EvictionTaskTimelineState::default(),
    1740          296 :                 ),
    1741          296 :                 delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
    1742          296 : 
    1743          296 :                 cancel,
    1744          296 :                 gate: Gate::default(),
    1745          296 : 
    1746          296 :                 compaction_lock: tokio::sync::Mutex::default(),
    1747          296 :                 gc_lock: tokio::sync::Mutex::default(),
    1748          296 : 
    1749          296 :                 timeline_get_throttle: resources.timeline_get_throttle,
    1750          296 : 
    1751          296 :                 aux_files: tokio::sync::Mutex::new(AuxFilesState {
    1752          296 :                     dir: None,
    1753          296 :                     n_deltas: 0,
    1754          296 :                 }),
    1755          296 :             };
    1756          296 :             result.repartition_threshold =
    1757          296 :                 result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
    1758          296 :             result
    1759          296 :                 .metrics
    1760          296 :                 .last_record_gauge
    1761          296 :                 .set(disk_consistent_lsn.0 as i64);
    1762          296 :             result
    1763          296 :         })
    1764          296 :     }
    1765              : 
    1766          366 :     pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
    1767          366 :         let Ok(guard) = self.gate.enter() else {
    1768            0 :             info!("cannot start flush loop when the timeline gate has already been closed");
    1769            0 :             return;
    1770              :         };
    1771          366 :         let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
    1772          366 :         match *flush_loop_state {
    1773          292 :             FlushLoopState::NotStarted => (),
    1774              :             FlushLoopState::Running { .. } => {
    1775           74 :                 info!(
    1776           74 :                     "skipping attempt to start flush_loop twice {}/{}",
    1777           74 :                     self.tenant_shard_id, self.timeline_id
    1778           74 :                 );
    1779           74 :                 return;
    1780              :             }
    1781              :             FlushLoopState::Exited => {
    1782            0 :                 warn!(
    1783            0 :                     "ignoring attempt to restart exited flush_loop {}/{}",
    1784            0 :                     self.tenant_shard_id, self.timeline_id
    1785            0 :                 );
    1786            0 :                 return;
    1787              :             }
    1788              :         }
    1789              : 
    1790          292 :         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
    1791          292 :         let self_clone = Arc::clone(self);
    1792          292 : 
    1793          292 :         debug!("spawning flush loop");
    1794          292 :         *flush_loop_state = FlushLoopState::Running {
    1795          292 :             #[cfg(test)]
    1796          292 :             expect_initdb_optimization: false,
    1797          292 :             #[cfg(test)]
    1798          292 :             initdb_optimization_count: 0,
    1799          292 :         };
    1800          292 :         task_mgr::spawn(
    1801          292 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1802          292 :             task_mgr::TaskKind::LayerFlushTask,
    1803          292 :             Some(self.tenant_shard_id),
    1804          292 :             Some(self.timeline_id),
    1805          292 :             "layer flush task",
    1806              :             false,
    1807          292 :             async move {
    1808          292 :                 let _guard = guard;
    1809          292 :                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
    1810         2186 :                 self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
    1811            8 :                 let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
    1812            8 :                 assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
    1813            8 :                 *flush_loop_state  = FlushLoopState::Exited;
    1814            8 :                 Ok(())
    1815            8 :             }
    1816          292 :             .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    1817              :         );
    1818          366 :     }
    1819              : 
    1820              :     /// Creates and starts the wal receiver.
    1821              :     ///
    1822              :     /// This function is expected to be called at most once per Timeline's lifecycle
    1823              :     /// when the timeline is activated.
    1824            0 :     fn launch_wal_receiver(
    1825            0 :         self: &Arc<Self>,
    1826            0 :         ctx: &RequestContext,
    1827            0 :         broker_client: BrokerClientChannel,
    1828            0 :     ) {
    1829            0 :         info!(
    1830            0 :             "launching WAL receiver for timeline {} of tenant {}",
    1831            0 :             self.timeline_id, self.tenant_shard_id
    1832            0 :         );
    1833              : 
    1834            0 :         let tenant_conf_guard = self.tenant_conf.read().unwrap();
    1835            0 :         let wal_connect_timeout = tenant_conf_guard
    1836            0 :             .tenant_conf
    1837            0 :             .walreceiver_connect_timeout
    1838            0 :             .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
    1839            0 :         let lagging_wal_timeout = tenant_conf_guard
    1840            0 :             .tenant_conf
    1841            0 :             .lagging_wal_timeout
    1842            0 :             .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
    1843            0 :         let max_lsn_wal_lag = tenant_conf_guard
    1844            0 :             .tenant_conf
    1845            0 :             .max_lsn_wal_lag
    1846            0 :             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
    1847            0 :         drop(tenant_conf_guard);
    1848            0 : 
    1849            0 :         let mut guard = self.walreceiver.lock().unwrap();
    1850            0 :         assert!(
    1851            0 :             guard.is_none(),
    1852            0 :             "multiple launches / re-launches of WAL receiver are not supported"
    1853              :         );
    1854            0 :         *guard = Some(WalReceiver::start(
    1855            0 :             Arc::clone(self),
    1856            0 :             WalReceiverConf {
    1857            0 :                 wal_connect_timeout,
    1858            0 :                 lagging_wal_timeout,
    1859            0 :                 max_lsn_wal_lag,
    1860            0 :                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
    1861            0 :                 availability_zone: self.conf.availability_zone.clone(),
    1862            0 :                 ingest_batch_size: self.conf.ingest_batch_size,
    1863            0 :             },
    1864            0 :             broker_client,
    1865            0 :             ctx,
    1866            0 :         ));
    1867            0 :     }
    1868              : 
    1869              :     /// Initialize with an empty layer map. Used when creating a new timeline.
    1870          290 :     pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
    1871          290 :         let mut layers = self.layers.try_write().expect(
    1872          290 :             "in the context where we call this function, no other task has access to the object",
    1873          290 :         );
    1874          290 :         layers.initialize_empty(Lsn(start_lsn.0));
    1875          290 :     }
    1876              : 
    1877              :     /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
    1878              :     /// files.
    1879            6 :     pub(super) async fn load_layer_map(
    1880            6 :         &self,
    1881            6 :         disk_consistent_lsn: Lsn,
    1882            6 :         index_part: Option<IndexPart>,
    1883            6 :     ) -> anyhow::Result<()> {
    1884              :         use init::{Decision::*, Discovered, DismissedLayer};
    1885              :         use LayerFileName::*;
    1886              : 
    1887            6 :         let mut guard = self.layers.write().await;
    1888              : 
    1889            6 :         let timer = self.metrics.load_layer_map_histo.start_timer();
    1890            6 : 
    1891            6 :         // Scan timeline directory and create ImageFileName and DeltaFilename
    1892            6 :         // structs representing all files on disk
    1893            6 :         let timeline_path = self
    1894            6 :             .conf
    1895            6 :             .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    1896            6 :         let conf = self.conf;
    1897            6 :         let span = tracing::Span::current();
    1898            6 : 
    1899            6 :         // Copy to move into the task we're about to spawn
    1900            6 :         let generation = self.generation;
    1901            6 :         let shard = self.get_shard_index();
    1902            6 :         let this = self.myself.upgrade().expect("&self method holds the arc");
    1903              : 
    1904            6 :         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
    1905            6 :             move || {
    1906            6 :                 let _g = span.entered();
    1907            6 :                 let discovered = init::scan_timeline_dir(&timeline_path)?;
    1908            6 :                 let mut discovered_layers = Vec::with_capacity(discovered.len());
    1909            6 :                 let mut unrecognized_files = Vec::new();
    1910            6 : 
    1911            6 :                 let mut path = timeline_path;
    1912              : 
    1913           22 :                 for discovered in discovered {
    1914           16 :                     let (name, kind) = match discovered {
    1915           16 :                         Discovered::Layer(file_name, file_size) => {
    1916           16 :                             discovered_layers.push((file_name, file_size));
    1917           16 :                             continue;
    1918              :                         }
    1919              :                         Discovered::Metadata => {
    1920            0 :                             warn!("found legacy metadata file, these should have been removed in load_tenant_config");
    1921            0 :                             continue;
    1922              :                         }
    1923              :                         Discovered::IgnoredBackup => {
    1924            0 :                             continue;
    1925              :                         }
    1926            0 :                         Discovered::Unknown(file_name) => {
    1927            0 :                             // we will later error if there are any
    1928            0 :                             unrecognized_files.push(file_name);
    1929            0 :                             continue;
    1930              :                         }
    1931            0 :                         Discovered::Ephemeral(name) => (name, "old ephemeral file"),
    1932            0 :                         Discovered::Temporary(name) => (name, "temporary timeline file"),
    1933            0 :                         Discovered::TemporaryDownload(name) => (name, "temporary download"),
    1934              :                     };
    1935            0 :                     path.push(Utf8Path::new(&name));
    1936            0 :                     init::cleanup(&path, kind)?;
    1937            0 :                     path.pop();
    1938              :                 }
    1939              : 
    1940            6 :                 if !unrecognized_files.is_empty() {
    1941              :                     // assume that if there are any there are many many.
    1942            0 :                     let n = unrecognized_files.len();
    1943            0 :                     let first = &unrecognized_files[..n.min(10)];
    1944            0 :                     anyhow::bail!(
    1945            0 :                         "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
    1946            0 :                     );
    1947            6 :                 }
    1948            6 : 
    1949            6 :                 let decided = init::reconcile(
    1950            6 :                     discovered_layers,
    1951            6 :                     index_part.as_ref(),
    1952            6 :                     disk_consistent_lsn,
    1953            6 :                     generation,
    1954            6 :                     shard,
    1955            6 :                 );
    1956            6 : 
    1957            6 :                 let mut loaded_layers = Vec::new();
    1958            6 :                 let mut needs_cleanup = Vec::new();
    1959            6 :                 let mut total_physical_size = 0;
    1960              : 
    1961           22 :                 for (name, decision) in decided {
    1962           16 :                     let decision = match decision {
    1963            0 :                         Ok(UseRemote { local, remote }) => {
    1964            0 :                             // Remote is authoritative, but we may still choose to retain
    1965            0 :                             // the local file if the contents appear to match
    1966            0 :                             if local.file_size() == remote.file_size() {
    1967              :                                 // Use the local file, but take the remote metadata so that we pick up
    1968              :                                 // the correct generation.
    1969            0 :                                 UseLocal(remote)
    1970              :                             } else {
    1971            0 :                                 path.push(name.file_name());
    1972            0 :                                 init::cleanup_local_file_for_remote(&path, &local, &remote)?;
    1973            0 :                                 path.pop();
    1974            0 :                                 UseRemote { local, remote }
    1975              :                             }
    1976              :                         }
    1977           16 :                         Ok(decision) => decision,
    1978            0 :                         Err(DismissedLayer::Future { local }) => {
    1979            0 :                             if local.is_some() {
    1980            0 :                                 path.push(name.file_name());
    1981            0 :                                 init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
    1982            0 :                                 path.pop();
    1983            0 :                             }
    1984            0 :                             needs_cleanup.push(name);
    1985            0 :                             continue;
    1986              :                         }
    1987            0 :                         Err(DismissedLayer::LocalOnly(local)) => {
    1988            0 :                             path.push(name.file_name());
    1989            0 :                             init::cleanup_local_only_file(&path, &name, &local)?;
    1990            0 :                             path.pop();
    1991            0 :                             // this file never existed remotely, we will have to do rework
    1992            0 :                             continue;
    1993              :                         }
    1994              :                     };
    1995              : 
    1996           16 :                     match &name {
    1997           12 :                         Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
    1998            4 :                         Image(i) => assert!(i.lsn <= disk_consistent_lsn),
    1999              :                     }
    2000              : 
    2001           16 :                     tracing::debug!(layer=%name, ?decision, "applied");
    2002              : 
    2003           16 :                     let layer = match decision {
    2004           16 :                         UseLocal(m) => {
    2005           16 :                             total_physical_size += m.file_size();
    2006           16 :                             Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
    2007              :                         }
    2008            0 :                         Evicted(remote) | UseRemote { remote, .. } => {
    2009            0 :                             Layer::for_evicted(conf, &this, name, remote)
    2010              :                         }
    2011              :                     };
    2012              : 
    2013           16 :                     loaded_layers.push(layer);
    2014              :                 }
    2015            6 :                 Ok((loaded_layers, needs_cleanup, total_physical_size))
    2016            6 :             }
    2017            6 :         })
    2018            6 :         .await
    2019            6 :         .map_err(anyhow::Error::new)
    2020            6 :         .and_then(|x| x)?;
    2021              : 
    2022            6 :         let num_layers = loaded_layers.len();
    2023            6 : 
    2024            6 :         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
    2025              : 
    2026            6 :         if let Some(rtc) = self.remote_client.as_ref() {
    2027            6 :             rtc.schedule_layer_file_deletion(&needs_cleanup)?;
    2028            6 :             rtc.schedule_index_upload_for_file_changes()?;
    2029              :             // This barrier orders above DELETEs before any later operations.
    2030              :             // This is critical because code executing after the barrier might
    2031              :             // create again objects with the same key that we just scheduled for deletion.
    2032              :             // For example, if we just scheduled deletion of an image layer "from the future",
    2033              :             // later compaction might run again and re-create the same image layer.
    2034              :             // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
    2035              :             // "same" here means same key range and LSN.
    2036              :             //
    2037              :             // Without a barrier between above DELETEs and the re-creation's PUTs,
    2038              :             // the upload queue may execute the PUT first, then the DELETE.
    2039              :             // In our example, we will end up with an IndexPart referencing a non-existent object.
    2040              :             //
    2041              :             // 1. a future image layer is created and uploaded
    2042              :             // 2. ps restart
    2043              :             // 3. the future layer from (1) is deleted during load layer map
    2044              :             // 4. image layer is re-created and uploaded
    2045              :             // 5. deletion queue would like to delete (1) but actually deletes (4)
    2046              :             // 6. delete by name works as expected, but it now deletes the wrong (later) version
    2047              :             //
    2048              :             // See https://github.com/neondatabase/neon/issues/5878
    2049              :             //
    2050              :             // NB: generation numbers naturally protect against this because they disambiguate
    2051              :             //     (1) and (4)
    2052            6 :             rtc.schedule_barrier()?;
    2053              :             // Tenant::create_timeline will wait for these uploads to happen before returning, or
    2054              :             // on retry.
    2055            0 :         }
    2056              : 
    2057            6 :         info!(
    2058            6 :             "loaded layer map with {} layers at {}, total physical size: {}",
    2059            6 :             num_layers, disk_consistent_lsn, total_physical_size
    2060            6 :         );
    2061              : 
    2062            6 :         timer.stop_and_record();
    2063            6 :         Ok(())
    2064            6 :     }
    2065              : 
    2066              :     /// Retrieve current logical size of the timeline.
    2067              :     ///
    2068              :     /// The size could be lagging behind the actual number, in case
    2069              :     /// the initial size calculation has not been run (gets triggered on the first size access).
    2070              :     ///
    2071              :     /// return size and boolean flag that shows if the size is exact
    2072            0 :     pub(crate) fn get_current_logical_size(
    2073            0 :         self: &Arc<Self>,
    2074            0 :         priority: GetLogicalSizePriority,
    2075            0 :         ctx: &RequestContext,
    2076            0 :     ) -> logical_size::CurrentLogicalSize {
    2077            0 :         if !self.tenant_shard_id.is_zero() {
    2078              :             // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
    2079              :             // when HTTP API is serving a GET for timeline zero, return zero
    2080            0 :             return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
    2081            0 :         }
    2082            0 : 
    2083            0 :         let current_size = self.current_logical_size.current_size();
    2084            0 :         debug!("Current size: {current_size:?}");
    2085              : 
    2086            0 :         match (current_size.accuracy(), priority) {
    2087            0 :             (logical_size::Accuracy::Exact, _) => (), // nothing to do
    2088            0 :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
    2089            0 :                 // background task will eventually deliver an exact value, we're in no rush
    2090            0 :             }
    2091              :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
    2092              :                 // background task is not ready, but user is asking for it now;
    2093              :                 // => make the background task skip the line
    2094              :                 // (The alternative would be to calculate the size here, but,
    2095              :                 //  it can actually take a long time if the user has a lot of rels.
    2096              :                 //  And we'll inevitable need it again; So, let the background task do the work.)
    2097            0 :                 match self
    2098            0 :                     .current_logical_size
    2099            0 :                     .cancel_wait_for_background_loop_concurrency_limit_semaphore
    2100            0 :                     .get()
    2101              :                 {
    2102            0 :                     Some(cancel) => cancel.cancel(),
    2103              :                     None => {
    2104            0 :                         let state = self.current_state();
    2105            0 :                         if matches!(
    2106            0 :                             state,
    2107              :                             TimelineState::Broken { .. } | TimelineState::Stopping
    2108            0 :                         ) {
    2109            0 : 
    2110            0 :                             // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
    2111            0 :                             // Don't make noise.
    2112            0 :                         } else {
    2113            0 :                             warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
    2114              :                         }
    2115              :                     }
    2116              :                 };
    2117              :             }
    2118              :         }
    2119              : 
    2120            0 :         if let CurrentLogicalSize::Approximate(_) = &current_size {
    2121            0 :             if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
    2122            0 :                 let first = self
    2123            0 :                     .current_logical_size
    2124            0 :                     .did_return_approximate_to_walreceiver
    2125            0 :                     .compare_exchange(
    2126            0 :                         false,
    2127            0 :                         true,
    2128            0 :                         AtomicOrdering::Relaxed,
    2129            0 :                         AtomicOrdering::Relaxed,
    2130            0 :                     )
    2131            0 :                     .is_ok();
    2132            0 :                 if first {
    2133            0 :                     crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
    2134            0 :                 }
    2135            0 :             }
    2136            0 :         }
    2137              : 
    2138            0 :         current_size
    2139            0 :     }
    2140              : 
    2141            0 :     fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
    2142            0 :         let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
    2143              :             // nothing to do for freshly created timelines;
    2144            0 :             assert_eq!(
    2145            0 :                 self.current_logical_size.current_size().accuracy(),
    2146            0 :                 logical_size::Accuracy::Exact,
    2147            0 :             );
    2148            0 :             self.current_logical_size.initialized.add_permits(1);
    2149            0 :             return;
    2150              :         };
    2151              : 
    2152            0 :         let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
    2153            0 :         let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
    2154            0 :         self.current_logical_size
    2155            0 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
    2156            0 :             .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
    2157            0 : 
    2158            0 :         let self_clone = Arc::clone(self);
    2159            0 :         let background_ctx = ctx.detached_child(
    2160            0 :             TaskKind::InitialLogicalSizeCalculation,
    2161            0 :             DownloadBehavior::Download,
    2162            0 :         );
    2163            0 :         task_mgr::spawn(
    2164            0 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    2165            0 :             task_mgr::TaskKind::InitialLogicalSizeCalculation,
    2166            0 :             Some(self.tenant_shard_id),
    2167            0 :             Some(self.timeline_id),
    2168            0 :             "initial size calculation",
    2169              :             false,
    2170              :             // NB: don't log errors here, task_mgr will do that.
    2171            0 :             async move {
    2172            0 :                 let cancel = task_mgr::shutdown_token();
    2173            0 :                 self_clone
    2174            0 :                     .initial_logical_size_calculation_task(
    2175            0 :                         initial_part_end,
    2176            0 :                         cancel_wait_for_background_loop_concurrency_limit_semaphore,
    2177            0 :                         cancel,
    2178            0 :                         background_ctx,
    2179            0 :                     )
    2180            0 :                     .await;
    2181            0 :                 Ok(())
    2182            0 :             }
    2183            0 :             .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
    2184              :         );
    2185            0 :     }
    2186              : 
    2187            0 :     async fn initial_logical_size_calculation_task(
    2188            0 :         self: Arc<Self>,
    2189            0 :         initial_part_end: Lsn,
    2190            0 :         skip_concurrency_limiter: CancellationToken,
    2191            0 :         cancel: CancellationToken,
    2192            0 :         background_ctx: RequestContext,
    2193            0 :     ) {
    2194            0 :         scopeguard::defer! {
    2195            0 :             // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
    2196            0 :             self.current_logical_size.initialized.add_permits(1);
    2197            0 :         }
    2198              : 
    2199              :         enum BackgroundCalculationError {
    2200              :             Cancelled,
    2201              :             Other(anyhow::Error),
    2202              :         }
    2203              : 
    2204            0 :         let try_once = |attempt: usize| {
    2205            0 :             let background_ctx = &background_ctx;
    2206            0 :             let self_ref = &self;
    2207            0 :             let skip_concurrency_limiter = &skip_concurrency_limiter;
    2208            0 :             async move {
    2209            0 :                 let cancel = task_mgr::shutdown_token();
    2210            0 :                 let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
    2211            0 :                     BackgroundLoopKind::InitialLogicalSizeCalculation,
    2212            0 :                     background_ctx,
    2213            0 :                 );
    2214              : 
    2215              :                 use crate::metrics::initial_logical_size::StartCircumstances;
    2216            0 :                 let (_maybe_permit, circumstances) = tokio::select! {
    2217            0 :                     permit = wait_for_permit => {
    2218              :                         (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
    2219              :                     }
    2220              :                     _ = self_ref.cancel.cancelled() => {
    2221              :                         return Err(BackgroundCalculationError::Cancelled);
    2222              :                     }
    2223              :                     _ = cancel.cancelled() => {
    2224              :                         return Err(BackgroundCalculationError::Cancelled);
    2225              :                     },
    2226              :                     () = skip_concurrency_limiter.cancelled() => {
    2227              :                         // Some action that is part of a end user interaction requested logical size
    2228              :                         // => break out of the rate limit
    2229              :                         // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
    2230              :                         // but then again what happens if they cancel; also, we should just be using
    2231              :                         // one runtime across the entire process, so, let's leave this for now.
    2232              :                         (None, StartCircumstances::SkippedConcurrencyLimiter)
    2233              :                     }
    2234              :                 };
    2235              : 
    2236            0 :                 let metrics_guard = if attempt == 1 {
    2237            0 :                     crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
    2238              :                 } else {
    2239            0 :                     crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
    2240              :                 };
    2241              : 
    2242            0 :                 match self_ref
    2243            0 :                     .logical_size_calculation_task(
    2244            0 :                         initial_part_end,
    2245            0 :                         LogicalSizeCalculationCause::Initial,
    2246            0 :                         background_ctx,
    2247            0 :                     )
    2248            0 :                     .await
    2249              :                 {
    2250            0 :                     Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
    2251              :                     Err(CalculateLogicalSizeError::Cancelled) => {
    2252            0 :                         Err(BackgroundCalculationError::Cancelled)
    2253              :                     }
    2254            0 :                     Err(CalculateLogicalSizeError::Other(err)) => {
    2255            0 :                         if let Some(PageReconstructError::AncestorStopping(_)) =
    2256            0 :                             err.root_cause().downcast_ref()
    2257              :                         {
    2258            0 :                             Err(BackgroundCalculationError::Cancelled)
    2259              :                         } else {
    2260            0 :                             Err(BackgroundCalculationError::Other(err))
    2261              :                         }
    2262              :                     }
    2263              :                 }
    2264            0 :             }
    2265            0 :         };
    2266              : 
    2267            0 :         let retrying = async {
    2268            0 :             let mut attempt = 0;
    2269            0 :             loop {
    2270            0 :                 attempt += 1;
    2271            0 : 
    2272            0 :                 match try_once(attempt).await {
    2273            0 :                     Ok(res) => return ControlFlow::Continue(res),
    2274            0 :                     Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
    2275            0 :                     Err(BackgroundCalculationError::Other(e)) => {
    2276            0 :                         warn!(attempt, "initial size calculation failed: {e:?}");
    2277              :                         // exponential back-off doesn't make sense at these long intervals;
    2278              :                         // use fixed retry interval with generous jitter instead
    2279            0 :                         let sleep_duration = Duration::from_secs(
    2280            0 :                             u64::try_from(
    2281            0 :                                 // 1hour base
    2282            0 :                                 (60_i64 * 60_i64)
    2283            0 :                                     // 10min jitter
    2284            0 :                                     + rand::thread_rng().gen_range(-10 * 60..10 * 60),
    2285            0 :                             )
    2286            0 :                             .expect("10min < 1hour"),
    2287            0 :                         );
    2288            0 :                         tokio::time::sleep(sleep_duration).await;
    2289              :                     }
    2290              :                 }
    2291              :             }
    2292            0 :         };
    2293              : 
    2294            0 :         let (calculated_size, metrics_guard) = tokio::select! {
    2295            0 :             res = retrying  => {
    2296              :                 match res {
    2297              :                     ControlFlow::Continue(calculated_size) => calculated_size,
    2298              :                     ControlFlow::Break(()) => return,
    2299              :                 }
    2300              :             }
    2301              :             _ = cancel.cancelled() => {
    2302              :                 return;
    2303              :             }
    2304              :         };
    2305              : 
    2306              :         // we cannot query current_logical_size.current_size() to know the current
    2307              :         // *negative* value, only truncated to u64.
    2308            0 :         let added = self
    2309            0 :             .current_logical_size
    2310            0 :             .size_added_after_initial
    2311            0 :             .load(AtomicOrdering::Relaxed);
    2312            0 : 
    2313            0 :         let sum = calculated_size.saturating_add_signed(added);
    2314            0 : 
    2315            0 :         // set the gauge value before it can be set in `update_current_logical_size`.
    2316            0 :         self.metrics.current_logical_size_gauge.set(sum);
    2317            0 : 
    2318            0 :         self.current_logical_size
    2319            0 :             .initial_logical_size
    2320            0 :             .set((calculated_size, metrics_guard.calculation_result_saved()))
    2321            0 :             .ok()
    2322            0 :             .expect("only this task sets it");
    2323            0 :     }
    2324              : 
    2325            0 :     pub(crate) fn spawn_ondemand_logical_size_calculation(
    2326            0 :         self: &Arc<Self>,
    2327            0 :         lsn: Lsn,
    2328            0 :         cause: LogicalSizeCalculationCause,
    2329            0 :         ctx: RequestContext,
    2330            0 :     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
    2331            0 :         let (sender, receiver) = oneshot::channel();
    2332            0 :         let self_clone = Arc::clone(self);
    2333            0 :         // XXX if our caller loses interest, i.e., ctx is cancelled,
    2334            0 :         // we should stop the size calculation work and return an error.
    2335            0 :         // That would require restructuring this function's API to
    2336            0 :         // return the result directly, instead of a Receiver for the result.
    2337            0 :         let ctx = ctx.detached_child(
    2338            0 :             TaskKind::OndemandLogicalSizeCalculation,
    2339            0 :             DownloadBehavior::Download,
    2340            0 :         );
    2341            0 :         task_mgr::spawn(
    2342            0 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    2343            0 :             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
    2344            0 :             Some(self.tenant_shard_id),
    2345            0 :             Some(self.timeline_id),
    2346            0 :             "ondemand logical size calculation",
    2347            0 :             false,
    2348            0 :             async move {
    2349            0 :                 let res = self_clone
    2350            0 :                     .logical_size_calculation_task(lsn, cause, &ctx)
    2351            0 :                     .await;
    2352            0 :                 let _ = sender.send(res).ok();
    2353            0 :                 Ok(()) // Receiver is responsible for handling errors
    2354            0 :             }
    2355            0 :             .in_current_span(),
    2356            0 :         );
    2357            0 :         receiver
    2358            0 :     }
    2359              : 
    2360              :     /// # Cancel-Safety
    2361              :     ///
    2362              :     /// This method is cancellation-safe.
    2363            0 :     #[instrument(skip_all)]
    2364              :     async fn logical_size_calculation_task(
    2365              :         self: &Arc<Self>,
    2366              :         lsn: Lsn,
    2367              :         cause: LogicalSizeCalculationCause,
    2368              :         ctx: &RequestContext,
    2369              :     ) -> Result<u64, CalculateLogicalSizeError> {
    2370              :         crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
    2371              :         // We should never be calculating logical sizes on shard !=0, because these shards do not have
    2372              :         // accurate relation sizes, and they do not emit consumption metrics.
    2373              :         debug_assert!(self.tenant_shard_id.is_zero());
    2374              : 
    2375              :         let guard = self
    2376              :             .gate
    2377              :             .enter()
    2378            0 :             .map_err(|_| CalculateLogicalSizeError::Cancelled)?;
    2379              : 
    2380              :         let self_calculation = Arc::clone(self);
    2381              : 
    2382            0 :         let mut calculation = pin!(async {
    2383            0 :             let ctx = ctx.attached_child();
    2384            0 :             self_calculation
    2385            0 :                 .calculate_logical_size(lsn, cause, &guard, &ctx)
    2386            0 :                 .await
    2387            0 :         });
    2388              : 
    2389            0 :         tokio::select! {
    2390            0 :             res = &mut calculation => { res }
    2391              :             _ = self.cancel.cancelled() => {
    2392            0 :                 debug!("cancelling logical size calculation for timeline shutdown");
    2393              :                 calculation.await
    2394              :             }
    2395              :             _ = task_mgr::shutdown_watcher() => {
    2396            0 :                 debug!("cancelling logical size calculation for task shutdown");
    2397              :                 calculation.await
    2398              :             }
    2399              :         }
    2400              :     }
    2401              : 
    2402              :     /// Calculate the logical size of the database at the latest LSN.
    2403              :     ///
    2404              :     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    2405              :     /// especially if we need to download remote layers.
    2406              :     ///
    2407              :     /// # Cancel-Safety
    2408              :     ///
    2409              :     /// This method is cancellation-safe.
    2410            0 :     async fn calculate_logical_size(
    2411            0 :         &self,
    2412            0 :         up_to_lsn: Lsn,
    2413            0 :         cause: LogicalSizeCalculationCause,
    2414            0 :         _guard: &GateGuard,
    2415            0 :         ctx: &RequestContext,
    2416            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2417            0 :         info!(
    2418            0 :             "Calculating logical size for timeline {} at {}",
    2419            0 :             self.timeline_id, up_to_lsn
    2420            0 :         );
    2421              : 
    2422            0 :         pausable_failpoint!("timeline-calculate-logical-size-pause");
    2423              : 
    2424              :         // See if we've already done the work for initial size calculation.
    2425              :         // This is a short-cut for timelines that are mostly unused.
    2426            0 :         if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
    2427            0 :             return Ok(size);
    2428            0 :         }
    2429            0 :         let storage_time_metrics = match cause {
    2430              :             LogicalSizeCalculationCause::Initial
    2431              :             | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
    2432            0 :             | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
    2433              :             LogicalSizeCalculationCause::EvictionTaskImitation => {
    2434            0 :                 &self.metrics.imitate_logical_size_histo
    2435              :             }
    2436              :         };
    2437            0 :         let timer = storage_time_metrics.start_timer();
    2438            0 :         let logical_size = self
    2439            0 :             .get_current_logical_size_non_incremental(up_to_lsn, ctx)
    2440            0 :             .await?;
    2441            0 :         debug!("calculated logical size: {logical_size}");
    2442            0 :         timer.stop_and_record();
    2443            0 :         Ok(logical_size)
    2444            0 :     }
    2445              : 
    2446              :     /// Update current logical size, adding `delta' to the old value.
    2447       270570 :     fn update_current_logical_size(&self, delta: i64) {
    2448       270570 :         let logical_size = &self.current_logical_size;
    2449       270570 :         logical_size.increment_size(delta);
    2450       270570 : 
    2451       270570 :         // Also set the value in the prometheus gauge. Note that
    2452       270570 :         // there is a race condition here: if this is is called by two
    2453       270570 :         // threads concurrently, the prometheus gauge might be set to
    2454       270570 :         // one value while current_logical_size is set to the
    2455       270570 :         // other.
    2456       270570 :         match logical_size.current_size() {
    2457       270570 :             CurrentLogicalSize::Exact(ref new_current_size) => self
    2458       270570 :                 .metrics
    2459       270570 :                 .current_logical_size_gauge
    2460       270570 :                 .set(new_current_size.into()),
    2461            0 :             CurrentLogicalSize::Approximate(_) => {
    2462            0 :                 // don't update the gauge yet, this allows us not to update the gauge back and
    2463            0 :                 // forth between the initial size calculation task.
    2464            0 :             }
    2465              :         }
    2466       270570 :     }
    2467              : 
    2468         2424 :     pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
    2469         2424 :         self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
    2470         2424 :         let aux_metric =
    2471         2424 :             self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
    2472         2424 : 
    2473         2424 :         let sum_of_entries = self
    2474         2424 :             .directory_metrics
    2475         2424 :             .iter()
    2476        16968 :             .map(|v| v.load(AtomicOrdering::Relaxed))
    2477         2424 :             .sum();
    2478         2424 :         // Set a high general threshold and a lower threshold for the auxiliary files,
    2479         2424 :         // as we can have large numbers of relations in the db directory.
    2480         2424 :         const SUM_THRESHOLD: u64 = 5000;
    2481         2424 :         const AUX_THRESHOLD: u64 = 1000;
    2482         2424 :         if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
    2483            0 :             self.metrics
    2484            0 :                 .directory_entries_count_gauge
    2485            0 :                 .set(sum_of_entries);
    2486         2424 :         } else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
    2487            0 :             metric.set(sum_of_entries);
    2488         2424 :         }
    2489         2424 :     }
    2490              : 
    2491            0 :     async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
    2492            0 :         let guard = self.layers.read().await;
    2493            0 :         for historic_layer in guard.layer_map().iter_historic_layers() {
    2494            0 :             let historic_layer_name = historic_layer.filename().file_name();
    2495            0 :             if layer_file_name == historic_layer_name {
    2496            0 :                 return Some(guard.get_from_desc(&historic_layer));
    2497            0 :             }
    2498              :         }
    2499              : 
    2500            0 :         None
    2501            0 :     }
    2502              : 
    2503              :     /// The timeline heatmap is a hint to secondary locations from the primary location,
    2504              :     /// indicating which layers are currently on-disk on the primary.
    2505              :     ///
    2506              :     /// None is returned if the Timeline is in a state where uploading a heatmap
    2507              :     /// doesn't make sense, such as shutting down or initializing.  The caller
    2508              :     /// should treat this as a cue to simply skip doing any heatmap uploading
    2509              :     /// for this timeline.
    2510            0 :     pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
    2511              :         // no point in heatmaps without remote client
    2512            0 :         let _remote_client = self.remote_client.as_ref()?;
    2513              : 
    2514            0 :         if !self.is_active() {
    2515            0 :             return None;
    2516            0 :         }
    2517              : 
    2518            0 :         let guard = self.layers.read().await;
    2519              : 
    2520            0 :         let resident = guard.resident_layers().map(|layer| {
    2521            0 :             let last_activity_ts = layer.access_stats().latest_activity_or_now();
    2522            0 : 
    2523            0 :             HeatMapLayer::new(
    2524            0 :                 layer.layer_desc().filename(),
    2525            0 :                 layer.metadata().into(),
    2526            0 :                 last_activity_ts,
    2527            0 :             )
    2528            0 :         });
    2529              : 
    2530            0 :         let layers = resident.collect().await;
    2531              : 
    2532            0 :         Some(HeatMapTimeline::new(self.timeline_id, layers))
    2533            0 :     }
    2534              : }
    2535              : 
    2536              : type TraversalId = String;
    2537              : 
    2538              : trait TraversalLayerExt {
    2539              :     fn traversal_id(&self) -> TraversalId;
    2540              : }
    2541              : 
    2542              : impl TraversalLayerExt for Layer {
    2543          104 :     fn traversal_id(&self) -> TraversalId {
    2544          104 :         self.local_path().to_string()
    2545          104 :     }
    2546              : }
    2547              : 
    2548              : impl TraversalLayerExt for Arc<InMemoryLayer> {
    2549            4 :     fn traversal_id(&self) -> TraversalId {
    2550            4 :         format!("timeline {} in-memory {self}", self.get_timeline_id())
    2551            4 :     }
    2552              : }
    2553              : 
    2554              : impl Timeline {
    2555              :     ///
    2556              :     /// Get a handle to a Layer for reading.
    2557              :     ///
    2558              :     /// The returned Layer might be from an ancestor timeline, if the
    2559              :     /// segment hasn't been updated on this timeline yet.
    2560              :     ///
    2561              :     /// This function takes the current timeline's locked LayerMap as an argument,
    2562              :     /// so callers can avoid potential race conditions.
    2563              :     ///
    2564              :     /// # Cancel-Safety
    2565              :     ///
    2566              :     /// This method is cancellation-safe.
    2567       502524 :     async fn get_reconstruct_data(
    2568       502524 :         &self,
    2569       502524 :         key: Key,
    2570       502524 :         request_lsn: Lsn,
    2571       502524 :         reconstruct_state: &mut ValueReconstructState,
    2572       502524 :         ctx: &RequestContext,
    2573       502524 :     ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
    2574       502524 :         // Start from the current timeline.
    2575       502524 :         let mut timeline_owned;
    2576       502524 :         let mut timeline = self;
    2577       502524 : 
    2578       502524 :         let mut read_count = scopeguard::guard(0, |cnt| {
    2579       502524 :             crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
    2580       502524 :         });
    2581       502524 : 
    2582       502524 :         // For debugging purposes, collect the path of layers that we traversed
    2583       502524 :         // through. It's included in the error message if we fail to find the key.
    2584       502524 :         let mut traversal_path = Vec::<TraversalPathItem>::new();
    2585              : 
    2586       502524 :         let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
    2587            0 :             *cached_lsn
    2588              :         } else {
    2589       502524 :             Lsn(0)
    2590              :         };
    2591              : 
    2592              :         // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
    2593              :         // to check that each iteration make some progress, to break infinite
    2594              :         // looping if something goes wrong.
    2595       502524 :         let mut prev_lsn = Lsn(u64::MAX);
    2596       502524 : 
    2597       502524 :         let mut result = ValueReconstructResult::Continue;
    2598       502524 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2599              : 
    2600      1356666 :         'outer: loop {
    2601      1356666 :             if self.cancel.is_cancelled() {
    2602            0 :                 return Err(PageReconstructError::Cancelled);
    2603      1356666 :             }
    2604      1356666 : 
    2605      1356666 :             // The function should have updated 'state'
    2606      1356666 :             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
    2607      1356666 :             match result {
    2608       502416 :                 ValueReconstructResult::Complete => return Ok(traversal_path),
    2609              :                 ValueReconstructResult::Continue => {
    2610              :                     // If we reached an earlier cached page image, we're done.
    2611       854244 :                     if cont_lsn == cached_lsn + 1 {
    2612            0 :                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
    2613            0 :                         return Ok(traversal_path);
    2614       854244 :                     }
    2615       854244 :                     if prev_lsn <= cont_lsn {
    2616              :                         // Didn't make any progress in last iteration. Error out to avoid
    2617              :                         // getting stuck in the loop.
    2618          100 :                         return Err(layer_traversal_error(format!(
    2619          100 :                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
    2620          100 :                             key,
    2621          100 :                             Lsn(cont_lsn.0 - 1),
    2622          100 :                             request_lsn,
    2623          100 :                             timeline.ancestor_lsn
    2624          100 :                         ), traversal_path));
    2625       854144 :                     }
    2626       854144 :                     prev_lsn = cont_lsn;
    2627              :                 }
    2628              :                 ValueReconstructResult::Missing => {
    2629              :                     return Err(layer_traversal_error(
    2630            6 :                         if cfg!(test) {
    2631            6 :                             format!(
    2632            6 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
    2633            6 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
    2634            6 :                             )
    2635              :                         } else {
    2636            0 :                             format!(
    2637            0 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
    2638            0 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
    2639            0 :                             )
    2640              :                         },
    2641            6 :                         traversal_path,
    2642              :                     ));
    2643              :                 }
    2644              :             }
    2645              : 
    2646              :             // Recurse into ancestor if needed
    2647       854144 :             if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
    2648            0 :                 trace!(
    2649            0 :                     "going into ancestor {}, cont_lsn is {}",
    2650            0 :                     timeline.ancestor_lsn,
    2651            0 :                     cont_lsn
    2652            0 :                 );
    2653              : 
    2654       226769 :                 timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
    2655       226767 :                 timeline = &*timeline_owned;
    2656       226767 :                 prev_lsn = Lsn(u64::MAX);
    2657       226767 :                 continue 'outer;
    2658       627375 :             }
    2659              : 
    2660       627375 :             let guard = timeline.layers.read().await;
    2661       627375 :             let layers = guard.layer_map();
    2662              : 
    2663              :             // Check the open and frozen in-memory layers first, in order from newest
    2664              :             // to oldest.
    2665       627375 :             if let Some(open_layer) = &layers.open_layer {
    2666       556312 :                 let start_lsn = open_layer.get_lsn_range().start;
    2667       556312 :                 if cont_lsn > start_lsn {
    2668              :                     //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
    2669              :                     // Get all the data needed to reconstruct the page version from this layer.
    2670              :                     // But if we have an older cached page image, no need to go past that.
    2671       502209 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2672       502209 :                     result = match open_layer
    2673       502209 :                         .get_value_reconstruct_data(
    2674       502209 :                             key,
    2675       502209 :                             lsn_floor..cont_lsn,
    2676       502209 :                             reconstruct_state,
    2677       502209 :                             ctx,
    2678       502209 :                         )
    2679         8526 :                         .await
    2680              :                     {
    2681       502209 :                         Ok(result) => result,
    2682            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2683              :                     };
    2684       502209 :                     cont_lsn = lsn_floor;
    2685       502209 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2686       502209 :                     traversal_path.push((
    2687       502209 :                         result,
    2688       502209 :                         cont_lsn,
    2689       502209 :                         Box::new({
    2690       502209 :                             let open_layer = Arc::clone(open_layer);
    2691       502209 :                             move || open_layer.traversal_id()
    2692       502209 :                         }),
    2693       502209 :                     ));
    2694       502209 :                     continue 'outer;
    2695        54103 :                 }
    2696        71063 :             }
    2697       125166 :             for frozen_layer in layers.frozen_layers.iter().rev() {
    2698         1036 :                 let start_lsn = frozen_layer.get_lsn_range().start;
    2699         1036 :                 if cont_lsn > start_lsn {
    2700              :                     //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
    2701         1036 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2702         1036 :                     result = match frozen_layer
    2703         1036 :                         .get_value_reconstruct_data(
    2704         1036 :                             key,
    2705         1036 :                             lsn_floor..cont_lsn,
    2706         1036 :                             reconstruct_state,
    2707         1036 :                             ctx,
    2708         1036 :                         )
    2709            0 :                         .await
    2710              :                     {
    2711         1036 :                         Ok(result) => result,
    2712            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2713              :                     };
    2714         1036 :                     cont_lsn = lsn_floor;
    2715         1036 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2716         1036 :                     traversal_path.push((
    2717         1036 :                         result,
    2718         1036 :                         cont_lsn,
    2719         1036 :                         Box::new({
    2720         1036 :                             let frozen_layer = Arc::clone(frozen_layer);
    2721         1036 :                             move || frozen_layer.traversal_id()
    2722         1036 :                         }),
    2723         1036 :                     ));
    2724         1036 :                     continue 'outer;
    2725            0 :                 }
    2726              :             }
    2727              : 
    2728       124130 :             if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
    2729       124028 :                 let layer = guard.get_from_desc(&layer);
    2730       124028 :                 // Get all the data needed to reconstruct the page version from this layer.
    2731       124028 :                 // But if we have an older cached page image, no need to go past that.
    2732       124028 :                 let lsn_floor = max(cached_lsn + 1, lsn_floor);
    2733       124028 :                 result = match layer
    2734       124028 :                     .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
    2735        23310 :                     .await
    2736              :                 {
    2737       124028 :                     Ok(result) => result,
    2738            0 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2739              :                 };
    2740       124028 :                 cont_lsn = lsn_floor;
    2741       124028 :                 *read_count += 1;
    2742       124028 :                 traversal_path.push((
    2743       124028 :                     result,
    2744       124028 :                     cont_lsn,
    2745       124028 :                     Box::new({
    2746       124028 :                         let layer = layer.to_owned();
    2747       124028 :                         move || layer.traversal_id()
    2748       124028 :                     }),
    2749       124028 :                 ));
    2750       124028 :                 continue 'outer;
    2751          102 :             } else if timeline.ancestor_timeline.is_some() {
    2752              :                 // Nothing on this timeline. Traverse to parent
    2753          100 :                 result = ValueReconstructResult::Continue;
    2754          100 :                 cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2755          100 :                 continue 'outer;
    2756              :             } else {
    2757              :                 // Nothing found
    2758            2 :                 result = ValueReconstructResult::Missing;
    2759            2 :                 continue 'outer;
    2760              :             }
    2761              :         }
    2762       502524 :     }
    2763              : 
    2764              :     /// Get the data needed to reconstruct all keys in the provided keyspace
    2765              :     ///
    2766              :     /// The algorithm is as follows:
    2767              :     /// 1.   While some keys are still not done and there's a timeline to visit:
    2768              :     /// 2.   Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_timeline`]:
    2769              :     /// 2.1: Build the fringe for the current keyspace
    2770              :     /// 2.2  Visit the newest layer from the fringe to collect all values for the range it
    2771              :     ///      intersects
    2772              :     /// 2.3. Pop the timeline from the fringe
    2773              :     /// 2.4. If the fringe is empty, go back to 1
    2774           10 :     async fn get_vectored_reconstruct_data(
    2775           10 :         &self,
    2776           10 :         mut keyspace: KeySpace,
    2777           10 :         request_lsn: Lsn,
    2778           10 :         reconstruct_state: &mut ValuesReconstructState,
    2779           10 :         ctx: &RequestContext,
    2780           10 :     ) -> Result<(), GetVectoredError> {
    2781           10 :         let mut timeline_owned: Arc<Timeline>;
    2782           10 :         let mut timeline = self;
    2783           10 : 
    2784           10 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2785              : 
    2786              :         loop {
    2787           10 :             if self.cancel.is_cancelled() {
    2788            0 :                 return Err(GetVectoredError::Cancelled);
    2789           10 :             }
    2790              : 
    2791           10 :             let completed = Self::get_vectored_reconstruct_data_timeline(
    2792           10 :                 timeline,
    2793           10 :                 keyspace.clone(),
    2794           10 :                 cont_lsn,
    2795           10 :                 reconstruct_state,
    2796           10 :                 &self.cancel,
    2797           10 :                 ctx,
    2798           10 :             )
    2799           25 :             .await?;
    2800              : 
    2801           10 :             keyspace.remove_overlapping_with(&completed);
    2802           10 :             if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
    2803           10 :                 break;
    2804            0 :             }
    2805            0 : 
    2806            0 :             cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2807            0 :             timeline_owned = timeline
    2808            0 :                 .get_ready_ancestor_timeline(ctx)
    2809            0 :                 .await
    2810            0 :                 .map_err(GetVectoredError::GetReadyAncestorError)?;
    2811            0 :             timeline = &*timeline_owned;
    2812              :         }
    2813              : 
    2814           10 :         if keyspace.total_size() != 0 {
    2815            0 :             return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
    2816           10 :         }
    2817           10 : 
    2818           10 :         Ok(())
    2819           10 :     }
    2820              : 
    2821              :     /// Collect the reconstruct data for a ketspace from the specified timeline.
    2822              :     ///
    2823              :     /// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
    2824              :     /// the current keyspace. The current keyspace of the search at any given timeline
    2825              :     /// is the original keyspace minus all the keys that have been completed minus
    2826              :     /// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly,
    2827              :     /// but if you merge all the keyspaces in the fringe, you get the "current keyspace".
    2828              :     ///
    2829              :     /// This is basically a depth-first search visitor implementation where a vertex
    2830              :     /// is the (layer, lsn range, key space) tuple. The fringe acts as the stack.
    2831              :     ///
    2832              :     /// At each iteration pop the top of the fringe (the layer with the highest Lsn)
    2833              :     /// and get all the required reconstruct data from the layer in one go.
    2834           10 :     async fn get_vectored_reconstruct_data_timeline(
    2835           10 :         timeline: &Timeline,
    2836           10 :         keyspace: KeySpace,
    2837           10 :         mut cont_lsn: Lsn,
    2838           10 :         reconstruct_state: &mut ValuesReconstructState,
    2839           10 :         cancel: &CancellationToken,
    2840           10 :         ctx: &RequestContext,
    2841           10 :     ) -> Result<KeySpace, GetVectoredError> {
    2842           10 :         let mut unmapped_keyspace = keyspace.clone();
    2843           10 :         let mut fringe = LayerFringe::new();
    2844           10 : 
    2845           10 :         let mut completed_keyspace = KeySpace::default();
    2846              : 
    2847              :         // Hold the layer map whilst visiting the timeline to prevent
    2848              :         // compaction, eviction and flushes from rendering the layers unreadable.
    2849              :         //
    2850              :         // TODO: Do we actually need to do this? In theory holding on
    2851              :         // to [`tenant::storage_layer::Layer`] should be enough. However,
    2852              :         // [`Timeline::get`] also holds the lock during IO, so more investigation
    2853              :         // is needed.
    2854           10 :         let guard = timeline.layers.read().await;
    2855           10 :         let layers = guard.layer_map();
    2856              : 
    2857           20 :         'outer: loop {
    2858           20 :             if cancel.is_cancelled() {
    2859            0 :                 return Err(GetVectoredError::Cancelled);
    2860           20 :             }
    2861           20 : 
    2862           20 :             let keys_done_last_step = reconstruct_state.consume_done_keys();
    2863           20 :             unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
    2864           20 :             completed_keyspace.merge(&keys_done_last_step);
    2865           20 : 
    2866           20 :             let in_memory_layer = layers.find_in_memory_layer(|l| {
    2867            0 :                 let start_lsn = l.get_lsn_range().start;
    2868            0 :                 cont_lsn > start_lsn
    2869           20 :             });
    2870           20 : 
    2871           20 :             match in_memory_layer {
    2872            0 :                 Some(l) => {
    2873            0 :                     fringe.update(
    2874            0 :                         ReadableLayerDesc::InMemory {
    2875            0 :                             handle: l,
    2876            0 :                             lsn_ceil: cont_lsn,
    2877            0 :                         },
    2878            0 :                         unmapped_keyspace.clone(),
    2879            0 :                     );
    2880            0 :                 }
    2881              :                 None => {
    2882           20 :                     for range in unmapped_keyspace.ranges.iter() {
    2883           10 :                         let results = match layers.range_search(range.clone(), cont_lsn) {
    2884           10 :                             Some(res) => res,
    2885              :                             None => {
    2886            0 :                                 break 'outer;
    2887              :                             }
    2888              :                         };
    2889              : 
    2890           10 :                         results
    2891           10 :                             .found
    2892           10 :                             .into_iter()
    2893           10 :                             .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
    2894           10 :                                 (
    2895           10 :                                     ReadableLayerDesc::Persistent {
    2896           10 :                                         desc: (*layer).clone(),
    2897           10 :                                         lsn_range: lsn_floor..cont_lsn,
    2898           10 :                                     },
    2899           10 :                                     keyspace_accum.to_keyspace(),
    2900           10 :                                 )
    2901           10 :                             })
    2902           10 :                             .for_each(|(layer, keyspace)| fringe.update(layer, keyspace));
    2903              :                     }
    2904              :                 }
    2905              :             }
    2906              : 
    2907           20 :             if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() {
    2908           10 :                 layer_to_read
    2909           10 :                     .get_values_reconstruct_data(
    2910           10 :                         &guard,
    2911           10 :                         keyspace_to_read.clone(),
    2912           10 :                         reconstruct_state,
    2913           10 :                         ctx,
    2914           10 :                     )
    2915           25 :                     .await?;
    2916              : 
    2917           10 :                 unmapped_keyspace = keyspace_to_read;
    2918           10 :                 cont_lsn = layer_to_read.get_lsn_floor();
    2919              :             } else {
    2920           10 :                 break;
    2921              :             }
    2922              :         }
    2923              : 
    2924           10 :         Ok(completed_keyspace)
    2925           10 :     }
    2926              : 
    2927              :     /// # Cancel-safety
    2928              :     ///
    2929              :     /// This method is cancellation-safe.
    2930       502524 :     async fn lookup_cached_page(
    2931       502524 :         &self,
    2932       502524 :         key: &Key,
    2933       502524 :         lsn: Lsn,
    2934       502524 :         ctx: &RequestContext,
    2935       502524 :     ) -> Option<(Lsn, Bytes)> {
    2936       502524 :         let cache = page_cache::get();
    2937              : 
    2938              :         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
    2939              :         // We should look at the key to determine if it's a cacheable object
    2940       502524 :         let (lsn, read_guard) = cache
    2941       502524 :             .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
    2942       502524 :             .await?;
    2943            0 :         let img = Bytes::from(read_guard.to_vec());
    2944            0 :         Some((lsn, img))
    2945       502524 :     }
    2946              : 
    2947       226769 :     async fn get_ready_ancestor_timeline(
    2948       226769 :         &self,
    2949       226769 :         ctx: &RequestContext,
    2950       226769 :     ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
    2951       226769 :         let ancestor = match self.get_ancestor_timeline() {
    2952       226769 :             Ok(timeline) => timeline,
    2953            0 :             Err(e) => return Err(GetReadyAncestorError::from(e)),
    2954              :         };
    2955              : 
    2956              :         // It's possible that the ancestor timeline isn't active yet, or
    2957              :         // is active but hasn't yet caught up to the branch point. Wait
    2958              :         // for it.
    2959              :         //
    2960              :         // This cannot happen while the pageserver is running normally,
    2961              :         // because you cannot create a branch from a point that isn't
    2962              :         // present in the pageserver yet. However, we don't wait for the
    2963              :         // branch point to be uploaded to cloud storage before creating
    2964              :         // a branch. I.e., the branch LSN need not be remote consistent
    2965              :         // for the branching operation to succeed.
    2966              :         //
    2967              :         // Hence, if we try to load a tenant in such a state where
    2968              :         // 1. the existence of the branch was persisted (in IndexPart and/or locally)
    2969              :         // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
    2970              :         // then we will need to wait for the ancestor timeline to
    2971              :         // re-stream WAL up to branch_lsn before we access it.
    2972              :         //
    2973              :         // How can a tenant get in such a state?
    2974              :         // - ungraceful pageserver process exit
    2975              :         // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
    2976              :         //
    2977              :         // NB: this could be avoided by requiring
    2978              :         //   branch_lsn >= remote_consistent_lsn
    2979              :         // during branch creation.
    2980       226769 :         match ancestor.wait_to_become_active(ctx).await {
    2981       226767 :             Ok(()) => {}
    2982              :             Err(TimelineState::Stopping) => {
    2983            0 :                 return Err(GetReadyAncestorError::AncestorStopping(
    2984            0 :                     ancestor.timeline_id,
    2985            0 :                 ));
    2986              :             }
    2987            2 :             Err(state) => {
    2988            2 :                 return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
    2989            2 :                     "Timeline {} will not become active. Current state: {:?}",
    2990            2 :                     ancestor.timeline_id,
    2991            2 :                     &state,
    2992            2 :                 )));
    2993              :             }
    2994              :         }
    2995       226767 :         ancestor
    2996       226767 :             .wait_lsn(self.ancestor_lsn, ctx)
    2997            0 :             .await
    2998       226767 :             .map_err(|e| match e {
    2999            0 :                 e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
    3000            0 :                 WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
    3001            0 :                 e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
    3002       226767 :             })?;
    3003              : 
    3004       226767 :         Ok(ancestor)
    3005       226769 :     }
    3006              : 
    3007       226769 :     fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
    3008       226769 :         let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
    3009            0 :             format!(
    3010            0 :                 "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
    3011            0 :                 self.timeline_id,
    3012            0 :                 self.get_ancestor_timeline_id(),
    3013            0 :             )
    3014       226769 :         })?;
    3015       226769 :         Ok(Arc::clone(ancestor))
    3016       226769 :     }
    3017              : 
    3018           12 :     pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
    3019           12 :         &self.shard_identity
    3020           12 :     }
    3021              : 
    3022              :     ///
    3023              :     /// Get a handle to the latest layer for appending.
    3024              :     ///
    3025      2628044 :     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    3026      2628044 :         let mut guard = self.layers.write().await;
    3027      2628044 :         let layer = guard
    3028      2628044 :             .get_layer_for_write(
    3029      2628044 :                 lsn,
    3030      2628044 :                 self.get_last_record_lsn(),
    3031      2628044 :                 self.conf,
    3032      2628044 :                 self.timeline_id,
    3033      2628044 :                 self.tenant_shard_id,
    3034      2628044 :             )
    3035          362 :             .await?;
    3036      2628044 :         Ok(layer)
    3037      2628044 :     }
    3038              : 
    3039      2214102 :     async fn put_value(
    3040      2214102 :         &self,
    3041      2214102 :         key: Key,
    3042      2214102 :         lsn: Lsn,
    3043      2214102 :         val: &Value,
    3044      2214102 :         ctx: &RequestContext,
    3045      2214102 :     ) -> anyhow::Result<()> {
    3046              :         //info!("PUT: key {} at {}", key, lsn);
    3047      2214102 :         let layer = self.get_layer_for_write(lsn).await?;
    3048      2214102 :         layer.put_value(key, lsn, val, ctx).await?;
    3049      2214102 :         Ok(())
    3050      2214102 :     }
    3051              : 
    3052       413940 :     async fn put_values(
    3053       413940 :         &self,
    3054       413940 :         values: &HashMap<Key, Vec<(Lsn, Value)>>,
    3055       413940 :         ctx: &RequestContext,
    3056       413940 :     ) -> anyhow::Result<()> {
    3057              :         // Pick the first LSN in the batch to get the layer to write to.
    3058       413940 :         for lsns in values.values() {
    3059       413940 :             if let Some((lsn, _)) = lsns.first() {
    3060       413940 :                 let layer = self.get_layer_for_write(*lsn).await?;
    3061       413940 :                 layer.put_values(values, ctx).await?;
    3062       413940 :                 break;
    3063            0 :             }
    3064              :         }
    3065       413940 :         Ok(())
    3066       413940 :     }
    3067              : 
    3068            2 :     async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    3069            2 :         if let Some((_, lsn)) = tombstones.first() {
    3070            2 :             let layer = self.get_layer_for_write(*lsn).await?;
    3071            2 :             layer.put_tombstones(tombstones).await?;
    3072            0 :         }
    3073            2 :         Ok(())
    3074            2 :     }
    3075              : 
    3076      3102912 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    3077      3102912 :         assert!(new_lsn.is_aligned());
    3078              : 
    3079      3102912 :         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
    3080      3102912 :         self.last_record_lsn.advance(new_lsn);
    3081      3102912 :     }
    3082              : 
    3083          530 :     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
    3084              :         // Freeze the current open in-memory layer. It will be written to disk on next
    3085              :         // iteration.
    3086          530 :         let _write_guard = if write_lock_held {
    3087            0 :             None
    3088              :         } else {
    3089          530 :             Some(self.write_lock.lock().await)
    3090              :         };
    3091          530 :         let mut guard = self.layers.write().await;
    3092          530 :         guard
    3093          530 :             .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
    3094            3 :             .await;
    3095          530 :     }
    3096              : 
    3097              :     /// Layer flusher task's main loop.
    3098          292 :     async fn flush_loop(
    3099          292 :         self: &Arc<Self>,
    3100          292 :         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
    3101          292 :         ctx: &RequestContext,
    3102          292 :     ) {
    3103          292 :         info!("started flush loop");
    3104              :         loop {
    3105          822 :             tokio::select! {
    3106         1292 :                 _ = self.cancel.cancelled() => {
    3107         1292 :                     info!("shutting down layer flush task");
    3108         1292 :                     break;
    3109         1292 :                 },
    3110         1292 :                 _ = task_mgr::shutdown_watcher() => {
    3111         1292 :                     info!("shutting down layer flush task");
    3112         1292 :                     break;
    3113         1292 :                 },
    3114         1292 :                 _ = layer_flush_start_rx.changed() => {}
    3115         1292 :             }
    3116              : 
    3117            0 :             trace!("waking up");
    3118          530 :             let timer = self.metrics.flush_time_histo.start_timer();
    3119          530 :             let flush_counter = *layer_flush_start_rx.borrow();
    3120         1054 :             let result = loop {
    3121         1054 :                 if self.cancel.is_cancelled() {
    3122            0 :                     info!("dropping out of flush loop for timeline shutdown");
    3123              :                     // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
    3124              :                     // anyone waiting on that will respect self.cancel as well: they will stop
    3125              :                     // waiting at the same time we as drop out of this loop.
    3126            0 :                     return;
    3127         1054 :                 }
    3128              : 
    3129         1054 :                 let layer_to_flush = {
    3130         1054 :                     let guard = self.layers.read().await;
    3131         1054 :                     guard.layer_map().frozen_layers.front().cloned()
    3132              :                     // drop 'layers' lock to allow concurrent reads and writes
    3133              :                 };
    3134         1054 :                 let Some(layer_to_flush) = layer_to_flush else {
    3135          530 :                     break Ok(());
    3136              :                 };
    3137         1716 :                 match self.flush_frozen_layer(layer_to_flush, ctx).await {
    3138          524 :                     Ok(()) => {}
    3139              :                     Err(FlushLayerError::Cancelled) => {
    3140            0 :                         info!("dropping out of flush loop for timeline shutdown");
    3141            0 :                         return;
    3142              :                     }
    3143            0 :                     err @ Err(
    3144              :                         FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
    3145              :                     ) => {
    3146            0 :                         error!("could not flush frozen layer: {err:?}");
    3147            0 :                         break err;
    3148              :                     }
    3149              :                 }
    3150              :             };
    3151              :             // Notify any listeners that we're done
    3152          530 :             let _ = self
    3153          530 :                 .layer_flush_done_tx
    3154          530 :                 .send_replace((flush_counter, result));
    3155          530 : 
    3156          530 :             timer.stop_and_record();
    3157              :         }
    3158            8 :     }
    3159              : 
    3160          530 :     async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
    3161          530 :         let mut rx = self.layer_flush_done_tx.subscribe();
    3162          530 : 
    3163          530 :         // Increment the flush cycle counter and wake up the flush task.
    3164          530 :         // Remember the new value, so that when we listen for the flush
    3165          530 :         // to finish, we know when the flush that we initiated has
    3166          530 :         // finished, instead of some other flush that was started earlier.
    3167          530 :         let mut my_flush_request = 0;
    3168          530 : 
    3169          530 :         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
    3170          530 :         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
    3171            0 :             anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
    3172          530 :         }
    3173          530 : 
    3174          530 :         self.layer_flush_start_tx.send_modify(|counter| {
    3175          530 :             my_flush_request = *counter + 1;
    3176          530 :             *counter = my_flush_request;
    3177          530 :         });
    3178              : 
    3179         1060 :         loop {
    3180         1060 :             {
    3181         1060 :                 let (last_result_counter, last_result) = &*rx.borrow();
    3182         1060 :                 if *last_result_counter >= my_flush_request {
    3183          530 :                     if let Err(_err) = last_result {
    3184              :                         // We already logged the original error in
    3185              :                         // flush_loop. We cannot propagate it to the caller
    3186              :                         // here, because it might not be Cloneable
    3187            0 :                         anyhow::bail!(
    3188            0 :                             "Could not flush frozen layer. Request id: {}",
    3189            0 :                             my_flush_request
    3190            0 :                         );
    3191              :                     } else {
    3192          530 :                         return Ok(());
    3193              :                     }
    3194          530 :                 }
    3195              :             }
    3196            0 :             trace!("waiting for flush to complete");
    3197          530 :             tokio::select! {
    3198          530 :                 rx_e = rx.changed() => {
    3199              :                     rx_e?;
    3200              :                 },
    3201              :                 // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
    3202              :                 // the notification from [`flush_loop`] that it completed.
    3203              :                 _ = self.cancel.cancelled() => {
    3204            0 :                     tracing::info!("Cancelled layer flush due on timeline shutdown");
    3205              :                     return Ok(())
    3206              :                 }
    3207              :             };
    3208            0 :             trace!("done")
    3209              :         }
    3210          530 :     }
    3211              : 
    3212            0 :     fn flush_frozen_layers(&self) {
    3213            0 :         self.layer_flush_start_tx.send_modify(|val| *val += 1);
    3214            0 :     }
    3215              : 
    3216              :     /// Flush one frozen in-memory layer to disk, as a new delta layer.
    3217         1048 :     #[instrument(skip_all, fields(layer=%frozen_layer))]
    3218              :     async fn flush_frozen_layer(
    3219              :         self: &Arc<Self>,
    3220              :         frozen_layer: Arc<InMemoryLayer>,
    3221              :         ctx: &RequestContext,
    3222              :     ) -> Result<(), FlushLayerError> {
    3223              :         debug_assert_current_span_has_tenant_and_timeline_id();
    3224              :         // As a special case, when we have just imported an image into the repository,
    3225              :         // instead of writing out a L0 delta layer, we directly write out image layer
    3226              :         // files instead. This is possible as long as *all* the data imported into the
    3227              :         // repository have the same LSN.
    3228              :         let lsn_range = frozen_layer.get_lsn_range();
    3229              :         let (layers_to_upload, delta_layer_to_add) =
    3230              :             if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
    3231              :                 #[cfg(test)]
    3232              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    3233              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    3234              :                         panic!("flush loop not running")
    3235              :                     }
    3236              :                     FlushLoopState::Running {
    3237              :                         initdb_optimization_count,
    3238              :                         ..
    3239              :                     } => {
    3240              :                         *initdb_optimization_count += 1;
    3241              :                     }
    3242              :                 }
    3243              :                 // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
    3244              :                 // require downloading anything during initial import.
    3245              :                 let (partitioning, _lsn) = self
    3246              :                     .repartition(
    3247              :                         self.initdb_lsn,
    3248              :                         self.get_compaction_target_size(),
    3249              :                         EnumSet::empty(),
    3250              :                         ctx,
    3251              :                     )
    3252              :                     .await?;
    3253              : 
    3254              :                 if self.cancel.is_cancelled() {
    3255              :                     return Err(FlushLayerError::Cancelled);
    3256              :                 }
    3257              : 
    3258              :                 // For image layers, we add them immediately into the layer map.
    3259              :                 (
    3260              :                     self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
    3261              :                         .await?,
    3262              :                     None,
    3263              :                 )
    3264              :             } else {
    3265              :                 #[cfg(test)]
    3266              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    3267              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    3268              :                         panic!("flush loop not running")
    3269              :                     }
    3270              :                     FlushLoopState::Running {
    3271              :                         expect_initdb_optimization,
    3272              :                         ..
    3273              :                     } => {
    3274              :                         assert!(!*expect_initdb_optimization, "expected initdb optimization");
    3275              :                     }
    3276              :                 }
    3277              :                 // Normal case, write out a L0 delta layer file.
    3278              :                 // `create_delta_layer` will not modify the layer map.
    3279              :                 // We will remove frozen layer and add delta layer in one atomic operation later.
    3280              :                 let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
    3281              :                 (
    3282              :                     // FIXME: even though we have a single image and single delta layer assumption
    3283              :                     // we push them to vec
    3284              :                     vec![layer.clone()],
    3285              :                     Some(layer),
    3286              :                 )
    3287              :             };
    3288              : 
    3289          524 :         pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
    3290              : 
    3291              :         if self.cancel.is_cancelled() {
    3292              :             return Err(FlushLayerError::Cancelled);
    3293              :         }
    3294              : 
    3295              :         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
    3296              :         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
    3297              : 
    3298              :         // The new on-disk layers are now in the layer map. We can remove the
    3299              :         // in-memory layer from the map now. The flushed layer is stored in
    3300              :         // the mapping in `create_delta_layer`.
    3301              :         {
    3302              :             let mut guard = self.layers.write().await;
    3303              : 
    3304              :             if self.cancel.is_cancelled() {
    3305              :                 return Err(FlushLayerError::Cancelled);
    3306              :             }
    3307              : 
    3308              :             guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
    3309              : 
    3310              :             if disk_consistent_lsn != old_disk_consistent_lsn {
    3311              :                 assert!(disk_consistent_lsn > old_disk_consistent_lsn);
    3312              :                 self.disk_consistent_lsn.store(disk_consistent_lsn);
    3313              : 
    3314              :                 // Schedule remote uploads that will reflect our new disk_consistent_lsn
    3315              :                 self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
    3316              :             }
    3317              :             // release lock on 'layers'
    3318              :         };
    3319              : 
    3320              :         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
    3321              :         // a compaction can delete the file and then it won't be available for uploads any more.
    3322              :         // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
    3323              :         // race situation.
    3324              :         // See https://github.com/neondatabase/neon/issues/4526
    3325          524 :         pausable_failpoint!("flush-frozen-pausable");
    3326              : 
    3327              :         // This failpoint is used by another test case `test_pageserver_recovery`.
    3328            0 :         fail_point!("flush-frozen-exit");
    3329              : 
    3330              :         Ok(())
    3331              :     }
    3332              : 
    3333              :     /// Update metadata file
    3334          524 :     fn schedule_uploads(
    3335          524 :         &self,
    3336          524 :         disk_consistent_lsn: Lsn,
    3337          524 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    3338          524 :     ) -> anyhow::Result<TimelineMetadata> {
    3339          524 :         // We can only save a valid 'prev_record_lsn' value on disk if we
    3340          524 :         // flushed *all* in-memory changes to disk. We only track
    3341          524 :         // 'prev_record_lsn' in memory for the latest processed record, so we
    3342          524 :         // don't remember what the correct value that corresponds to some old
    3343          524 :         // LSN is. But if we flush everything, then the value corresponding
    3344          524 :         // current 'last_record_lsn' is correct and we can store it on disk.
    3345          524 :         let RecordLsn {
    3346          524 :             last: last_record_lsn,
    3347          524 :             prev: prev_record_lsn,
    3348          524 :         } = self.last_record_lsn.load();
    3349          524 :         let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
    3350          524 :             Some(prev_record_lsn)
    3351              :         } else {
    3352            0 :             None
    3353              :         };
    3354              : 
    3355          524 :         let ancestor_timeline_id = self
    3356          524 :             .ancestor_timeline
    3357          524 :             .as_ref()
    3358          524 :             .map(|ancestor| ancestor.timeline_id);
    3359          524 : 
    3360          524 :         let metadata = TimelineMetadata::new(
    3361          524 :             disk_consistent_lsn,
    3362          524 :             ondisk_prev_record_lsn,
    3363          524 :             ancestor_timeline_id,
    3364          524 :             self.ancestor_lsn,
    3365          524 :             *self.latest_gc_cutoff_lsn.read(),
    3366          524 :             self.initdb_lsn,
    3367          524 :             self.pg_version,
    3368          524 :         );
    3369          524 : 
    3370          524 :         fail_point!("checkpoint-before-saving-metadata", |x| bail!(
    3371            0 :             "{}",
    3372            0 :             x.unwrap()
    3373          524 :         ));
    3374              : 
    3375          524 :         if let Some(remote_client) = &self.remote_client {
    3376         1048 :             for layer in layers_to_upload {
    3377          524 :                 remote_client.schedule_layer_file_upload(layer)?;
    3378              :             }
    3379          524 :             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
    3380            0 :         }
    3381              : 
    3382          524 :         Ok(metadata)
    3383          524 :     }
    3384              : 
    3385            0 :     pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
    3386            0 :         if let Some(remote_client) = &self.remote_client {
    3387            0 :             remote_client
    3388            0 :                 .preserve_initdb_archive(
    3389            0 :                     &self.tenant_shard_id.tenant_id,
    3390            0 :                     &self.timeline_id,
    3391            0 :                     &self.cancel,
    3392            0 :                 )
    3393            0 :                 .await?;
    3394              :         } else {
    3395            0 :             bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
    3396              :         }
    3397            0 :         Ok(())
    3398            0 :     }
    3399              : 
    3400              :     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
    3401              :     // in layer map immediately. The caller is responsible to put it into the layer map.
    3402          450 :     async fn create_delta_layer(
    3403          450 :         self: &Arc<Self>,
    3404          450 :         frozen_layer: &Arc<InMemoryLayer>,
    3405          450 :         ctx: &RequestContext,
    3406          450 :     ) -> anyhow::Result<ResidentLayer> {
    3407          450 :         let span = tracing::info_span!("blocking");
    3408          450 :         let new_delta: ResidentLayer = tokio::task::spawn_blocking({
    3409          450 :             let self_clone = Arc::clone(self);
    3410          450 :             let frozen_layer = Arc::clone(frozen_layer);
    3411          450 :             let ctx = ctx.attached_child();
    3412          450 :             move || {
    3413          450 :                 // Write it out
    3414          450 :                 // Keep this inside `spawn_blocking` and `Handle::current`
    3415          450 :                 // as long as the write path is still sync and the read impl
    3416          450 :                 // is still not fully async. Otherwise executor threads would
    3417          450 :                 // be blocked.
    3418          450 :                 let _g = span.entered();
    3419          450 :                 let new_delta =
    3420          450 :                     Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
    3421          450 :                 let new_delta_path = new_delta.local_path().to_owned();
    3422          450 : 
    3423          450 :                 // Sync it to disk.
    3424          450 :                 //
    3425          450 :                 // We must also fsync the timeline dir to ensure the directory entries for
    3426          450 :                 // new layer files are durable.
    3427          450 :                 //
    3428          450 :                 // NB: timeline dir must be synced _after_ the file contents are durable.
    3429          450 :                 // So, two separate fsyncs are required, they mustn't be batched.
    3430          450 :                 //
    3431          450 :                 // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
    3432          450 :                 // files to flush, the fsync overhead can be reduces as follows:
    3433          450 :                 // 1. write them all to temporary file names
    3434          450 :                 // 2. fsync them
    3435          450 :                 // 3. rename to the final name
    3436          450 :                 // 4. fsync the parent directory.
    3437          450 :                 // Note that (1),(2),(3) today happen inside write_to_disk().
    3438          450 :                 //
    3439          450 :                 // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3440          450 :                 par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
    3441          450 :                 par_fsync::par_fsync(&[self_clone
    3442          450 :                     .conf
    3443          450 :                     .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
    3444          450 :                 .context("fsync of timeline dir")?;
    3445              : 
    3446          450 :                 anyhow::Ok(new_delta)
    3447          450 :             }
    3448          450 :         })
    3449          450 :         .await
    3450          450 :         .context("spawn_blocking")
    3451          450 :         .and_then(|x| x)?;
    3452              : 
    3453          450 :         Ok(new_delta)
    3454          450 :     }
    3455              : 
    3456          482 :     async fn repartition(
    3457          482 :         &self,
    3458          482 :         lsn: Lsn,
    3459          482 :         partition_size: u64,
    3460          482 :         flags: EnumSet<CompactFlags>,
    3461          482 :         ctx: &RequestContext,
    3462          482 :     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
    3463          482 :         let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
    3464              :             // NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
    3465              :             // The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
    3466              :             // and hence before the compaction task starts.
    3467            0 :             anyhow::bail!("repartition() called concurrently, this should not happen");
    3468              :         };
    3469          482 :         if lsn < partitioning_guard.1 {
    3470            0 :             anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
    3471          482 :         }
    3472          482 : 
    3473          482 :         let distance = lsn.0 - partitioning_guard.1 .0;
    3474          482 :         if partitioning_guard.1 != Lsn(0)
    3475          308 :             && distance <= self.repartition_threshold
    3476          308 :             && !flags.contains(CompactFlags::ForceRepartition)
    3477              :         {
    3478            0 :             debug!(
    3479            0 :                 distance,
    3480            0 :                 threshold = self.repartition_threshold,
    3481            0 :                 "no repartitioning needed"
    3482            0 :             );
    3483          308 :             return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
    3484          174 :         }
    3485              : 
    3486        13313 :         let keyspace = self.collect_keyspace(lsn, ctx).await?;
    3487          174 :         let partitioning = keyspace.partition(partition_size);
    3488          174 : 
    3489          174 :         *partitioning_guard = (partitioning, lsn);
    3490          174 : 
    3491          174 :         Ok((partitioning_guard.0.clone(), partitioning_guard.1))
    3492          482 :     }
    3493              : 
    3494              :     // Is it time to create a new image layer for the given partition?
    3495          408 :     async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
    3496          408 :         let threshold = self.get_image_creation_threshold();
    3497              : 
    3498          408 :         let guard = self.layers.read().await;
    3499          408 :         let layers = guard.layer_map();
    3500          408 : 
    3501          408 :         let mut max_deltas = 0;
    3502         2856 :         for part_range in &partition.ranges {
    3503         2448 :             let image_coverage = layers.image_coverage(part_range, lsn);
    3504         5204 :             for (img_range, last_img) in image_coverage {
    3505         2756 :                 let img_lsn = if let Some(last_img) = last_img {
    3506         2156 :                     last_img.get_lsn_range().end
    3507              :                 } else {
    3508          600 :                     Lsn(0)
    3509              :                 };
    3510              :                 // Let's consider an example:
    3511              :                 //
    3512              :                 // delta layer with LSN range 71-81
    3513              :                 // delta layer with LSN range 81-91
    3514              :                 // delta layer with LSN range 91-101
    3515              :                 // image layer at LSN 100
    3516              :                 //
    3517              :                 // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
    3518              :                 // there's no need to create a new one. We check this case explicitly, to avoid passing
    3519              :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    3520              :                 // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
    3521              :                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
    3522         2756 :                 if img_lsn < lsn {
    3523          600 :                     let num_deltas =
    3524          600 :                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
    3525          600 : 
    3526          600 :                     max_deltas = max_deltas.max(num_deltas);
    3527          600 :                     if num_deltas >= threshold {
    3528            0 :                         debug!(
    3529            0 :                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
    3530            0 :                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
    3531            0 :                         );
    3532            0 :                         return true;
    3533          600 :                     }
    3534         2156 :                 }
    3535              :             }
    3536              :         }
    3537              : 
    3538            0 :         debug!(
    3539            0 :             max_deltas,
    3540            0 :             "none of the partitioned ranges had >= {threshold} deltas"
    3541            0 :         );
    3542          408 :         false
    3543          408 :     }
    3544              : 
    3545          964 :     #[tracing::instrument(skip_all, fields(%lsn, %force))]
    3546              :     async fn create_image_layers(
    3547              :         self: &Arc<Timeline>,
    3548              :         partitioning: &KeyPartitioning,
    3549              :         lsn: Lsn,
    3550              :         force: bool,
    3551              :         ctx: &RequestContext,
    3552              :     ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
    3553              :         let timer = self.metrics.create_images_time_histo.start_timer();
    3554              :         let mut image_layers = Vec::new();
    3555              : 
    3556              :         // We need to avoid holes between generated image layers.
    3557              :         // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
    3558              :         // image layer with hole between them. In this case such layer can not be utilized by GC.
    3559              :         //
    3560              :         // How such hole between partitions can appear?
    3561              :         // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
    3562              :         // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
    3563              :         // If there is delta layer <100000000..300000000> then it never be garbage collected because
    3564              :         // image layers  <100000000..100000099> and <200000000..200000199> are not completely covering it.
    3565              :         let mut start = Key::MIN;
    3566              : 
    3567              :         for partition in partitioning.parts.iter() {
    3568              :             let img_range = start..partition.ranges.last().unwrap().end;
    3569              :             if !force && !self.time_for_new_image_layer(partition, lsn).await {
    3570              :                 start = img_range.end;
    3571              :                 continue;
    3572              :             }
    3573              : 
    3574              :             let mut image_layer_writer = ImageLayerWriter::new(
    3575              :                 self.conf,
    3576              :                 self.timeline_id,
    3577              :                 self.tenant_shard_id,
    3578              :                 &img_range,
    3579              :                 lsn,
    3580              :             )
    3581              :             .await?;
    3582              : 
    3583            0 :             fail_point!("image-layer-writer-fail-before-finish", |_| {
    3584            0 :                 Err(CreateImageLayersError::Other(anyhow::anyhow!(
    3585            0 :                     "failpoint image-layer-writer-fail-before-finish"
    3586            0 :                 )))
    3587            0 :             });
    3588              : 
    3589              :             let mut wrote_keys = false;
    3590              : 
    3591              :             let mut key_request_accum = KeySpaceAccum::new();
    3592              :             for range in &partition.ranges {
    3593              :                 let mut key = range.start;
    3594              :                 while key < range.end {
    3595              :                     // Decide whether to retain this key: usually we do, but sharded tenants may
    3596              :                     // need to drop keys that don't belong to them.  If we retain the key, add it
    3597              :                     // to `key_request_accum` for later issuing a vectored get
    3598              :                     if self.shard_identity.is_key_disposable(&key) {
    3599            0 :                         debug!(
    3600            0 :                             "Dropping key {} during compaction (it belongs on shard {:?})",
    3601            0 :                             key,
    3602            0 :                             self.shard_identity.get_shard_number(&key)
    3603            0 :                         );
    3604              :                     } else {
    3605              :                         key_request_accum.add_key(key);
    3606              :                     }
    3607              : 
    3608              :                     let last_key_in_range = key.next() == range.end;
    3609              :                     key = key.next();
    3610              : 
    3611              :                     // Maybe flush `key_rest_accum`
    3612              :                     if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
    3613              :                         || last_key_in_range
    3614              :                     {
    3615              :                         let results = self
    3616              :                             .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
    3617              :                             .await?;
    3618              : 
    3619              :                         for (img_key, img) in results {
    3620              :                             let img = match img {
    3621              :                                 Ok(img) => img,
    3622              :                                 Err(err) => {
    3623              :                                     // If we fail to reconstruct a VM or FSM page, we can zero the
    3624              :                                     // page without losing any actual user data. That seems better
    3625              :                                     // than failing repeatedly and getting stuck.
    3626              :                                     //
    3627              :                                     // We had a bug at one point, where we truncated the FSM and VM
    3628              :                                     // in the pageserver, but the Postgres didn't know about that
    3629              :                                     // and continued to generate incremental WAL records for pages
    3630              :                                     // that didn't exist in the pageserver. Trying to replay those
    3631              :                                     // WAL records failed to find the previous image of the page.
    3632              :                                     // This special case allows us to recover from that situation.
    3633              :                                     // See https://github.com/neondatabase/neon/issues/2601.
    3634              :                                     //
    3635              :                                     // Unfortunately we cannot do this for the main fork, or for
    3636              :                                     // any metadata keys, keys, as that would lead to actual data
    3637              :                                     // loss.
    3638              :                                     if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key)
    3639              :                                     {
    3640            0 :                                         warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
    3641              :                                         ZERO_PAGE.clone()
    3642              :                                     } else {
    3643              :                                         return Err(CreateImageLayersError::PageReconstructError(
    3644              :                                             err,
    3645              :                                         ));
    3646              :                                     }
    3647              :                                 }
    3648              :                             };
    3649              : 
    3650              :                             // Write all the keys we just read into our new image layer.
    3651              :                             image_layer_writer.put_image(img_key, img).await?;
    3652              :                             wrote_keys = true;
    3653              :                         }
    3654              :                     }
    3655              :                 }
    3656              :             }
    3657              : 
    3658              :             if wrote_keys {
    3659              :                 // Normal path: we have written some data into the new image layer for this
    3660              :                 // partition, so flush it to disk.
    3661              :                 start = img_range.end;
    3662              :                 let image_layer = image_layer_writer.finish(self).await?;
    3663              :                 image_layers.push(image_layer);
    3664              :             } else {
    3665              :                 // Special case: the image layer may be empty if this is a sharded tenant and the
    3666              :                 // partition does not cover any keys owned by this shard.  In this case, to ensure
    3667              :                 // we don't leave gaps between image layers, leave `start` where it is, so that the next
    3668              :                 // layer we write will cover the key range that we just scanned.
    3669            0 :                 tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
    3670              :             }
    3671              :         }
    3672              : 
    3673              :         // Sync the new layer to disk before adding it to the layer map, to make sure
    3674              :         // we don't garbage collect something based on the new layer, before it has
    3675              :         // reached the disk.
    3676              :         //
    3677              :         // We must also fsync the timeline dir to ensure the directory entries for
    3678              :         // new layer files are durable
    3679              :         //
    3680              :         // Compaction creates multiple image layers. It would be better to create them all
    3681              :         // and fsync them all in parallel.
    3682              :         let all_paths = image_layers
    3683              :             .iter()
    3684           74 :             .map(|layer| layer.local_path().to_owned())
    3685              :             .collect::<Vec<_>>();
    3686              : 
    3687              :         par_fsync::par_fsync_async(&all_paths)
    3688              :             .await
    3689              :             .context("fsync of newly created layer files")?;
    3690              : 
    3691              :         if !all_paths.is_empty() {
    3692              :             par_fsync::par_fsync_async(&[self
    3693              :                 .conf
    3694              :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
    3695              :             .await
    3696              :             .context("fsync of timeline dir")?;
    3697              :         }
    3698              : 
    3699              :         let mut guard = self.layers.write().await;
    3700              : 
    3701              :         // FIXME: we could add the images to be uploaded *before* returning from here, but right
    3702              :         // now they are being scheduled outside of write lock
    3703              :         guard.track_new_image_layers(&image_layers, &self.metrics);
    3704              :         drop_wlock(guard);
    3705              :         timer.stop_and_record();
    3706              : 
    3707              :         Ok(image_layers)
    3708              :     }
    3709              : 
    3710              :     /// Wait until the background initial logical size calculation is complete, or
    3711              :     /// this Timeline is shut down.  Calling this function will cause the initial
    3712              :     /// logical size calculation to skip waiting for the background jobs barrier.
    3713            0 :     pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
    3714            0 :         if let Some(await_bg_cancel) = self
    3715            0 :             .current_logical_size
    3716            0 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore
    3717            0 :             .get()
    3718            0 :         {
    3719            0 :             await_bg_cancel.cancel();
    3720            0 :         } else {
    3721              :             // We should not wait if we were not able to explicitly instruct
    3722              :             // the logical size cancellation to skip the concurrency limit semaphore.
    3723              :             // TODO: this is an unexpected case.  We should restructure so that it
    3724              :             // can't happen.
    3725            0 :             tracing::info!(
    3726            0 :                 "await_initial_logical_size: can't get semaphore cancel token, skipping"
    3727            0 :             );
    3728              :         }
    3729              : 
    3730            0 :         tokio::select!(
    3731            0 :             _ = self.current_logical_size.initialized.acquire() => {},
    3732            0 :             _ = self.cancel.cancelled() => {}
    3733            0 :         )
    3734            0 :     }
    3735              : }
    3736              : 
    3737          378 : #[derive(Default)]
    3738              : struct CompactLevel0Phase1Result {
    3739              :     new_layers: Vec<ResidentLayer>,
    3740              :     deltas_to_compact: Vec<Layer>,
    3741              : }
    3742              : 
    3743              : /// Top-level failure to compact.
    3744            0 : #[derive(Debug, thiserror::Error)]
    3745              : pub(crate) enum CompactionError {
    3746              :     #[error("The timeline or pageserver is shutting down")]
    3747              :     ShuttingDown,
    3748              :     /// Compaction cannot be done right now; page reconstruction and so on.
    3749              :     #[error(transparent)]
    3750              :     Other(#[from] anyhow::Error),
    3751              : }
    3752              : 
    3753              : impl From<CollectKeySpaceError> for CompactionError {
    3754            0 :     fn from(err: CollectKeySpaceError) -> Self {
    3755            0 :         match err {
    3756              :             CollectKeySpaceError::Cancelled
    3757              :             | CollectKeySpaceError::PageRead(PageReconstructError::Cancelled) => {
    3758            0 :                 CompactionError::ShuttingDown
    3759              :             }
    3760            0 :             e => CompactionError::Other(e.into()),
    3761              :         }
    3762            0 :     }
    3763              : }
    3764              : 
    3765              : #[serde_as]
    3766          420 : #[derive(serde::Serialize)]
    3767              : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
    3768              : 
    3769         2856 : #[derive(Default)]
    3770              : enum DurationRecorder {
    3771              :     #[default]
    3772              :     NotStarted,
    3773              :     Recorded(RecordedDuration, tokio::time::Instant),
    3774              : }
    3775              : 
    3776              : impl DurationRecorder {
    3777          558 :     fn till_now(&self) -> DurationRecorder {
    3778          558 :         match self {
    3779              :             DurationRecorder::NotStarted => {
    3780            0 :                 panic!("must only call on recorded measurements")
    3781              :             }
    3782          558 :             DurationRecorder::Recorded(_, ended) => {
    3783          558 :                 let now = tokio::time::Instant::now();
    3784          558 :                 DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
    3785          558 :             }
    3786          558 :         }
    3787          558 :     }
    3788          210 :     fn into_recorded(self) -> Option<RecordedDuration> {
    3789          210 :         match self {
    3790            0 :             DurationRecorder::NotStarted => None,
    3791          210 :             DurationRecorder::Recorded(recorded, _) => Some(recorded),
    3792              :         }
    3793          210 :     }
    3794              : }
    3795              : 
    3796          408 : #[derive(Default)]
    3797              : struct CompactLevel0Phase1StatsBuilder {
    3798              :     version: Option<u64>,
    3799              :     tenant_id: Option<TenantShardId>,
    3800              :     timeline_id: Option<TimelineId>,
    3801              :     read_lock_acquisition_micros: DurationRecorder,
    3802              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    3803              :     read_lock_held_key_sort_micros: DurationRecorder,
    3804              :     read_lock_held_prerequisites_micros: DurationRecorder,
    3805              :     read_lock_held_compute_holes_micros: DurationRecorder,
    3806              :     read_lock_drop_micros: DurationRecorder,
    3807              :     write_layer_files_micros: DurationRecorder,
    3808              :     level0_deltas_count: Option<usize>,
    3809              :     new_deltas_count: Option<usize>,
    3810              :     new_deltas_size: Option<u64>,
    3811              : }
    3812              : 
    3813           30 : #[derive(serde::Serialize)]
    3814              : struct CompactLevel0Phase1Stats {
    3815              :     version: u64,
    3816              :     tenant_id: TenantShardId,
    3817              :     timeline_id: TimelineId,
    3818              :     read_lock_acquisition_micros: RecordedDuration,
    3819              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    3820              :     read_lock_held_key_sort_micros: RecordedDuration,
    3821              :     read_lock_held_prerequisites_micros: RecordedDuration,
    3822              :     read_lock_held_compute_holes_micros: RecordedDuration,
    3823              :     read_lock_drop_micros: RecordedDuration,
    3824              :     write_layer_files_micros: RecordedDuration,
    3825              :     level0_deltas_count: usize,
    3826              :     new_deltas_count: usize,
    3827              :     new_deltas_size: u64,
    3828              : }
    3829              : 
    3830              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    3831              :     type Error = anyhow::Error;
    3832              : 
    3833           30 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    3834           30 :         Ok(Self {
    3835           30 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    3836           30 :             tenant_id: value
    3837           30 :                 .tenant_id
    3838           30 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    3839           30 :             timeline_id: value
    3840           30 :                 .timeline_id
    3841           30 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    3842           30 :             read_lock_acquisition_micros: value
    3843           30 :                 .read_lock_acquisition_micros
    3844           30 :                 .into_recorded()
    3845           30 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    3846           30 :             read_lock_held_spawn_blocking_startup_micros: value
    3847           30 :                 .read_lock_held_spawn_blocking_startup_micros
    3848           30 :                 .into_recorded()
    3849           30 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    3850           30 :             read_lock_held_key_sort_micros: value
    3851           30 :                 .read_lock_held_key_sort_micros
    3852           30 :                 .into_recorded()
    3853           30 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    3854           30 :             read_lock_held_prerequisites_micros: value
    3855           30 :                 .read_lock_held_prerequisites_micros
    3856           30 :                 .into_recorded()
    3857           30 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    3858           30 :             read_lock_held_compute_holes_micros: value
    3859           30 :                 .read_lock_held_compute_holes_micros
    3860           30 :                 .into_recorded()
    3861           30 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    3862           30 :             read_lock_drop_micros: value
    3863           30 :                 .read_lock_drop_micros
    3864           30 :                 .into_recorded()
    3865           30 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    3866           30 :             write_layer_files_micros: value
    3867           30 :                 .write_layer_files_micros
    3868           30 :                 .into_recorded()
    3869           30 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    3870           30 :             level0_deltas_count: value
    3871           30 :                 .level0_deltas_count
    3872           30 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    3873           30 :             new_deltas_count: value
    3874           30 :                 .new_deltas_count
    3875           30 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    3876           30 :             new_deltas_size: value
    3877           30 :                 .new_deltas_size
    3878           30 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    3879              :         })
    3880           30 :     }
    3881              : }
    3882              : 
    3883              : impl Timeline {
    3884              :     /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
    3885          408 :     async fn compact_level0_phase1(
    3886          408 :         self: &Arc<Self>,
    3887          408 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
    3888          408 :         mut stats: CompactLevel0Phase1StatsBuilder,
    3889          408 :         target_file_size: u64,
    3890          408 :         ctx: &RequestContext,
    3891          408 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    3892          408 :         stats.read_lock_held_spawn_blocking_startup_micros =
    3893          408 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    3894          408 :         let layers = guard.layer_map();
    3895          408 :         let level0_deltas = layers.get_level0_deltas()?;
    3896          408 :         let mut level0_deltas = level0_deltas
    3897          408 :             .into_iter()
    3898         1770 :             .map(|x| guard.get_from_desc(&x))
    3899          408 :             .collect_vec();
    3900          408 :         stats.level0_deltas_count = Some(level0_deltas.len());
    3901          408 :         // Only compact if enough layers have accumulated.
    3902          408 :         let threshold = self.get_compaction_threshold();
    3903          408 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    3904            0 :             debug!(
    3905            0 :                 level0_deltas = level0_deltas.len(),
    3906            0 :                 threshold, "too few deltas to compact"
    3907            0 :             );
    3908          378 :             return Ok(CompactLevel0Phase1Result::default());
    3909           30 :         }
    3910           30 : 
    3911           30 :         // This failpoint is used together with `test_duplicate_layers` integration test.
    3912           30 :         // It returns the compaction result exactly the same layers as input to compaction.
    3913           30 :         // We want to ensure that this will not cause any problem when updating the layer map
    3914           30 :         // after the compaction is finished.
    3915           30 :         //
    3916           30 :         // Currently, there are two rare edge cases that will cause duplicated layers being
    3917           30 :         // inserted.
    3918           30 :         // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
    3919           30 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
    3920           30 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
    3921           30 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
    3922           30 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
    3923           30 :         //    layer replace instead of the normal remove / upload process.
    3924           30 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
    3925           30 :         //    size length. Compaction will likely create the same set of n files afterwards.
    3926           30 :         //
    3927           30 :         // This failpoint is a superset of both of the cases.
    3928           30 :         if cfg!(feature = "testing") {
    3929           30 :             let active = (|| {
    3930           30 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
    3931           30 :                 false
    3932           30 :             })();
    3933           30 : 
    3934           30 :             if active {
    3935            0 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
    3936            0 :                 for delta in &level0_deltas {
    3937              :                     // we are just faking these layers as being produced again for this failpoint
    3938            0 :                     new_layers.push(
    3939            0 :                         delta
    3940            0 :                             .download_and_keep_resident()
    3941            0 :                             .await
    3942            0 :                             .context("download layer for failpoint")?,
    3943              :                     );
    3944              :                 }
    3945            0 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
    3946            0 :                 return Ok(CompactLevel0Phase1Result {
    3947            0 :                     new_layers,
    3948            0 :                     deltas_to_compact: level0_deltas,
    3949            0 :                 });
    3950           30 :             }
    3951            0 :         }
    3952              : 
    3953              :         // Gather the files to compact in this iteration.
    3954              :         //
    3955              :         // Start with the oldest Level 0 delta file, and collect any other
    3956              :         // level 0 files that form a contiguous sequence, such that the end
    3957              :         // LSN of previous file matches the start LSN of the next file.
    3958              :         //
    3959              :         // Note that if the files don't form such a sequence, we might
    3960              :         // "compact" just a single file. That's a bit pointless, but it allows
    3961              :         // us to get rid of the level 0 file, and compact the other files on
    3962              :         // the next iteration. This could probably made smarter, but such
    3963              :         // "gaps" in the sequence of level 0 files should only happen in case
    3964              :         // of a crash, partial download from cloud storage, or something like
    3965              :         // that, so it's not a big deal in practice.
    3966          540 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    3967           30 :         let mut level0_deltas_iter = level0_deltas.iter();
    3968           30 : 
    3969           30 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    3970           30 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    3971           30 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    3972           30 : 
    3973           30 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
    3974          300 :         for l in level0_deltas_iter {
    3975          270 :             let lsn_range = &l.layer_desc().lsn_range;
    3976          270 : 
    3977          270 :             if lsn_range.start != prev_lsn_end {
    3978            0 :                 break;
    3979          270 :             }
    3980          270 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
    3981          270 :             prev_lsn_end = lsn_range.end;
    3982              :         }
    3983           30 :         let lsn_range = Range {
    3984           30 :             start: deltas_to_compact
    3985           30 :                 .first()
    3986           30 :                 .unwrap()
    3987           30 :                 .layer_desc()
    3988           30 :                 .lsn_range
    3989           30 :                 .start,
    3990           30 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    3991           30 :         };
    3992              : 
    3993           30 :         info!(
    3994           30 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    3995           30 :             lsn_range.start,
    3996           30 :             lsn_range.end,
    3997           30 :             deltas_to_compact.len(),
    3998           30 :             level0_deltas.len()
    3999           30 :         );
    4000              : 
    4001          300 :         for l in deltas_to_compact.iter() {
    4002          300 :             info!("compact includes {l}");
    4003              :         }
    4004              : 
    4005              :         // We don't need the original list of layers anymore. Drop it so that
    4006              :         // we don't accidentally use it later in the function.
    4007           30 :         drop(level0_deltas);
    4008           30 : 
    4009           30 :         stats.read_lock_held_prerequisites_micros = stats
    4010           30 :             .read_lock_held_spawn_blocking_startup_micros
    4011           30 :             .till_now();
    4012           30 : 
    4013           30 :         // Determine N largest holes where N is number of compacted layers.
    4014           30 :         let max_holes = deltas_to_compact.len();
    4015           30 :         let last_record_lsn = self.get_last_record_lsn();
    4016           30 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    4017           30 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
    4018           30 : 
    4019           30 :         // min-heap (reserve space for one more element added before eviction)
    4020           30 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    4021           30 :         let mut prev: Option<Key> = None;
    4022           30 : 
    4023           30 :         let mut all_keys = Vec::new();
    4024              : 
    4025          300 :         for l in deltas_to_compact.iter() {
    4026         2407 :             all_keys.extend(l.load_keys(ctx).await?);
    4027              :         }
    4028              : 
    4029              :         // FIXME: should spawn_blocking the rest of this function
    4030              : 
    4031              :         // The current stdlib sorting implementation is designed in a way where it is
    4032              :         // particularly fast where the slice is made up of sorted sub-ranges.
    4033      4931312 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    4034           30 : 
    4035           30 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    4036              : 
    4037      2102000 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    4038      2102000 :             if let Some(prev_key) = prev {
    4039              :                 // just first fast filter
    4040      2101970 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
    4041            0 :                     let key_range = prev_key..next_key;
    4042            0 :                     // Measuring hole by just subtraction of i128 representation of key range boundaries
    4043            0 :                     // has not so much sense, because largest holes will corresponds field1/field2 changes.
    4044            0 :                     // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
    4045            0 :                     // That is why it is better to measure size of hole as number of covering image layers.
    4046            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
    4047            0 :                     if coverage_size >= min_hole_coverage_size {
    4048            0 :                         heap.push(Hole {
    4049            0 :                             key_range,
    4050            0 :                             coverage_size,
    4051            0 :                         });
    4052            0 :                         if heap.len() > max_holes {
    4053            0 :                             heap.pop(); // remove smallest hole
    4054            0 :                         }
    4055            0 :                     }
    4056      2101970 :                 }
    4057           30 :             }
    4058      2102000 :             prev = Some(next_key.next());
    4059              :         }
    4060           30 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    4061           30 :         drop_rlock(guard);
    4062           30 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    4063           30 :         let mut holes = heap.into_vec();
    4064           30 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
    4065           30 :         let mut next_hole = 0; // index of next hole in holes vector
    4066           30 : 
    4067           30 :         // This iterator walks through all key-value pairs from all the layers
    4068           30 :         // we're compacting, in key, LSN order.
    4069           30 :         let all_values_iter = all_keys.iter();
    4070           30 : 
    4071           30 :         // This iterator walks through all keys and is needed to calculate size used by each key
    4072           30 :         let mut all_keys_iter = all_keys
    4073           30 :             .iter()
    4074      2102000 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    4075      2101970 :             .coalesce(|mut prev, cur| {
    4076      2101970 :                 // Coalesce keys that belong to the same key pair.
    4077      2101970 :                 // This ensures that compaction doesn't put them
    4078      2101970 :                 // into different layer files.
    4079      2101970 :                 // Still limit this by the target file size,
    4080      2101970 :                 // so that we keep the size of the files in
    4081      2101970 :                 // check.
    4082      2101970 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    4083        92000 :                     prev.2 += cur.2;
    4084        92000 :                     Ok(prev)
    4085              :                 } else {
    4086      2009970 :                     Err((prev, cur))
    4087              :                 }
    4088      2101970 :             });
    4089           30 : 
    4090           30 :         // Merge the contents of all the input delta layers into a new set
    4091           30 :         // of delta layers, based on the current partitioning.
    4092           30 :         //
    4093           30 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    4094           30 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    4095           30 :         // would be too large. In that case, we also split on the LSN dimension.
    4096           30 :         //
    4097           30 :         // LSN
    4098           30 :         //  ^
    4099           30 :         //  |
    4100           30 :         //  | +-----------+            +--+--+--+--+
    4101           30 :         //  | |           |            |  |  |  |  |
    4102           30 :         //  | +-----------+            |  |  |  |  |
    4103           30 :         //  | |           |            |  |  |  |  |
    4104           30 :         //  | +-----------+     ==>    |  |  |  |  |
    4105           30 :         //  | |           |            |  |  |  |  |
    4106           30 :         //  | +-----------+            |  |  |  |  |
    4107           30 :         //  | |           |            |  |  |  |  |
    4108           30 :         //  | +-----------+            +--+--+--+--+
    4109           30 :         //  |
    4110           30 :         //  +--------------> key
    4111           30 :         //
    4112           30 :         //
    4113           30 :         // If one key (X) has a lot of page versions:
    4114           30 :         //
    4115           30 :         // LSN
    4116           30 :         //  ^
    4117           30 :         //  |                                 (X)
    4118           30 :         //  | +-----------+            +--+--+--+--+
    4119           30 :         //  | |           |            |  |  |  |  |
    4120           30 :         //  | +-----------+            |  |  +--+  |
    4121           30 :         //  | |           |            |  |  |  |  |
    4122           30 :         //  | +-----------+     ==>    |  |  |  |  |
    4123           30 :         //  | |           |            |  |  +--+  |
    4124           30 :         //  | +-----------+            |  |  |  |  |
    4125           30 :         //  | |           |            |  |  |  |  |
    4126           30 :         //  | +-----------+            +--+--+--+--+
    4127           30 :         //  |
    4128           30 :         //  +--------------> key
    4129           30 :         // TODO: this actually divides the layers into fixed-size chunks, not
    4130           30 :         // based on the partitioning.
    4131           30 :         //
    4132           30 :         // TODO: we should also opportunistically materialize and
    4133           30 :         // garbage collect what we can.
    4134           30 :         let mut new_layers = Vec::new();
    4135           30 :         let mut prev_key: Option<Key> = None;
    4136           30 :         let mut writer: Option<DeltaLayerWriter> = None;
    4137           30 :         let mut key_values_total_size = 0u64;
    4138           30 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    4139           30 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    4140              : 
    4141              :         for &DeltaEntry {
    4142      2102000 :             key, lsn, ref val, ..
    4143      2102030 :         } in all_values_iter
    4144              :         {
    4145      2102000 :             let value = val.load(ctx).await?;
    4146      2102000 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    4147      2102000 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    4148      2102000 :             if !same_key || lsn == dup_end_lsn {
    4149      2010000 :                 let mut next_key_size = 0u64;
    4150      2010000 :                 let is_dup_layer = dup_end_lsn.is_valid();
    4151      2010000 :                 dup_start_lsn = Lsn::INVALID;
    4152      2010000 :                 if !same_key {
    4153      2010000 :                     dup_end_lsn = Lsn::INVALID;
    4154      2010000 :                 }
    4155              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    4156      2010000 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    4157      2010000 :                     next_key_size = next_size;
    4158      2010000 :                     if key != next_key {
    4159      2009970 :                         if dup_end_lsn.is_valid() {
    4160            0 :                             // We are writting segment with duplicates:
    4161            0 :                             // place all remaining values of this key in separate segment
    4162            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    4163            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    4164      2009970 :                         }
    4165      2009970 :                         break;
    4166           30 :                     }
    4167           30 :                     key_values_total_size += next_size;
    4168           30 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    4169           30 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    4170           30 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    4171              :                         // Split key between multiple layers: such layer can contain only single key
    4172            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    4173            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    4174              :                         } else {
    4175            0 :                             lsn // start with the first LSN for this key
    4176              :                         };
    4177            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    4178            0 :                         break;
    4179           30 :                     }
    4180              :                 }
    4181              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    4182      2010000 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    4183            0 :                     dup_start_lsn = dup_end_lsn;
    4184            0 :                     dup_end_lsn = lsn_range.end;
    4185      2010000 :                 }
    4186      2010000 :                 if writer.is_some() {
    4187      2009970 :                     let written_size = writer.as_mut().unwrap().size();
    4188      2009970 :                     let contains_hole =
    4189      2009970 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    4190              :                     // check if key cause layer overflow or contains hole...
    4191      2009970 :                     if is_dup_layer
    4192      2009970 :                         || dup_end_lsn.is_valid()
    4193      2009970 :                         || written_size + key_values_total_size > target_file_size
    4194      2009970 :                         || contains_hole
    4195              :                     {
    4196              :                         // ... if so, flush previous layer and prepare to write new one
    4197            0 :                         new_layers.push(
    4198            0 :                             writer
    4199            0 :                                 .take()
    4200            0 :                                 .unwrap()
    4201            0 :                                 .finish(prev_key.unwrap().next(), self)
    4202            0 :                                 .await?,
    4203              :                         );
    4204            0 :                         writer = None;
    4205            0 : 
    4206            0 :                         if contains_hole {
    4207            0 :                             // skip hole
    4208            0 :                             next_hole += 1;
    4209            0 :                         }
    4210      2009970 :                     }
    4211           30 :                 }
    4212              :                 // Remember size of key value because at next iteration we will access next item
    4213      2010000 :                 key_values_total_size = next_key_size;
    4214        92000 :             }
    4215      2102000 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    4216            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    4217            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    4218            0 :                 )))
    4219      2102000 :             });
    4220              : 
    4221      2102000 :             if !self.shard_identity.is_key_disposable(&key) {
    4222      2102000 :                 if writer.is_none() {
    4223           30 :                     // Create writer if not initiaized yet
    4224           30 :                     writer = Some(
    4225              :                         DeltaLayerWriter::new(
    4226           30 :                             self.conf,
    4227           30 :                             self.timeline_id,
    4228           30 :                             self.tenant_shard_id,
    4229           30 :                             key,
    4230           30 :                             if dup_end_lsn.is_valid() {
    4231              :                                 // this is a layer containing slice of values of the same key
    4232            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    4233            0 :                                 dup_start_lsn..dup_end_lsn
    4234              :                             } else {
    4235            0 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    4236           30 :                                 lsn_range.clone()
    4237              :                             },
    4238              :                         )
    4239           15 :                         .await?,
    4240              :                     );
    4241      2101970 :                 }
    4242              : 
    4243      2102000 :                 writer.as_mut().unwrap().put_value(key, lsn, value).await?;
    4244              :             } else {
    4245            0 :                 debug!(
    4246            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    4247            0 :                     key,
    4248            0 :                     self.shard_identity.get_shard_number(&key)
    4249            0 :                 );
    4250              :             }
    4251              : 
    4252      2102000 :             if !new_layers.is_empty() {
    4253            0 :                 fail_point!("after-timeline-compacted-first-L1");
    4254      2102000 :             }
    4255              : 
    4256      2102000 :             prev_key = Some(key);
    4257              :         }
    4258           30 :         if let Some(writer) = writer {
    4259           61 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
    4260            0 :         }
    4261              : 
    4262              :         // Sync layers
    4263           30 :         if !new_layers.is_empty() {
    4264              :             // Print a warning if the created layer is larger than double the target size
    4265              :             // Add two pages for potential overhead. This should in theory be already
    4266              :             // accounted for in the target calculation, but for very small targets,
    4267              :             // we still might easily hit the limit otherwise.
    4268           30 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    4269           30 :             for layer in new_layers.iter() {
    4270           30 :                 if layer.layer_desc().file_size > warn_limit {
    4271            0 :                     warn!(
    4272            0 :                         %layer,
    4273            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    4274            0 :                     );
    4275           30 :                 }
    4276              :             }
    4277              : 
    4278              :             // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    4279           30 :             let layer_paths: Vec<Utf8PathBuf> = new_layers
    4280           30 :                 .iter()
    4281           30 :                 .map(|l| l.local_path().to_owned())
    4282           30 :                 .collect();
    4283           30 : 
    4284           30 :             // Fsync all the layer files and directory using multiple threads to
    4285           30 :             // minimize latency.
    4286           30 :             par_fsync::par_fsync_async(&layer_paths)
    4287           30 :                 .await
    4288           30 :                 .context("fsync all new layers")?;
    4289              : 
    4290           30 :             let timeline_dir = self
    4291           30 :                 .conf
    4292           30 :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    4293           30 : 
    4294           30 :             par_fsync::par_fsync_async(&[timeline_dir])
    4295           30 :                 .await
    4296           30 :                 .context("fsync of timeline dir")?;
    4297            0 :         }
    4298              : 
    4299           30 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    4300           30 :         stats.new_deltas_count = Some(new_layers.len());
    4301           30 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    4302           30 : 
    4303           30 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    4304           30 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    4305              :         {
    4306           30 :             Ok(stats_json) => {
    4307           30 :                 info!(
    4308           30 :                     stats_json = stats_json.as_str(),
    4309           30 :                     "compact_level0_phase1 stats available"
    4310           30 :                 )
    4311              :             }
    4312            0 :             Err(e) => {
    4313            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    4314              :             }
    4315              :         }
    4316              : 
    4317           30 :         Ok(CompactLevel0Phase1Result {
    4318           30 :             new_layers,
    4319           30 :             deltas_to_compact: deltas_to_compact
    4320           30 :                 .into_iter()
    4321          300 :                 .map(|x| x.drop_eviction_guard())
    4322           30 :                 .collect::<Vec<_>>(),
    4323           30 :         })
    4324          408 :     }
    4325              : 
    4326              :     ///
    4327              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
    4328              :     /// as Level 1 files.
    4329              :     ///
    4330          408 :     async fn compact_level0(
    4331          408 :         self: &Arc<Self>,
    4332          408 :         target_file_size: u64,
    4333          408 :         ctx: &RequestContext,
    4334          408 :     ) -> Result<(), CompactionError> {
    4335              :         let CompactLevel0Phase1Result {
    4336          408 :             new_layers,
    4337          408 :             deltas_to_compact,
    4338              :         } = {
    4339          408 :             let phase1_span = info_span!("compact_level0_phase1");
    4340          408 :             let ctx = ctx.attached_child();
    4341          408 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    4342          408 :                 version: Some(2),
    4343          408 :                 tenant_id: Some(self.tenant_shard_id),
    4344          408 :                 timeline_id: Some(self.timeline_id),
    4345          408 :                 ..Default::default()
    4346          408 :             };
    4347          408 : 
    4348          408 :             let begin = tokio::time::Instant::now();
    4349          408 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
    4350          408 :             let now = tokio::time::Instant::now();
    4351          408 :             stats.read_lock_acquisition_micros =
    4352          408 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    4353          408 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
    4354          408 :                 .instrument(phase1_span)
    4355        39560 :                 .await?
    4356              :         };
    4357              : 
    4358          408 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    4359              :             // nothing to do
    4360          378 :             return Ok(());
    4361           30 :         }
    4362           30 : 
    4363           30 :         self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
    4364            0 :             .await?;
    4365           30 :         Ok(())
    4366          408 :     }
    4367              : 
    4368           30 :     async fn finish_compact_batch(
    4369           30 :         self: &Arc<Self>,
    4370           30 :         new_deltas: &[ResidentLayer],
    4371           30 :         new_images: &[ResidentLayer],
    4372           30 :         layers_to_remove: &[Layer],
    4373           30 :     ) -> anyhow::Result<()> {
    4374           30 :         let mut guard = self.layers.write().await;
    4375              : 
    4376           30 :         let mut duplicated_layers = HashSet::new();
    4377           30 : 
    4378           30 :         let mut insert_layers = Vec::with_capacity(new_deltas.len());
    4379              : 
    4380           60 :         for l in new_deltas {
    4381           30 :             if guard.contains(l.as_ref()) {
    4382              :                 // expected in tests
    4383            0 :                 tracing::error!(layer=%l, "duplicated L1 layer");
    4384              : 
    4385              :                 // good ways to cause a duplicate: we repeatedly error after taking the writelock
    4386              :                 // `guard`  on self.layers. as of writing this, there are no error returns except
    4387              :                 // for compact_level0_phase1 creating an L0, which does not happen in practice
    4388              :                 // because we have not implemented L0 => L0 compaction.
    4389            0 :                 duplicated_layers.insert(l.layer_desc().key());
    4390           30 :             } else if LayerMap::is_l0(l.layer_desc()) {
    4391            0 :                 bail!("compaction generates a L0 layer file as output, which will cause infinite compaction.");
    4392           30 :             } else {
    4393           30 :                 insert_layers.push(l.clone());
    4394           30 :             }
    4395              :         }
    4396              : 
    4397              :         // only remove those inputs which were not outputs
    4398           30 :         let remove_layers: Vec<Layer> = layers_to_remove
    4399           30 :             .iter()
    4400          300 :             .filter(|l| !duplicated_layers.contains(&l.layer_desc().key()))
    4401           30 :             .cloned()
    4402           30 :             .collect();
    4403           30 : 
    4404           30 :         if !new_images.is_empty() {
    4405            0 :             guard.track_new_image_layers(new_images, &self.metrics);
    4406           30 :         }
    4407              : 
    4408              :         // deletion will happen later, the layer file manager calls garbage_collect_on_drop
    4409           30 :         guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
    4410              : 
    4411           30 :         if let Some(remote_client) = self.remote_client.as_ref() {
    4412           30 :             remote_client.schedule_compaction_update(&remove_layers, new_deltas)?;
    4413            0 :         }
    4414              : 
    4415           30 :         drop_wlock(guard);
    4416           30 : 
    4417           30 :         Ok(())
    4418           30 :     }
    4419              : 
    4420              :     /// Update information about which layer files need to be retained on
    4421              :     /// garbage collection. This is separate from actually performing the GC,
    4422              :     /// and is updated more frequently, so that compaction can remove obsolete
    4423              :     /// page versions more aggressively.
    4424              :     ///
    4425              :     /// TODO: that's wishful thinking, compaction doesn't actually do that
    4426              :     /// currently.
    4427              :     ///
    4428              :     /// The caller specifies how much history is needed with the 3 arguments:
    4429              :     ///
    4430              :     /// retain_lsns: keep a version of each page at these LSNs
    4431              :     /// cutoff_horizon: also keep everything newer than this LSN
    4432              :     /// pitr: the time duration required to keep data for PITR
    4433              :     ///
    4434              :     /// The 'retain_lsns' list is currently used to prevent removing files that
    4435              :     /// are needed by child timelines. In the future, the user might be able to
    4436              :     /// name additional points in time to retain. The caller is responsible for
    4437              :     /// collecting that information.
    4438              :     ///
    4439              :     /// The 'cutoff_horizon' point is used to retain recent versions that might still be
    4440              :     /// needed by read-only nodes. (As of this writing, the caller just passes
    4441              :     /// the latest LSN subtracted by a constant, and doesn't do anything smart
    4442              :     /// to figure out what read-only nodes might actually need.)
    4443              :     ///
    4444              :     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
    4445              :     /// whether a record is needed for PITR.
    4446              :     ///
    4447              :     /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
    4448              :     /// field, so that the three values passed as argument are stored
    4449              :     /// atomically. But the caller is responsible for ensuring that no new
    4450              :     /// branches are created that would need to be included in 'retain_lsns',
    4451              :     /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
    4452              :     /// that.
    4453              :     ///
    4454          816 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
    4455              :     pub(super) async fn update_gc_info(
    4456              :         &self,
    4457              :         retain_lsns: Vec<Lsn>,
    4458              :         cutoff_horizon: Lsn,
    4459              :         pitr: Duration,
    4460              :         cancel: &CancellationToken,
    4461              :         ctx: &RequestContext,
    4462              :     ) -> anyhow::Result<()> {
    4463              :         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
    4464              :         //
    4465              :         // Some unit tests depend on garbage-collection working even when
    4466              :         // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
    4467              :         // work, so avoid calling it altogether if time-based retention is not
    4468              :         // configured. It would be pointless anyway.
    4469              :         let pitr_cutoff = if pitr != Duration::ZERO {
    4470              :             let now = SystemTime::now();
    4471              :             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
    4472              :                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
    4473              : 
    4474              :                 match self
    4475              :                     .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
    4476              :                     .await?
    4477              :                 {
    4478              :                     LsnForTimestamp::Present(lsn) => lsn,
    4479              :                     LsnForTimestamp::Future(lsn) => {
    4480              :                         // The timestamp is in the future. That sounds impossible,
    4481              :                         // but what it really means is that there hasn't been
    4482              :                         // any commits since the cutoff timestamp.
    4483            0 :                         debug!("future({})", lsn);
    4484              :                         cutoff_horizon
    4485              :                     }
    4486              :                     LsnForTimestamp::Past(lsn) => {
    4487            0 :                         debug!("past({})", lsn);
    4488              :                         // conservative, safe default is to remove nothing, when we
    4489              :                         // have no commit timestamp data available
    4490              :                         *self.get_latest_gc_cutoff_lsn()
    4491              :                     }
    4492              :                     LsnForTimestamp::NoData(lsn) => {
    4493            0 :                         debug!("nodata({})", lsn);
    4494              :                         // conservative, safe default is to remove nothing, when we
    4495              :                         // have no commit timestamp data available
    4496              :                         *self.get_latest_gc_cutoff_lsn()
    4497              :                     }
    4498              :                 }
    4499              :             } else {
    4500              :                 // If we don't have enough data to convert to LSN,
    4501              :                 // play safe and don't remove any layers.
    4502              :                 *self.get_latest_gc_cutoff_lsn()
    4503              :             }
    4504              :         } else {
    4505              :             // No time-based retention was configured. Set time-based cutoff to
    4506              :             // same as LSN based.
    4507              :             cutoff_horizon
    4508              :         };
    4509              : 
    4510              :         // Grab the lock and update the values
    4511              :         *self.gc_info.write().unwrap() = GcInfo {
    4512              :             retain_lsns,
    4513              :             horizon_cutoff: cutoff_horizon,
    4514              :             pitr_cutoff,
    4515              :         };
    4516              : 
    4517              :         Ok(())
    4518              :     }
    4519              : 
    4520              :     /// Garbage collect layer files on a timeline that are no longer needed.
    4521              :     ///
    4522              :     /// Currently, we don't make any attempt at removing unneeded page versions
    4523              :     /// within a layer file. We can only remove the whole file if it's fully
    4524              :     /// obsolete.
    4525          408 :     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
    4526          408 :         // this is most likely the background tasks, but it might be the spawned task from
    4527          408 :         // immediate_gc
    4528          408 :         let cancel = crate::task_mgr::shutdown_token();
    4529          408 :         let _g = tokio::select! {
    4530          408 :             guard = self.gc_lock.lock() => guard,
    4531              :             _ = self.cancel.cancelled() => return Ok(GcResult::default()),
    4532              :             _ = cancel.cancelled() => return Ok(GcResult::default()),
    4533              :         };
    4534          408 :         let timer = self.metrics.garbage_collect_histo.start_timer();
    4535              : 
    4536            0 :         fail_point!("before-timeline-gc");
    4537              : 
    4538              :         // Is the timeline being deleted?
    4539          408 :         if self.is_stopping() {
    4540            0 :             anyhow::bail!("timeline is Stopping");
    4541          408 :         }
    4542          408 : 
    4543          408 :         let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
    4544          408 :             let gc_info = self.gc_info.read().unwrap();
    4545          408 : 
    4546          408 :             let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
    4547          408 :             let pitr_cutoff = gc_info.pitr_cutoff;
    4548          408 :             let retain_lsns = gc_info.retain_lsns.clone();
    4549          408 :             (horizon_cutoff, pitr_cutoff, retain_lsns)
    4550          408 :         };
    4551          408 : 
    4552          408 :         let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
    4553              : 
    4554          408 :         let res = self
    4555          408 :             .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
    4556          408 :             .instrument(
    4557          408 :                 info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
    4558              :             )
    4559            1 :             .await?;
    4560              : 
    4561              :         // only record successes
    4562          408 :         timer.stop_and_record();
    4563          408 : 
    4564          408 :         Ok(res)
    4565          408 :     }
    4566              : 
    4567          408 :     async fn gc_timeline(
    4568          408 :         &self,
    4569          408 :         horizon_cutoff: Lsn,
    4570          408 :         pitr_cutoff: Lsn,
    4571          408 :         retain_lsns: Vec<Lsn>,
    4572          408 :         new_gc_cutoff: Lsn,
    4573          408 :     ) -> anyhow::Result<GcResult> {
    4574          408 :         let now = SystemTime::now();
    4575          408 :         let mut result: GcResult = GcResult::default();
    4576          408 : 
    4577          408 :         // Nothing to GC. Return early.
    4578          408 :         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    4579          408 :         if latest_gc_cutoff >= new_gc_cutoff {
    4580            0 :             info!(
    4581            0 :                 "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
    4582            0 :             );
    4583            0 :             return Ok(result);
    4584          408 :         }
    4585              : 
    4586              :         // We need to ensure that no one tries to read page versions or create
    4587              :         // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
    4588              :         // for details. This will block until the old value is no longer in use.
    4589              :         //
    4590              :         // The GC cutoff should only ever move forwards.
    4591          408 :         let waitlist = {
    4592          408 :             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
    4593          408 :             ensure!(
    4594          408 :                 *write_guard <= new_gc_cutoff,
    4595            0 :                 "Cannot move GC cutoff LSN backwards (was {}, new {})",
    4596            0 :                 *write_guard,
    4597              :                 new_gc_cutoff
    4598              :             );
    4599          408 :             write_guard.store_and_unlock(new_gc_cutoff)
    4600          408 :         };
    4601          408 :         waitlist.wait().await;
    4602              : 
    4603          408 :         info!("GC starting");
    4604              : 
    4605            0 :         debug!("retain_lsns: {:?}", retain_lsns);
    4606              : 
    4607          408 :         let mut layers_to_remove = Vec::new();
    4608              : 
    4609              :         // Scan all layers in the timeline (remote or on-disk).
    4610              :         //
    4611              :         // Garbage collect the layer if all conditions are satisfied:
    4612              :         // 1. it is older than cutoff LSN;
    4613              :         // 2. it is older than PITR interval;
    4614              :         // 3. it doesn't need to be retained for 'retain_lsns';
    4615              :         // 4. newer on-disk image layers cover the layer's whole key range
    4616              :         //
    4617              :         // TODO holding a write lock is too agressive and avoidable
    4618          408 :         let mut guard = self.layers.write().await;
    4619          408 :         let layers = guard.layer_map();
    4620         2404 :         'outer: for l in layers.iter_historic_layers() {
    4621         2404 :             result.layers_total += 1;
    4622         2404 : 
    4623         2404 :             // 1. Is it newer than GC horizon cutoff point?
    4624         2404 :             if l.get_lsn_range().end > horizon_cutoff {
    4625            0 :                 debug!(
    4626            0 :                     "keeping {} because it's newer than horizon_cutoff {}",
    4627            0 :                     l.filename(),
    4628            0 :                     horizon_cutoff,
    4629            0 :                 );
    4630          408 :                 result.layers_needed_by_cutoff += 1;
    4631          408 :                 continue 'outer;
    4632         1996 :             }
    4633         1996 : 
    4634         1996 :             // 2. It is newer than PiTR cutoff point?
    4635         1996 :             if l.get_lsn_range().end > pitr_cutoff {
    4636            0 :                 debug!(
    4637            0 :                     "keeping {} because it's newer than pitr_cutoff {}",
    4638            0 :                     l.filename(),
    4639            0 :                     pitr_cutoff,
    4640            0 :                 );
    4641            0 :                 result.layers_needed_by_pitr += 1;
    4642            0 :                 continue 'outer;
    4643         1996 :             }
    4644              : 
    4645              :             // 3. Is it needed by a child branch?
    4646              :             // NOTE With that we would keep data that
    4647              :             // might be referenced by child branches forever.
    4648              :             // We can track this in child timeline GC and delete parent layers when
    4649              :             // they are no longer needed. This might be complicated with long inheritance chains.
    4650              :             //
    4651              :             // TODO Vec is not a great choice for `retain_lsns`
    4652         1996 :             for retain_lsn in &retain_lsns {
    4653              :                 // start_lsn is inclusive
    4654           12 :                 if &l.get_lsn_range().start <= retain_lsn {
    4655            0 :                     debug!(
    4656            0 :                         "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
    4657            0 :                         l.filename(),
    4658            0 :                         retain_lsn,
    4659            0 :                         l.is_incremental(),
    4660            0 :                     );
    4661           12 :                     result.layers_needed_by_branches += 1;
    4662           12 :                     continue 'outer;
    4663            0 :                 }
    4664              :             }
    4665              : 
    4666              :             // 4. Is there a later on-disk layer for this relation?
    4667              :             //
    4668              :             // The end-LSN is exclusive, while disk_consistent_lsn is
    4669              :             // inclusive. For example, if disk_consistent_lsn is 100, it is
    4670              :             // OK for a delta layer to have end LSN 101, but if the end LSN
    4671              :             // is 102, then it might not have been fully flushed to disk
    4672              :             // before crash.
    4673              :             //
    4674              :             // For example, imagine that the following layers exist:
    4675              :             //
    4676              :             // 1000      - image (A)
    4677              :             // 1000-2000 - delta (B)
    4678              :             // 2000      - image (C)
    4679              :             // 2000-3000 - delta (D)
    4680              :             // 3000      - image (E)
    4681              :             //
    4682              :             // If GC horizon is at 2500, we can remove layers A and B, but
    4683              :             // we cannot remove C, even though it's older than 2500, because
    4684              :             // the delta layer 2000-3000 depends on it.
    4685         1984 :             if !layers
    4686         1984 :                 .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
    4687              :             {
    4688            0 :                 debug!("keeping {} because it is the latest layer", l.filename());
    4689         1984 :                 result.layers_not_updated += 1;
    4690         1984 :                 continue 'outer;
    4691            0 :             }
    4692              : 
    4693              :             // We didn't find any reason to keep this file, so remove it.
    4694            0 :             debug!(
    4695            0 :                 "garbage collecting {} is_dropped: xx is_incremental: {}",
    4696            0 :                 l.filename(),
    4697            0 :                 l.is_incremental(),
    4698            0 :             );
    4699            0 :             layers_to_remove.push(l);
    4700              :         }
    4701              : 
    4702          408 :         if !layers_to_remove.is_empty() {
    4703              :             // Persist the new GC cutoff value before we actually remove anything.
    4704              :             // This unconditionally schedules also an index_part.json update, even though, we will
    4705              :             // be doing one a bit later with the unlinked gc'd layers.
    4706            0 :             let disk_consistent_lsn = self.disk_consistent_lsn.load();
    4707            0 :             self.schedule_uploads(disk_consistent_lsn, None)?;
    4708              : 
    4709            0 :             let gc_layers = layers_to_remove
    4710            0 :                 .iter()
    4711            0 :                 .map(|x| guard.get_from_desc(x))
    4712            0 :                 .collect::<Vec<Layer>>();
    4713            0 : 
    4714            0 :             result.layers_removed = gc_layers.len() as u64;
    4715              : 
    4716            0 :             if let Some(remote_client) = self.remote_client.as_ref() {
    4717            0 :                 remote_client.schedule_gc_update(&gc_layers)?;
    4718            0 :             }
    4719              : 
    4720            0 :             guard.finish_gc_timeline(&gc_layers);
    4721            0 : 
    4722            0 :             #[cfg(feature = "testing")]
    4723            0 :             {
    4724            0 :                 result.doomed_layers = gc_layers;
    4725            0 :             }
    4726          408 :         }
    4727              : 
    4728          408 :         info!(
    4729          408 :             "GC completed removing {} layers, cutoff {}",
    4730          408 :             result.layers_removed, new_gc_cutoff
    4731          408 :         );
    4732              : 
    4733          408 :         result.elapsed = now.elapsed()?;
    4734          408 :         Ok(result)
    4735          408 :     }
    4736              : 
    4737              :     /// Reconstruct a value, using the given base image and WAL records in 'data'.
    4738       502736 :     async fn reconstruct_value(
    4739       502736 :         &self,
    4740       502736 :         key: Key,
    4741       502736 :         request_lsn: Lsn,
    4742       502736 :         mut data: ValueReconstructState,
    4743       502736 :     ) -> Result<Bytes, PageReconstructError> {
    4744       502736 :         // Perform WAL redo if needed
    4745       502736 :         data.records.reverse();
    4746       502736 : 
    4747       502736 :         // If we have a page image, and no WAL, we're all set
    4748       502736 :         if data.records.is_empty() {
    4749       502730 :             if let Some((img_lsn, img)) = &data.img {
    4750            0 :                 trace!(
    4751            0 :                     "found page image for key {} at {}, no WAL redo required, req LSN {}",
    4752            0 :                     key,
    4753            0 :                     img_lsn,
    4754            0 :                     request_lsn,
    4755            0 :                 );
    4756       502730 :                 Ok(img.clone())
    4757              :             } else {
    4758            0 :                 Err(PageReconstructError::from(anyhow!(
    4759            0 :                     "base image for {key} at {request_lsn} not found"
    4760            0 :                 )))
    4761              :             }
    4762              :         } else {
    4763              :             // We need to do WAL redo.
    4764              :             //
    4765              :             // If we don't have a base image, then the oldest WAL record better initialize
    4766              :             // the page
    4767            6 :             if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
    4768            0 :                 Err(PageReconstructError::from(anyhow!(
    4769            0 :                     "Base image for {} at {} not found, but got {} WAL records",
    4770            0 :                     key,
    4771            0 :                     request_lsn,
    4772            0 :                     data.records.len()
    4773            0 :                 )))
    4774              :             } else {
    4775            6 :                 if data.img.is_some() {
    4776            0 :                     trace!(
    4777            0 :                         "found {} WAL records and a base image for {} at {}, performing WAL redo",
    4778            0 :                         data.records.len(),
    4779            0 :                         key,
    4780            0 :                         request_lsn
    4781            0 :                     );
    4782              :                 } else {
    4783            0 :                     trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
    4784              :                 };
    4785              : 
    4786            6 :                 let last_rec_lsn = data.records.last().unwrap().0;
    4787              : 
    4788            6 :                 let img = match self
    4789            6 :                     .walredo_mgr
    4790            6 :                     .as_ref()
    4791            6 :                     .context("timeline has no walredo manager")
    4792            6 :                     .map_err(PageReconstructError::WalRedo)?
    4793            6 :                     .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
    4794            0 :                     .await
    4795            6 :                     .context("reconstruct a page image")
    4796              :                 {
    4797            6 :                     Ok(img) => img,
    4798            0 :                     Err(e) => return Err(PageReconstructError::WalRedo(e)),
    4799              :                 };
    4800              : 
    4801            6 :                 if img.len() == page_cache::PAGE_SZ {
    4802            0 :                     let cache = page_cache::get();
    4803            0 :                     if let Err(e) = cache
    4804            0 :                         .memorize_materialized_page(
    4805            0 :                             self.tenant_shard_id,
    4806            0 :                             self.timeline_id,
    4807            0 :                             key,
    4808            0 :                             last_rec_lsn,
    4809            0 :                             &img,
    4810            0 :                         )
    4811            0 :                         .await
    4812            0 :                         .context("Materialized page memoization failed")
    4813              :                     {
    4814            0 :                         return Err(PageReconstructError::from(e));
    4815            0 :                     }
    4816            6 :                 }
    4817              : 
    4818            6 :                 Ok(img)
    4819              :             }
    4820              :         }
    4821       502736 :     }
    4822              : 
    4823            0 :     pub(crate) async fn spawn_download_all_remote_layers(
    4824            0 :         self: Arc<Self>,
    4825            0 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4826            0 :     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
    4827            0 :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4828            0 : 
    4829            0 :         // this is not really needed anymore; it has tests which really check the return value from
    4830            0 :         // http api. it would be better not to maintain this anymore.
    4831            0 : 
    4832            0 :         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
    4833            0 :         if let Some(st) = &*status_guard {
    4834            0 :             match &st.state {
    4835              :                 DownloadRemoteLayersTaskState::Running => {
    4836            0 :                     return Err(st.clone());
    4837              :                 }
    4838              :                 DownloadRemoteLayersTaskState::ShutDown
    4839            0 :                 | DownloadRemoteLayersTaskState::Completed => {
    4840            0 :                     *status_guard = None;
    4841            0 :                 }
    4842              :             }
    4843            0 :         }
    4844              : 
    4845            0 :         let self_clone = Arc::clone(&self);
    4846            0 :         let task_id = task_mgr::spawn(
    4847            0 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    4848            0 :             task_mgr::TaskKind::DownloadAllRemoteLayers,
    4849            0 :             Some(self.tenant_shard_id),
    4850            0 :             Some(self.timeline_id),
    4851            0 :             "download all remote layers task",
    4852              :             false,
    4853            0 :             async move {
    4854            0 :                 self_clone.download_all_remote_layers(request).await;
    4855            0 :                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
    4856            0 :                  match &mut *status_guard {
    4857              :                     None => {
    4858            0 :                         warn!("tasks status is supposed to be Some(), since we are running");
    4859              :                     }
    4860            0 :                     Some(st) => {
    4861            0 :                         let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
    4862            0 :                         if st.task_id != exp_task_id {
    4863            0 :                             warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
    4864            0 :                         } else {
    4865            0 :                             st.state = DownloadRemoteLayersTaskState::Completed;
    4866            0 :                         }
    4867              :                     }
    4868              :                 };
    4869            0 :                 Ok(())
    4870            0 :             }
    4871            0 :             .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    4872              :         );
    4873              : 
    4874            0 :         let initial_info = DownloadRemoteLayersTaskInfo {
    4875            0 :             task_id: format!("{task_id}"),
    4876            0 :             state: DownloadRemoteLayersTaskState::Running,
    4877            0 :             total_layer_count: 0,
    4878            0 :             successful_download_count: 0,
    4879            0 :             failed_download_count: 0,
    4880            0 :         };
    4881            0 :         *status_guard = Some(initial_info.clone());
    4882            0 : 
    4883            0 :         Ok(initial_info)
    4884            0 :     }
    4885              : 
    4886            0 :     async fn download_all_remote_layers(
    4887            0 :         self: &Arc<Self>,
    4888            0 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4889            0 :     ) {
    4890              :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4891              : 
    4892            0 :         let remaining = {
    4893            0 :             let guard = self.layers.read().await;
    4894            0 :             guard
    4895            0 :                 .layer_map()
    4896            0 :                 .iter_historic_layers()
    4897            0 :                 .map(|desc| guard.get_from_desc(&desc))
    4898            0 :                 .collect::<Vec<_>>()
    4899            0 :         };
    4900            0 :         let total_layer_count = remaining.len();
    4901            0 : 
    4902            0 :         macro_rules! lock_status {
    4903            0 :             ($st:ident) => {
    4904            0 :                 let mut st = self.download_all_remote_layers_task_info.write().unwrap();
    4905            0 :                 let st = st
    4906            0 :                     .as_mut()
    4907            0 :                     .expect("this function is only called after the task has been spawned");
    4908            0 :                 assert_eq!(
    4909            0 :                     st.task_id,
    4910            0 :                     format!(
    4911            0 :                         "{}",
    4912            0 :                         task_mgr::current_task_id().expect("we run inside a task_mgr task")
    4913            0 :                     )
    4914            0 :                 );
    4915            0 :                 let $st = st;
    4916            0 :             };
    4917            0 :         }
    4918            0 : 
    4919            0 :         {
    4920            0 :             lock_status!(st);
    4921            0 :             st.total_layer_count = total_layer_count as u64;
    4922            0 :         }
    4923            0 : 
    4924            0 :         let mut remaining = remaining.into_iter();
    4925            0 :         let mut have_remaining = true;
    4926            0 :         let mut js = tokio::task::JoinSet::new();
    4927            0 : 
    4928            0 :         let cancel = task_mgr::shutdown_token();
    4929            0 : 
    4930            0 :         let limit = request.max_concurrent_downloads;
    4931              : 
    4932              :         loop {
    4933            0 :             while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
    4934            0 :                 let Some(next) = remaining.next() else {
    4935            0 :                     have_remaining = false;
    4936            0 :                     break;
    4937              :                 };
    4938              : 
    4939            0 :                 let span = tracing::info_span!("download", layer = %next);
    4940              : 
    4941            0 :                 js.spawn(
    4942            0 :                     async move {
    4943            0 :                         let res = next.download().await;
    4944            0 :                         (next, res)
    4945            0 :                     }
    4946            0 :                     .instrument(span),
    4947            0 :                 );
    4948              :             }
    4949              : 
    4950            0 :             while let Some(res) = js.join_next().await {
    4951            0 :                 match res {
    4952              :                     Ok((_, Ok(_))) => {
    4953            0 :                         lock_status!(st);
    4954            0 :                         st.successful_download_count += 1;
    4955              :                     }
    4956            0 :                     Ok((layer, Err(e))) => {
    4957            0 :                         tracing::error!(%layer, "download failed: {e:#}");
    4958            0 :                         lock_status!(st);
    4959            0 :                         st.failed_download_count += 1;
    4960              :                     }
    4961            0 :                     Err(je) if je.is_cancelled() => unreachable!("not used here"),
    4962            0 :                     Err(je) if je.is_panic() => {
    4963            0 :                         lock_status!(st);
    4964            0 :                         st.failed_download_count += 1;
    4965              :                     }
    4966            0 :                     Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
    4967              :                 }
    4968              :             }
    4969              : 
    4970            0 :             if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
    4971            0 :                 break;
    4972            0 :             }
    4973              :         }
    4974              : 
    4975              :         {
    4976            0 :             lock_status!(st);
    4977            0 :             st.state = DownloadRemoteLayersTaskState::Completed;
    4978            0 :         }
    4979            0 :     }
    4980              : 
    4981            0 :     pub(crate) fn get_download_all_remote_layers_task_info(
    4982            0 :         &self,
    4983            0 :     ) -> Option<DownloadRemoteLayersTaskInfo> {
    4984            0 :         self.download_all_remote_layers_task_info
    4985            0 :             .read()
    4986            0 :             .unwrap()
    4987            0 :             .clone()
    4988            0 :     }
    4989              : }
    4990              : 
    4991              : impl Timeline {
    4992              :     /// Returns non-remote layers for eviction.
    4993            0 :     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
    4994            0 :         let guard = self.layers.read().await;
    4995            0 :         let mut max_layer_size: Option<u64> = None;
    4996              : 
    4997            0 :         let resident_layers = guard
    4998            0 :             .resident_layers()
    4999            0 :             .map(|layer| {
    5000            0 :                 let file_size = layer.layer_desc().file_size;
    5001            0 :                 max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
    5002            0 : 
    5003            0 :                 let last_activity_ts = layer.access_stats().latest_activity_or_now();
    5004            0 : 
    5005            0 :                 EvictionCandidate {
    5006            0 :                     layer: layer.into(),
    5007            0 :                     last_activity_ts,
    5008            0 :                     relative_last_activity: finite_f32::FiniteF32::ZERO,
    5009            0 :                 }
    5010            0 :             })
    5011            0 :             .collect()
    5012            0 :             .await;
    5013              : 
    5014            0 :         DiskUsageEvictionInfo {
    5015            0 :             max_layer_size,
    5016            0 :             resident_layers,
    5017            0 :         }
    5018            0 :     }
    5019              : 
    5020          560 :     pub(crate) fn get_shard_index(&self) -> ShardIndex {
    5021          560 :         ShardIndex {
    5022          560 :             shard_number: self.tenant_shard_id.shard_number,
    5023          560 :             shard_count: self.tenant_shard_id.shard_count,
    5024          560 :         }
    5025          560 :     }
    5026              : }
    5027              : 
    5028              : type TraversalPathItem = (
    5029              :     ValueReconstructResult,
    5030              :     Lsn,
    5031              :     Box<dyn Send + FnOnce() -> TraversalId>,
    5032              : );
    5033              : 
    5034              : /// Helper function for get_reconstruct_data() to add the path of layers traversed
    5035              : /// to an error, as anyhow context information.
    5036          106 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
    5037          106 :     // We want the original 'msg' to be the outermost context. The outermost context
    5038          106 :     // is the most high-level information, which also gets propagated to the client.
    5039          106 :     let mut msg_iter = path
    5040          106 :         .into_iter()
    5041          108 :         .map(|(r, c, l)| {
    5042          108 :             format!(
    5043          108 :                 "layer traversal: result {:?}, cont_lsn {}, layer: {}",
    5044          108 :                 r,
    5045          108 :                 c,
    5046          108 :                 l(),
    5047          108 :             )
    5048          108 :         })
    5049          106 :         .chain(std::iter::once(msg));
    5050          106 :     // Construct initial message from the first traversed layer
    5051          106 :     let err = anyhow!(msg_iter.next().unwrap());
    5052          106 : 
    5053          106 :     // Append all subsequent traversals, and the error message 'msg', as contexts.
    5054          108 :     let msg = msg_iter.fold(err, |err, msg| err.context(msg));
    5055          106 :     PageReconstructError::from(msg)
    5056          106 : }
    5057              : 
    5058              : /// Various functions to mutate the timeline.
    5059              : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
    5060              : // This is probably considered a bad practice in Rust and should be fixed eventually,
    5061              : // but will cause large code changes.
    5062              : pub(crate) struct TimelineWriter<'a> {
    5063              :     tl: &'a Timeline,
    5064              :     _write_guard: tokio::sync::MutexGuard<'a, ()>,
    5065              : }
    5066              : 
    5067              : impl Deref for TimelineWriter<'_> {
    5068              :     type Target = Timeline;
    5069              : 
    5070         2424 :     fn deref(&self) -> &Self::Target {
    5071         2424 :         self.tl
    5072         2424 :     }
    5073              : }
    5074              : 
    5075              : impl<'a> TimelineWriter<'a> {
    5076              :     /// Put a new page version that can be constructed from a WAL record
    5077              :     ///
    5078              :     /// This will implicitly extend the relation, if the page is beyond the
    5079              :     /// current end-of-file.
    5080      2214102 :     pub(crate) async fn put(
    5081      2214102 :         &self,
    5082      2214102 :         key: Key,
    5083      2214102 :         lsn: Lsn,
    5084      2214102 :         value: &Value,
    5085      2214102 :         ctx: &RequestContext,
    5086      2214102 :     ) -> anyhow::Result<()> {
    5087      2214102 :         self.tl.put_value(key, lsn, value, ctx).await
    5088      2214102 :     }
    5089              : 
    5090       413940 :     pub(crate) async fn put_batch(
    5091       413940 :         &self,
    5092       413940 :         batch: &HashMap<Key, Vec<(Lsn, Value)>>,
    5093       413940 :         ctx: &RequestContext,
    5094       413940 :     ) -> anyhow::Result<()> {
    5095       413940 :         self.tl.put_values(batch, ctx).await
    5096       413940 :     }
    5097              : 
    5098            2 :     pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    5099            2 :         self.tl.put_tombstones(batch).await
    5100            2 :     }
    5101              : 
    5102              :     /// Track the end of the latest digested WAL record.
    5103              :     /// Remember the (end of) last valid WAL record remembered in the timeline.
    5104              :     ///
    5105              :     /// Call this after you have finished writing all the WAL up to 'lsn'.
    5106              :     ///
    5107              :     /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
    5108              :     /// the 'lsn' or anything older. The previous last record LSN is stored alongside
    5109              :     /// the latest and can be read.
    5110      3102912 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    5111      3102912 :         self.tl.finish_write(new_lsn);
    5112      3102912 :     }
    5113              : 
    5114       270570 :     pub(crate) fn update_current_logical_size(&self, delta: i64) {
    5115       270570 :         self.tl.update_current_logical_size(delta)
    5116       270570 :     }
    5117              : }
    5118              : 
    5119              : // We need TimelineWriter to be send in upcoming conversion of
    5120              : // Timeline::layers to tokio::sync::RwLock.
    5121            2 : #[test]
    5122            2 : fn is_send() {
    5123            2 :     fn _assert_send<T: Send>() {}
    5124            2 :     _assert_send::<TimelineWriter<'_>>();
    5125            2 : }
    5126              : 
    5127              : /// Add a suffix to a layer file's name: .{num}.old
    5128              : /// Uses the first available num (starts at 0)
    5129            0 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
    5130            0 :     let filename = path
    5131            0 :         .file_name()
    5132            0 :         .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
    5133            0 :     let mut new_path = path.to_owned();
    5134              : 
    5135            0 :     for i in 0u32.. {
    5136            0 :         new_path.set_file_name(format!("{filename}.{i}.old"));
    5137            0 :         if !new_path.exists() {
    5138            0 :             std::fs::rename(path, &new_path)
    5139            0 :                 .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
    5140            0 :             return Ok(());
    5141            0 :         }
    5142              :     }
    5143              : 
    5144            0 :     bail!("couldn't find an unused backup number for {:?}", path)
    5145            0 : }
    5146              : 
    5147              : #[cfg(test)]
    5148              : mod tests {
    5149              :     use utils::{id::TimelineId, lsn::Lsn};
    5150              : 
    5151              :     use crate::tenant::{
    5152              :         harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
    5153              :     };
    5154              : 
    5155            2 :     #[tokio::test]
    5156            2 :     async fn two_layer_eviction_attempts_at_the_same_time() {
    5157            2 :         let harness =
    5158            2 :             TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
    5159            2 : 
    5160            2 :         let ctx = any_context();
    5161            2 :         let tenant = harness.do_try_load(&ctx).await.unwrap();
    5162            2 :         let timeline = tenant
    5163            2 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    5164            6 :             .await
    5165            2 :             .unwrap();
    5166            2 : 
    5167            2 :         let layer = find_some_layer(&timeline).await;
    5168            2 :         let layer = layer
    5169            2 :             .keep_resident()
    5170            2 :             .await
    5171            2 :             .expect("no download => no downloading errors")
    5172            2 :             .expect("should had been resident")
    5173            2 :             .drop_eviction_guard();
    5174            2 : 
    5175            2 :         let first = async { layer.evict_and_wait().await };
    5176            2 :         let second = async { layer.evict_and_wait().await };
    5177            2 : 
    5178            3 :         let (first, second) = tokio::join!(first, second);
    5179            2 : 
    5180            2 :         let res = layer.keep_resident().await;
    5181            2 :         assert!(matches!(res, Ok(None)), "{res:?}");
    5182            2 : 
    5183            2 :         match (first, second) {
    5184            2 :             (Ok(()), Ok(())) => {
    5185            0 :                 // because there are no more timeline locks being taken on eviction path, we can
    5186            0 :                 // witness all three outcomes here.
    5187            0 :             }
    5188            2 :             (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
    5189            2 :                 // if one completes before the other, this is fine just as well.
    5190            2 :             }
    5191            2 :             other => unreachable!("unexpected {:?}", other),
    5192            2 :         }
    5193            2 :     }
    5194              : 
    5195            2 :     fn any_context() -> crate::context::RequestContext {
    5196            2 :         use crate::context::*;
    5197            2 :         use crate::task_mgr::*;
    5198            2 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    5199            2 :     }
    5200              : 
    5201            2 :     async fn find_some_layer(timeline: &Timeline) -> Layer {
    5202            2 :         let layers = timeline.layers.read().await;
    5203            2 :         let desc = layers
    5204            2 :             .layer_map()
    5205            2 :             .iter_historic_layers()
    5206            2 :             .next()
    5207            2 :             .expect("must find one layer to evict");
    5208            2 : 
    5209            2 :         layers.get_from_desc(&desc)
    5210            2 :     }
    5211              : }
        

Generated by: LCOV version 2.1-beta