LCOV - code coverage report
Current view: top level - pageserver/src/tenant - timeline.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 88.9 % 2867 2548
Test Date: 2024-02-07 07:37:29 Functions: 64.9 % 405 263

            Line data    Source code
       1              : pub mod delete;
       2              : mod eviction_task;
       3              : mod init;
       4              : pub mod layer_manager;
       5              : pub(crate) mod logical_size;
       6              : pub mod span;
       7              : pub mod uninit;
       8              : mod walreceiver;
       9              : 
      10              : use anyhow::{anyhow, bail, ensure, Context, Result};
      11              : use bytes::Bytes;
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use enumset::EnumSet;
      14              : use fail::fail_point;
      15              : use itertools::Itertools;
      16              : use pageserver_api::{
      17              :     keyspace::{key_range_size, KeySpaceAccum},
      18              :     models::{
      19              :         DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
      20              :         LayerMapInfo, TimelineState,
      21              :     },
      22              :     reltag::BlockNumber,
      23              :     shard::{ShardIdentity, TenantShardId},
      24              : };
      25              : use rand::Rng;
      26              : use serde_with::serde_as;
      27              : use storage_broker::BrokerClientChannel;
      28              : use tokio::{
      29              :     runtime::Handle,
      30              :     sync::{oneshot, watch},
      31              : };
      32              : use tokio_util::sync::CancellationToken;
      33              : use tracing::*;
      34              : use utils::sync::gate::Gate;
      35              : 
      36              : use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
      37              : use std::ops::{Deref, Range};
      38              : use std::pin::pin;
      39              : use std::sync::atomic::Ordering as AtomicOrdering;
      40              : use std::sync::{Arc, Mutex, RwLock, Weak};
      41              : use std::time::{Duration, Instant, SystemTime};
      42              : use std::{
      43              :     cmp::{max, min, Ordering},
      44              :     ops::ControlFlow,
      45              : };
      46              : 
      47              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      48              : use crate::tenant::{
      49              :     layer_map::{LayerMap, SearchResult},
      50              :     metadata::{save_metadata, TimelineMetadata},
      51              :     par_fsync,
      52              : };
      53              : use crate::{
      54              :     context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
      55              :     disk_usage_eviction_task::DiskUsageEvictionInfo,
      56              : };
      57              : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
      58              : use crate::{
      59              :     disk_usage_eviction_task::finite_f32,
      60              :     tenant::storage_layer::{
      61              :         AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
      62              :         LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
      63              :         ValueReconstructState,
      64              :     },
      65              : };
      66              : use crate::{
      67              :     disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
      68              : };
      69              : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
      70              : 
      71              : use crate::config::PageServerConf;
      72              : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
      73              : use crate::metrics::{
      74              :     TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
      75              : };
      76              : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
      77              : use crate::tenant::config::TenantConfOpt;
      78              : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
      79              : use pageserver_api::reltag::RelTag;
      80              : use pageserver_api::shard::ShardIndex;
      81              : 
      82              : use postgres_connection::PgConnectionConfig;
      83              : use postgres_ffi::to_pg_timestamp;
      84              : use utils::{
      85              :     completion,
      86              :     generation::Generation,
      87              :     id::TimelineId,
      88              :     lsn::{AtomicLsn, Lsn, RecordLsn},
      89              :     seqwait::SeqWait,
      90              :     simple_rcu::{Rcu, RcuReadGuard},
      91              : };
      92              : 
      93              : use crate::page_cache;
      94              : use crate::repository::GcResult;
      95              : use crate::repository::{Key, Value};
      96              : use crate::task_mgr;
      97              : use crate::task_mgr::TaskKind;
      98              : use crate::ZERO_PAGE;
      99              : 
     100              : use self::delete::DeleteTimelineFlow;
     101              : pub(super) use self::eviction_task::EvictionTaskTenantState;
     102              : use self::eviction_task::EvictionTaskTimelineState;
     103              : use self::layer_manager::LayerManager;
     104              : use self::logical_size::LogicalSize;
     105              : use self::walreceiver::{WalReceiver, WalReceiverConf};
     106              : 
     107              : use super::config::TenantConf;
     108              : use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
     109              : use super::remote_timeline_client::RemoteTimelineClient;
     110              : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
     111              : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
     112              : 
     113           44 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     114              : pub(super) enum FlushLoopState {
     115              :     NotStarted,
     116              :     Running {
     117              :         #[cfg(test)]
     118              :         expect_initdb_optimization: bool,
     119              :         #[cfg(test)]
     120              :         initdb_optimization_count: usize,
     121              :     },
     122              :     Exited,
     123              : }
     124              : 
     125              : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
     126            0 : #[derive(Debug, Clone, PartialEq, Eq)]
     127              : pub(crate) struct Hole {
     128              :     key_range: Range<Key>,
     129              :     coverage_size: usize,
     130              : }
     131              : 
     132              : impl Ord for Hole {
     133          266 :     fn cmp(&self, other: &Self) -> Ordering {
     134          266 :         other.coverage_size.cmp(&self.coverage_size) // inverse order
     135          266 :     }
     136              : }
     137              : 
     138              : impl PartialOrd for Hole {
     139          266 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     140          266 :         Some(self.cmp(other))
     141          266 :     }
     142              : }
     143              : 
     144              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     145              : /// Can be removed after all refactors are done.
     146          293 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
     147          293 :     drop(rlock)
     148          293 : }
     149              : 
     150              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     151              : /// Can be removed after all refactors are done.
     152         1892 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
     153         1892 :     drop(rlock)
     154         1892 : }
     155              : 
     156              : /// The outward-facing resources required to build a Timeline
     157              : pub struct TimelineResources {
     158              :     pub remote_client: Option<RemoteTimelineClient>,
     159              :     pub deletion_queue_client: DeletionQueueClient,
     160              : }
     161              : 
     162              : pub struct Timeline {
     163              :     conf: &'static PageServerConf,
     164              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     165              : 
     166              :     myself: Weak<Self>,
     167              : 
     168              :     pub(crate) tenant_shard_id: TenantShardId,
     169              :     pub timeline_id: TimelineId,
     170              : 
     171              :     /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
     172              :     /// Never changes for the lifetime of this [`Timeline`] object.
     173              :     ///
     174              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     175              :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     176              :     pub(crate) generation: Generation,
     177              : 
     178              :     /// The detailed sharding information from our parent Tenant.  This enables us to map keys
     179              :     /// to shards, and is constant through the lifetime of this Timeline.
     180              :     shard_identity: ShardIdentity,
     181              : 
     182              :     pub pg_version: u32,
     183              : 
     184              :     /// The tuple has two elements.
     185              :     /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
     186              :     /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
     187              :     ///
     188              :     /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
     189              :     /// We describe these rectangles through the `PersistentLayerDesc` struct.
     190              :     ///
     191              :     /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
     192              :     /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
     193              :     /// `PersistentLayerDesc`'s.
     194              :     ///
     195              :     /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
     196              :     /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
     197              :     /// runtime, e.g., during page reconstruction.
     198              :     ///
     199              :     /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
     200              :     /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
     201              :     pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
     202              : 
     203              :     /// Set of key ranges which should be covered by image layers to
     204              :     /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
     205              :     /// It is used by compaction task when it checks if new image layer should be created.
     206              :     /// Newly created image layer doesn't help to remove the delta layer, until the
     207              :     /// newly created image layer falls off the PITR horizon. So on next GC cycle,
     208              :     /// gc_timeline may still want the new image layer to be created. To avoid redundant
     209              :     /// image layers creation we should check if image layer exists but beyond PITR horizon.
     210              :     /// This is why we need remember GC cutoff LSN.
     211              :     ///
     212              :     wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
     213              : 
     214              :     last_freeze_at: AtomicLsn,
     215              :     // Atomic would be more appropriate here.
     216              :     last_freeze_ts: RwLock<Instant>,
     217              : 
     218              :     pub(crate) standby_horizon: AtomicLsn,
     219              : 
     220              :     // WAL redo manager. `None` only for broken tenants.
     221              :     walredo_mgr: Option<Arc<super::WalRedoManager>>,
     222              : 
     223              :     /// Remote storage client.
     224              :     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
     225              :     pub remote_client: Option<Arc<RemoteTimelineClient>>,
     226              : 
     227              :     // What page versions do we hold in the repository? If we get a
     228              :     // request > last_record_lsn, we need to wait until we receive all
     229              :     // the WAL up to the request. The SeqWait provides functions for
     230              :     // that. TODO: If we get a request for an old LSN, such that the
     231              :     // versions have already been garbage collected away, we should
     232              :     // throw an error, but we don't track that currently.
     233              :     //
     234              :     // last_record_lsn.load().last points to the end of last processed WAL record.
     235              :     //
     236              :     // We also remember the starting point of the previous record in
     237              :     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     238              :     // first WAL record when the node is started up. But here, we just
     239              :     // keep track of it.
     240              :     last_record_lsn: SeqWait<RecordLsn, Lsn>,
     241              : 
     242              :     // All WAL records have been processed and stored durably on files on
     243              :     // local disk, up to this LSN. On crash and restart, we need to re-process
     244              :     // the WAL starting from this point.
     245              :     //
     246              :     // Some later WAL records might have been processed and also flushed to disk
     247              :     // already, so don't be surprised to see some, but there's no guarantee on
     248              :     // them yet.
     249              :     disk_consistent_lsn: AtomicLsn,
     250              : 
     251              :     // Parent timeline that this timeline was branched from, and the LSN
     252              :     // of the branch point.
     253              :     ancestor_timeline: Option<Arc<Timeline>>,
     254              :     ancestor_lsn: Lsn,
     255              : 
     256              :     pub(super) metrics: TimelineMetrics,
     257              : 
     258              :     // `Timeline` doesn't write these metrics itself, but it manages the lifetime.  Code
     259              :     // in `crate::page_service` writes these metrics.
     260              :     pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
     261              : 
     262              :     /// Ensures layers aren't frozen by checkpointer between
     263              :     /// [`Timeline::get_layer_for_write`] and layer reads.
     264              :     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     265              :     /// Must always be acquired before the layer map/individual layer lock
     266              :     /// to avoid deadlock.
     267              :     write_lock: tokio::sync::Mutex<()>,
     268              : 
     269              :     /// Used to avoid multiple `flush_loop` tasks running
     270              :     pub(super) flush_loop_state: Mutex<FlushLoopState>,
     271              : 
     272              :     /// layer_flush_start_tx can be used to wake up the layer-flushing task.
     273              :     /// The value is a counter, incremented every time a new flush cycle is requested.
     274              :     /// The flush cycle counter is sent back on the layer_flush_done channel when
     275              :     /// the flush finishes. You can use that to wait for the flush to finish.
     276              :     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     277              :     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
     278              :     layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
     279              : 
     280              :     // Needed to ensure that we can't create a branch at a point that was already garbage collected
     281              :     pub latest_gc_cutoff_lsn: Rcu<Lsn>,
     282              : 
     283              :     // List of child timelines and their branch points. This is needed to avoid
     284              :     // garbage collecting data that is still needed by the child timelines.
     285              :     pub gc_info: std::sync::RwLock<GcInfo>,
     286              : 
     287              :     // It may change across major versions so for simplicity
     288              :     // keep it after running initdb for a timeline.
     289              :     // It is needed in checks when we want to error on some operations
     290              :     // when they are requested for pre-initdb lsn.
     291              :     // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
     292              :     // though let's keep them both for better error visibility.
     293              :     pub initdb_lsn: Lsn,
     294              : 
     295              :     /// When did we last calculate the partitioning?
     296              :     partitioning: Mutex<(KeyPartitioning, Lsn)>,
     297              : 
     298              :     /// Configuration: how often should the partitioning be recalculated.
     299              :     repartition_threshold: u64,
     300              : 
     301              :     /// Current logical size of the "datadir", at the last LSN.
     302              :     current_logical_size: LogicalSize,
     303              : 
     304              :     /// Information about the last processed message by the WAL receiver,
     305              :     /// or None if WAL receiver has not received anything for this timeline
     306              :     /// yet.
     307              :     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
     308              :     pub walreceiver: Mutex<Option<WalReceiver>>,
     309              : 
     310              :     /// Relation size cache
     311              :     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
     312              : 
     313              :     download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
     314              : 
     315              :     state: watch::Sender<TimelineState>,
     316              : 
     317              :     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     318              :     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     319              :     pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
     320              : 
     321              :     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
     322              : 
     323              :     /// Load or creation time information about the disk_consistent_lsn and when the loading
     324              :     /// happened. Used for consumption metrics.
     325              :     pub(crate) loaded_at: (Lsn, SystemTime),
     326              : 
     327              :     /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
     328              :     pub(crate) gate: Gate,
     329              : 
     330              :     /// Cancellation token scoped to this timeline: anything doing long-running work relating
     331              :     /// to the timeline should drop out when this token fires.
     332              :     pub(crate) cancel: CancellationToken,
     333              : 
     334              :     /// Make sure we only have one running compaction at a time in tests.
     335              :     ///
     336              :     /// Must only be taken in two places:
     337              :     /// - [`Timeline::compact`] (this file)
     338              :     /// - [`delete::delete_local_layer_files`]
     339              :     ///
     340              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     341              :     compaction_lock: tokio::sync::Mutex<()>,
     342              : 
     343              :     /// Make sure we only have one running gc at a time.
     344              :     ///
     345              :     /// Must only be taken in two places:
     346              :     /// - [`Timeline::gc`] (this file)
     347              :     /// - [`delete::delete_local_layer_files`]
     348              :     ///
     349              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     350              :     gc_lock: tokio::sync::Mutex<()>,
     351              : }
     352              : 
     353              : pub struct WalReceiverInfo {
     354              :     pub wal_source_connconf: PgConnectionConfig,
     355              :     pub last_received_msg_lsn: Lsn,
     356              :     pub last_received_msg_ts: u128,
     357              : }
     358              : 
     359              : ///
     360              : /// Information about how much history needs to be retained, needed by
     361              : /// Garbage Collection.
     362              : ///
     363              : pub struct GcInfo {
     364              :     /// Specific LSNs that are needed.
     365              :     ///
     366              :     /// Currently, this includes all points where child branches have
     367              :     /// been forked off from. In the future, could also include
     368              :     /// explicit user-defined snapshot points.
     369              :     pub retain_lsns: Vec<Lsn>,
     370              : 
     371              :     /// In addition to 'retain_lsns', keep everything newer than this
     372              :     /// point.
     373              :     ///
     374              :     /// This is calculated by subtracting 'gc_horizon' setting from
     375              :     /// last-record LSN
     376              :     ///
     377              :     /// FIXME: is this inclusive or exclusive?
     378              :     pub horizon_cutoff: Lsn,
     379              : 
     380              :     /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
     381              :     /// point.
     382              :     ///
     383              :     /// This is calculated by finding a number such that a record is needed for PITR
     384              :     /// if only if its LSN is larger than 'pitr_cutoff'.
     385              :     pub pitr_cutoff: Lsn,
     386              : }
     387              : 
     388              : /// An error happened in a get() operation.
     389           70 : #[derive(thiserror::Error, Debug)]
     390              : pub(crate) enum PageReconstructError {
     391              :     #[error(transparent)]
     392              :     Other(#[from] anyhow::Error),
     393              : 
     394              :     #[error("Ancestor LSN wait error: {0}")]
     395              :     AncestorLsnTimeout(#[from] WaitLsnError),
     396              : 
     397              :     #[error("timeline shutting down")]
     398              :     Cancelled,
     399              : 
     400              :     /// The ancestor of this is being stopped
     401              :     #[error("ancestor timeline {0} is being stopped")]
     402              :     AncestorStopping(TimelineId),
     403              : 
     404              :     /// An error happened replaying WAL records
     405              :     #[error(transparent)]
     406              :     WalRedo(anyhow::Error),
     407              : }
     408              : 
     409              : impl PageReconstructError {
     410              :     /// Returns true if this error indicates a tenant/timeline shutdown alike situation
     411            0 :     pub(crate) fn is_stopping(&self) -> bool {
     412            0 :         use PageReconstructError::*;
     413            0 :         match self {
     414            0 :             Other(_) => false,
     415            0 :             AncestorLsnTimeout(_) => false,
     416            0 :             Cancelled | AncestorStopping(_) => true,
     417            0 :             WalRedo(_) => false,
     418              :         }
     419            0 :     }
     420              : }
     421              : 
     422            0 : #[derive(thiserror::Error, Debug)]
     423              : enum CreateImageLayersError {
     424              :     #[error("timeline shutting down")]
     425              :     Cancelled,
     426              : 
     427              :     #[error(transparent)]
     428              :     GetVectoredError(GetVectoredError),
     429              : 
     430              :     #[error(transparent)]
     431              :     PageReconstructError(PageReconstructError),
     432              : 
     433              :     #[error(transparent)]
     434              :     Other(#[from] anyhow::Error),
     435              : }
     436              : 
     437            0 : #[derive(thiserror::Error, Debug)]
     438              : enum FlushLayerError {
     439              :     /// Timeline cancellation token was cancelled
     440              :     #[error("timeline shutting down")]
     441              :     Cancelled,
     442              : 
     443              :     #[error(transparent)]
     444              :     CreateImageLayersError(CreateImageLayersError),
     445              : 
     446              :     #[error(transparent)]
     447              :     Other(#[from] anyhow::Error),
     448              : }
     449              : 
     450            0 : #[derive(thiserror::Error, Debug)]
     451              : pub(crate) enum GetVectoredError {
     452              :     #[error("timeline shutting down")]
     453              :     Cancelled,
     454              : 
     455              :     #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
     456              :     Oversized(u64),
     457              : 
     458              :     #[error("Requested at invalid LSN: {0}")]
     459              :     InvalidLsn(Lsn),
     460              : }
     461              : 
     462            0 : #[derive(thiserror::Error, Debug)]
     463              : pub(crate) enum GetReadyAncestorError {
     464              :     #[error("ancestor timeline {0} is being stopped")]
     465              :     AncestorStopping(TimelineId),
     466              : 
     467              :     #[error("Ancestor LSN wait error: {0}")]
     468              :     AncestorLsnTimeout(#[from] WaitLsnError),
     469              : 
     470              :     #[error("Cancelled")]
     471              :     Cancelled,
     472              : 
     473              :     #[error(transparent)]
     474              :     Other(#[from] anyhow::Error),
     475              : }
     476              : 
     477            0 : #[derive(Clone, Copy)]
     478              : pub enum LogicalSizeCalculationCause {
     479              :     Initial,
     480              :     ConsumptionMetricsSyntheticSize,
     481              :     EvictionTaskImitation,
     482              :     TenantSizeHandler,
     483              : }
     484              : 
     485              : pub enum GetLogicalSizePriority {
     486              :     User,
     487              :     Background,
     488              : }
     489              : 
     490          727 : #[derive(enumset::EnumSetType)]
     491              : pub(crate) enum CompactFlags {
     492              :     ForceRepartition,
     493              : }
     494              : 
     495              : impl std::fmt::Debug for Timeline {
     496            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     497            0 :         write!(f, "Timeline<{}>", self.timeline_id)
     498            0 :     }
     499              : }
     500              : 
     501           48 : #[derive(thiserror::Error, Debug)]
     502              : pub(crate) enum WaitLsnError {
     503              :     // Called on a timeline which is shutting down
     504              :     #[error("Shutdown")]
     505              :     Shutdown,
     506              : 
     507              :     // Called on an timeline not in active state or shutting down
     508              :     #[error("Bad state (not active)")]
     509              :     BadState,
     510              : 
     511              :     // Timeout expired while waiting for LSN to catch up with goal.
     512              :     #[error("{0}")]
     513              :     Timeout(String),
     514              : }
     515              : 
     516              : // The impls below achieve cancellation mapping for errors.
     517              : // Perhaps there's a way of achieving this with less cruft.
     518              : 
     519              : impl From<CreateImageLayersError> for CompactionError {
     520            0 :     fn from(e: CreateImageLayersError) -> Self {
     521            0 :         match e {
     522            0 :             CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
     523            0 :             _ => CompactionError::Other(e.into()),
     524              :         }
     525            0 :     }
     526              : }
     527              : 
     528              : impl From<CreateImageLayersError> for FlushLayerError {
     529            0 :     fn from(e: CreateImageLayersError) -> Self {
     530            0 :         match e {
     531            0 :             CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
     532            0 :             any => FlushLayerError::CreateImageLayersError(any),
     533              :         }
     534            0 :     }
     535              : }
     536              : 
     537              : impl From<PageReconstructError> for CreateImageLayersError {
     538            0 :     fn from(e: PageReconstructError) -> Self {
     539            0 :         match e {
     540            0 :             PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
     541            0 :             _ => CreateImageLayersError::PageReconstructError(e),
     542              :         }
     543            0 :     }
     544              : }
     545              : 
     546              : impl From<GetVectoredError> for CreateImageLayersError {
     547            0 :     fn from(e: GetVectoredError) -> Self {
     548            0 :         match e {
     549            0 :             GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
     550            0 :             _ => CreateImageLayersError::GetVectoredError(e),
     551              :         }
     552            0 :     }
     553              : }
     554              : 
     555              : impl From<GetReadyAncestorError> for PageReconstructError {
     556            4 :     fn from(e: GetReadyAncestorError) -> Self {
     557            4 :         use GetReadyAncestorError::*;
     558            4 :         match e {
     559            2 :             AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
     560            0 :             AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
     561            0 :             Cancelled => PageReconstructError::Cancelled,
     562            2 :             Other(other) => PageReconstructError::Other(other),
     563              :         }
     564            4 :     }
     565              : }
     566              : 
     567              : /// Public interface functions
     568              : impl Timeline {
     569              :     /// Get the LSN where this branch was created
     570         3659 :     pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
     571         3659 :         self.ancestor_lsn
     572         3659 :     }
     573              : 
     574              :     /// Get the ancestor's timeline id
     575         4999 :     pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
     576         4999 :         self.ancestor_timeline
     577         4999 :             .as_ref()
     578         4999 :             .map(|ancestor| ancestor.timeline_id)
     579         4999 :     }
     580              : 
     581              :     /// Lock and get timeline's GC cutoff
     582      5623582 :     pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
     583      5623582 :         self.latest_gc_cutoff_lsn.read()
     584      5623582 :     }
     585              : 
     586              :     /// Look up given page version.
     587              :     ///
     588              :     /// If a remote layer file is needed, it is downloaded as part of this
     589              :     /// call.
     590              :     ///
     591              :     /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
     592              :     /// abstraction above this needs to store suitable metadata to track what
     593              :     /// data exists with what keys, in separate metadata entries. If a
     594              :     /// non-existent key is requested, we may incorrectly return a value from
     595              :     /// an ancestor branch, for example, or waste a lot of cycles chasing the
     596              :     /// non-existing key.
     597              :     ///
     598              :     /// # Cancel-Safety
     599              :     ///
     600              :     /// This method is cancellation-safe.
     601      8625195 :     pub(crate) async fn get(
     602      8625195 :         &self,
     603      8625195 :         key: Key,
     604      8625195 :         lsn: Lsn,
     605      8625195 :         ctx: &RequestContext,
     606      8625197 :     ) -> Result<Bytes, PageReconstructError> {
     607      8625197 :         if !lsn.is_valid() {
     608            1 :             return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
     609      8625196 :         }
     610              : 
     611              :         // This check is debug-only because of the cost of hashing, and because it's a double-check: we
     612              :         // already checked the key against the shard_identity when looking up the Timeline from
     613              :         // page_service.
     614      8625196 :         debug_assert!(!self.shard_identity.is_key_disposable(&key));
     615              : 
     616              :         // XXX: structured stats collection for layer eviction here.
     617            0 :         trace!(
     618            0 :             "get page request for {}@{} from task kind {:?}",
     619            0 :             key,
     620            0 :             lsn,
     621            0 :             ctx.task_kind()
     622            0 :         );
     623              : 
     624              :         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
     625              :         // The cached image can be returned directly if there is no WAL between the cached image
     626              :         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
     627              :         // for redo.
     628      8625196 :         let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
     629      1776743 :             Some((cached_lsn, cached_img)) => {
     630      1776743 :                 match cached_lsn.cmp(&lsn) {
     631      1776726 :                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
     632              :                     Ordering::Equal => {
     633           17 :                         MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
     634           17 :                         return Ok(cached_img); // exact LSN match, return the image
     635              :                     }
     636              :                     Ordering::Greater => {
     637            0 :                         unreachable!("the returned lsn should never be after the requested lsn")
     638              :                     }
     639              :                 }
     640      1776726 :                 Some((cached_lsn, cached_img))
     641              :             }
     642      6848452 :             None => None,
     643              :         };
     644              : 
     645      8625178 :         let mut reconstruct_state = ValueReconstructState {
     646      8625178 :             records: Vec::new(),
     647      8625178 :             img: cached_page_img,
     648      8625178 :         };
     649      8625178 : 
     650      8625178 :         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
     651      8625178 :         let path = self
     652      8625178 :             .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
     653      2485493 :             .await?;
     654      8623839 :         timer.stop_and_record();
     655      8623839 : 
     656      8623839 :         let start = Instant::now();
     657      8623839 :         let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
     658      8623839 :         let elapsed = start.elapsed();
     659      8623839 :         crate::metrics::RECONSTRUCT_TIME
     660      8623839 :             .for_result(&res)
     661      8623839 :             .observe(elapsed.as_secs_f64());
     662      8623839 : 
     663      8623839 :         if cfg!(feature = "testing") && res.is_err() {
     664              :             // it can only be walredo issue
     665              :             use std::fmt::Write;
     666              : 
     667            0 :             let mut msg = String::new();
     668            0 : 
     669            0 :             path.into_iter().for_each(|(res, cont_lsn, layer)| {
     670            0 :                 writeln!(
     671            0 :                     msg,
     672            0 :                     "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
     673            0 :                     layer(),
     674            0 :                 )
     675            0 :                 .expect("string grows")
     676            0 :             });
     677              : 
     678              :             // this is to rule out or provide evidence that we could in some cases read a duplicate
     679              :             // walrecord
     680            0 :             tracing::info!("walredo failed, path:\n{msg}");
     681      8623839 :         }
     682              : 
     683      8623839 :         res
     684      8625185 :     }
     685              : 
     686              :     pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
     687              : 
     688              :     /// Look up multiple page versions at a given LSN
     689              :     ///
     690              :     /// This naive implementation will be replaced with a more efficient one
     691              :     /// which actually vectorizes the read path.
     692        72383 :     pub(crate) async fn get_vectored(
     693        72383 :         &self,
     694        72383 :         key_ranges: &[Range<Key>],
     695        72383 :         lsn: Lsn,
     696        72383 :         ctx: &RequestContext,
     697        72383 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     698        72383 :         if !lsn.is_valid() {
     699            0 :             return Err(GetVectoredError::InvalidLsn(lsn));
     700        72383 :         }
     701        72383 : 
     702        72383 :         let key_count = key_ranges
     703        72383 :             .iter()
     704        73586 :             .map(|range| key_range_size(range) as u64)
     705        72383 :             .sum();
     706        72383 :         if key_count > Timeline::MAX_GET_VECTORED_KEYS {
     707            0 :             return Err(GetVectoredError::Oversized(key_count));
     708        72383 :         }
     709        72383 : 
     710        72383 :         let _timer = crate::metrics::GET_VECTORED_LATENCY
     711        72383 :             .for_task_kind(ctx.task_kind())
     712        72383 :             .map(|t| t.start_timer());
     713        72383 : 
     714        72383 :         let mut values = BTreeMap::new();
     715       145969 :         for range in key_ranges {
     716        73586 :             let mut key = range.start;
     717       305982 :             while key != range.end {
     718       232396 :                 assert!(!self.shard_identity.is_key_disposable(&key));
     719              : 
     720       232396 :                 let block = self.get(key, lsn, ctx).await;
     721              : 
     722              :                 if matches!(
     723            0 :                     block,
     724              :                     Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
     725              :                 ) {
     726            0 :                     return Err(GetVectoredError::Cancelled);
     727       232396 :                 }
     728       232396 : 
     729       232396 :                 values.insert(key, block);
     730       232396 :                 key = key.next();
     731              :             }
     732              :         }
     733              : 
     734        72383 :         Ok(values)
     735        72383 :     }
     736              : 
     737              :     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     738     10862121 :     pub(crate) fn get_last_record_lsn(&self) -> Lsn {
     739     10862121 :         self.last_record_lsn.load().last
     740     10862121 :     }
     741              : 
     742         3042 :     pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
     743         3042 :         self.last_record_lsn.load().prev
     744         3042 :     }
     745              : 
     746              :     /// Atomically get both last and prev.
     747         1084 :     pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
     748         1084 :         self.last_record_lsn.load()
     749         1084 :     }
     750              : 
     751       800801 :     pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
     752       800801 :         self.disk_consistent_lsn.load()
     753       800801 :     }
     754              : 
     755              :     /// remote_consistent_lsn from the perspective of the tenant's current generation,
     756              :     /// not validated with control plane yet.
     757              :     /// See [`Self::get_remote_consistent_lsn_visible`].
     758         3042 :     pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     759         3042 :         if let Some(remote_client) = &self.remote_client {
     760         3042 :             remote_client.remote_consistent_lsn_projected()
     761              :         } else {
     762            0 :             None
     763              :         }
     764         3042 :     }
     765              : 
     766              :     /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
     767              :     /// i.e. a value of remote_consistent_lsn_projected which has undergone
     768              :     /// generation validation in the deletion queue.
     769       798512 :     pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
     770       798512 :         if let Some(remote_client) = &self.remote_client {
     771       798512 :             remote_client.remote_consistent_lsn_visible()
     772              :         } else {
     773            0 :             None
     774              :         }
     775       798512 :     }
     776              : 
     777              :     /// The sum of the file size of all historic layers in the layer map.
     778              :     /// This method makes no distinction between local and remote layers.
     779              :     /// Hence, the result **does not represent local filesystem usage**.
     780         3513 :     pub(crate) async fn layer_size_sum(&self) -> u64 {
     781         3513 :         let guard = self.layers.read().await;
     782         3513 :         let layer_map = guard.layer_map();
     783         3513 :         let mut size = 0;
     784       336931 :         for l in layer_map.iter_historic_layers() {
     785       336931 :             size += l.file_size();
     786       336931 :         }
     787         3513 :         size
     788         3513 :     }
     789              : 
     790           15 :     pub(crate) fn resident_physical_size(&self) -> u64 {
     791           15 :         self.metrics.resident_physical_size_get()
     792           15 :     }
     793              : 
     794              :     ///
     795              :     /// Wait until WAL has been received and processed up to this LSN.
     796              :     ///
     797              :     /// You should call this before any of the other get_* or list_* functions. Calling
     798              :     /// those functions with an LSN that has been processed yet is an error.
     799              :     ///
     800      1358246 :     pub(crate) async fn wait_lsn(
     801      1358246 :         &self,
     802      1358246 :         lsn: Lsn,
     803      1358246 :         _ctx: &RequestContext, /* Prepare for use by cancellation */
     804      1358246 :     ) -> Result<(), WaitLsnError> {
     805      1358246 :         if self.cancel.is_cancelled() {
     806            0 :             return Err(WaitLsnError::Shutdown);
     807      1358246 :         } else if !self.is_active() {
     808            0 :             return Err(WaitLsnError::BadState);
     809      1358246 :         }
     810              : 
     811              :         // This should never be called from the WAL receiver, because that could lead
     812              :         // to a deadlock.
     813              :         debug_assert!(
     814      1358246 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
     815            0 :             "wait_lsn cannot be called in WAL receiver"
     816              :         );
     817              :         debug_assert!(
     818      1358246 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
     819            0 :             "wait_lsn cannot be called in WAL receiver"
     820              :         );
     821              :         debug_assert!(
     822      1358246 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
     823            0 :             "wait_lsn cannot be called in WAL receiver"
     824              :         );
     825              : 
     826      1358246 :         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
     827      1358246 : 
     828      1358246 :         match self
     829      1358246 :             .last_record_lsn
     830      1358246 :             .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
     831       112555 :             .await
     832              :         {
     833      1358221 :             Ok(()) => Ok(()),
     834           24 :             Err(e) => {
     835           24 :                 use utils::seqwait::SeqWaitError::*;
     836           24 :                 match e {
     837            0 :                     Shutdown => Err(WaitLsnError::Shutdown),
     838              :                     Timeout => {
     839              :                         // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
     840           24 :                         drop(_timer);
     841           24 :                         let walreceiver_status = self.walreceiver_status();
     842           24 :                         Err(WaitLsnError::Timeout(format!(
     843           24 :                         "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
     844           24 :                         lsn,
     845           24 :                         self.get_last_record_lsn(),
     846           24 :                         self.get_disk_consistent_lsn(),
     847           24 :                         walreceiver_status,
     848           24 :                     )))
     849              :                     }
     850              :                 }
     851              :             }
     852              :         }
     853      1358245 :     }
     854              : 
     855         3066 :     pub(crate) fn walreceiver_status(&self) -> String {
     856         3066 :         match &*self.walreceiver.lock().unwrap() {
     857          113 :             None => "stopping or stopped".to_string(),
     858         2953 :             Some(walreceiver) => match walreceiver.status() {
     859         1794 :                 Some(status) => status.to_human_readable_string(),
     860         1159 :                 None => "Not active".to_string(),
     861              :             },
     862              :         }
     863         3066 :     }
     864              : 
     865              :     /// Check that it is valid to request operations with that lsn.
     866          672 :     pub(crate) fn check_lsn_is_in_scope(
     867          672 :         &self,
     868          672 :         lsn: Lsn,
     869          672 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     870          672 :     ) -> anyhow::Result<()> {
     871          672 :         ensure!(
     872          672 :             lsn >= **latest_gc_cutoff_lsn,
     873           16 :             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
     874           16 :             lsn,
     875           16 :             **latest_gc_cutoff_lsn,
     876              :         );
     877          656 :         Ok(())
     878          672 :     }
     879              : 
     880              :     /// Flush to disk all data that was written with the put_* functions
     881         1801 :     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
     882              :     pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
     883              :         self.freeze_inmem_layer(false).await;
     884              :         self.flush_frozen_layers_and_wait().await
     885              :     }
     886              : 
     887              :     /// Outermost timeline compaction operation; downloads needed layers.
     888         1523 :     pub(crate) async fn compact(
     889         1523 :         self: &Arc<Self>,
     890         1523 :         cancel: &CancellationToken,
     891         1523 :         flags: EnumSet<CompactFlags>,
     892         1523 :         ctx: &RequestContext,
     893         1523 :     ) -> Result<(), CompactionError> {
     894         1523 :         // most likely the cancellation token is from background task, but in tests it could be the
     895         1523 :         // request task as well.
     896         1523 : 
     897         1523 :         let prepare = async move {
     898         1523 :             let guard = self.compaction_lock.lock().await;
     899              : 
     900         1523 :             let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
     901         1523 :                 BackgroundLoopKind::Compaction,
     902         1523 :                 ctx,
     903         1523 :             )
     904            0 :             .await;
     905              : 
     906         1523 :             (guard, permit)
     907         1523 :         };
     908              : 
     909              :         // this wait probably never needs any "long time spent" logging, because we already nag if
     910              :         // compaction task goes over it's period (20s) which is quite often in production.
     911         1523 :         let (_guard, _permit) = tokio::select! {
     912         1523 :             tuple = prepare => { tuple },
     913              :             _ = self.cancel.cancelled() => return Ok(()),
     914              :             _ = cancel.cancelled() => return Ok(()),
     915              :         };
     916              : 
     917         1523 :         let last_record_lsn = self.get_last_record_lsn();
     918         1523 : 
     919         1523 :         // Last record Lsn could be zero in case the timeline was just created
     920         1523 :         if !last_record_lsn.is_valid() {
     921            0 :             warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
     922            0 :             return Ok(());
     923         1523 :         }
     924         1523 : 
     925         1523 :         // High level strategy for compaction / image creation:
     926         1523 :         //
     927         1523 :         // 1. First, calculate the desired "partitioning" of the
     928         1523 :         // currently in-use key space. The goal is to partition the
     929         1523 :         // key space into roughly fixed-size chunks, but also take into
     930         1523 :         // account any existing image layers, and try to align the
     931         1523 :         // chunk boundaries with the existing image layers to avoid
     932         1523 :         // too much churn. Also try to align chunk boundaries with
     933         1523 :         // relation boundaries.  In principle, we don't know about
     934         1523 :         // relation boundaries here, we just deal with key-value
     935         1523 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     936         1523 :         // map relations into key-value pairs. But in practice we know
     937         1523 :         // that 'field6' is the block number, and the fields 1-5
     938         1523 :         // identify a relation. This is just an optimization,
     939         1523 :         // though.
     940         1523 :         //
     941         1523 :         // 2. Once we know the partitioning, for each partition,
     942         1523 :         // decide if it's time to create a new image layer. The
     943         1523 :         // criteria is: there has been too much "churn" since the last
     944         1523 :         // image layer? The "churn" is fuzzy concept, it's a
     945         1523 :         // combination of too many delta files, or too much WAL in
     946         1523 :         // total in the delta file. Or perhaps: if creating an image
     947         1523 :         // file would allow to delete some older files.
     948         1523 :         //
     949         1523 :         // 3. After that, we compact all level0 delta files if there
     950         1523 :         // are too many of them.  While compacting, we also garbage
     951         1523 :         // collect any page versions that are no longer needed because
     952         1523 :         // of the new image layers we created in step 2.
     953         1523 :         //
     954         1523 :         // TODO: This high level strategy hasn't been implemented yet.
     955         1523 :         // Below are functions compact_level0() and create_image_layers()
     956         1523 :         // but they are a bit ad hoc and don't quite work like it's explained
     957         1523 :         // above. Rewrite it.
     958         1523 : 
     959         1523 :         // Is the timeline being deleted?
     960         1523 :         if self.is_stopping() {
     961            0 :             trace!("Dropping out of compaction on timeline shutdown");
     962            0 :             return Err(CompactionError::ShuttingDown);
     963         1523 :         }
     964         1523 : 
     965         1523 :         let target_file_size = self.get_checkpoint_distance();
     966         1523 : 
     967         1523 :         // Define partitioning schema if needed
     968         1523 : 
     969         1523 :         // FIXME: the match should only cover repartitioning, not the next steps
     970         1523 :         match self
     971         1523 :             .repartition(
     972         1523 :                 self.get_last_record_lsn(),
     973         1523 :                 self.get_compaction_target_size(),
     974         1523 :                 flags,
     975         1523 :                 ctx,
     976         1523 :             )
     977       213034 :             .await
     978              :         {
     979         1519 :             Ok((partitioning, lsn)) => {
     980         1519 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     981         1519 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     982         1519 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     983         1519 :                     .build();
     984         1519 : 
     985         1519 :                 // 2. Compact
     986         1519 :                 let timer = self.metrics.compact_time_histo.start_timer();
     987       331578 :                 self.compact_level0(target_file_size, ctx).await?;
     988         1518 :                 timer.stop_and_record();
     989              : 
     990              :                 // 3. Create new image layers for partitions that have been modified
     991              :                 // "enough".
     992         1518 :                 let layers = self
     993         1518 :                     .create_image_layers(&partitioning, lsn, false, &image_ctx)
     994        60779 :                     .await
     995         1517 :                     .map_err(anyhow::Error::from)?;
     996         1517 :                 if let Some(remote_client) = &self.remote_client {
     997         7466 :                     for layer in layers {
     998         5949 :                         remote_client.schedule_layer_file_upload(layer)?;
     999              :                     }
    1000            0 :                 }
    1001              : 
    1002         1517 :                 if let Some(remote_client) = &self.remote_client {
    1003              :                     // should any new image layer been created, not uploading index_part will
    1004              :                     // result in a mismatch between remote_physical_size and layermap calculated
    1005              :                     // size, which will fail some tests, but should not be an issue otherwise.
    1006         1517 :                     remote_client.schedule_index_upload_for_file_changes()?;
    1007            0 :                 }
    1008              :             }
    1009            3 :             Err(err) => {
    1010            3 :                 // no partitioning? This is normal, if the timeline was just created
    1011            3 :                 // as an empty timeline. Also in unit tests, when we use the timeline
    1012            3 :                 // as a simple key-value store, ignoring the datadir layout. Log the
    1013            3 :                 // error but continue.
    1014            3 :                 //
    1015            3 :                 // Suppress error when it's due to cancellation
    1016            3 :                 if !self.cancel.is_cancelled() {
    1017            1 :                     error!("could not compact, repartitioning keyspace failed: {err:?}");
    1018            2 :                 }
    1019              :             }
    1020              :         };
    1021              : 
    1022         1520 :         Ok(())
    1023         1520 :     }
    1024              : 
    1025              :     /// Mutate the timeline with a [`TimelineWriter`].
    1026      3355115 :     pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
    1027      3355115 :         TimelineWriter {
    1028      3355115 :             tl: self,
    1029      3355115 :             _write_guard: self.write_lock.lock().await,
    1030              :         }
    1031      3355115 :     }
    1032              : 
    1033              :     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
    1034              :     /// the in-memory layer, and initiate flushing it if so.
    1035              :     ///
    1036              :     /// Also flush after a period of time without new data -- it helps
    1037              :     /// safekeepers to regard pageserver as caught up and suspend activity.
    1038      1401451 :     pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
    1039      1401451 :         let last_lsn = self.get_last_record_lsn();
    1040      1400622 :         let open_layer_size = {
    1041      1401451 :             let guard = self.layers.read().await;
    1042      1401451 :             let layers = guard.layer_map();
    1043      1401451 :             let Some(open_layer) = layers.open_layer.as_ref() else {
    1044          829 :                 return Ok(());
    1045              :             };
    1046      1400622 :             open_layer.size().await?
    1047              :         };
    1048      1400622 :         let last_freeze_at = self.last_freeze_at.load();
    1049      1400622 :         let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
    1050      1400622 :         let distance = last_lsn.widening_sub(last_freeze_at);
    1051      1400622 :         // Checkpointing the open layer can be triggered by layer size or LSN range.
    1052      1400622 :         // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
    1053      1400622 :         // we want to stay below that with a big margin.  The LSN distance determines how
    1054      1400622 :         // much WAL the safekeepers need to store.
    1055      1400622 :         if distance >= self.get_checkpoint_distance().into()
    1056      1399838 :             || open_layer_size > self.get_checkpoint_distance()
    1057      1396934 :             || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
    1058              :         {
    1059         3688 :             info!(
    1060         3688 :                 "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
    1061         3688 :                 distance,
    1062         3688 :                 open_layer_size,
    1063         3688 :                 last_freeze_ts.elapsed()
    1064         3688 :             );
    1065              : 
    1066         3688 :             self.freeze_inmem_layer(true).await;
    1067         3688 :             self.last_freeze_at.store(last_lsn);
    1068         3688 :             *(self.last_freeze_ts.write().unwrap()) = Instant::now();
    1069         3688 : 
    1070         3688 :             // Wake up the layer flusher
    1071         3688 :             self.flush_frozen_layers();
    1072      1396934 :         }
    1073      1400622 :         Ok(())
    1074      1401451 :     }
    1075              : 
    1076         1235 :     pub(crate) fn activate(
    1077         1235 :         self: &Arc<Self>,
    1078         1235 :         broker_client: BrokerClientChannel,
    1079         1235 :         background_jobs_can_start: Option<&completion::Barrier>,
    1080         1235 :         ctx: &RequestContext,
    1081         1235 :     ) {
    1082         1235 :         if self.tenant_shard_id.is_zero() {
    1083         1205 :             // Logical size is only maintained accurately on shard zero.
    1084         1205 :             self.spawn_initial_logical_size_computation_task(ctx);
    1085         1205 :         }
    1086         1235 :         self.launch_wal_receiver(ctx, broker_client);
    1087         1235 :         self.set_state(TimelineState::Active);
    1088         1235 :         self.launch_eviction_task(background_jobs_can_start);
    1089         1235 :     }
    1090              : 
    1091              :     /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
    1092              :     /// also to remote storage.  This method can easily take multiple seconds for a busy timeline.
    1093              :     ///
    1094              :     /// While we are flushing, we continue to accept read I/O.
    1095          249 :     pub(crate) async fn flush_and_shutdown(&self) {
    1096          249 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1097              : 
    1098              :         // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
    1099              :         // trying to flush
    1100            0 :         tracing::debug!("Waiting for WalReceiverManager...");
    1101          249 :         task_mgr::shutdown_tasks(
    1102          249 :             Some(TaskKind::WalReceiverManager),
    1103          249 :             Some(self.tenant_shard_id),
    1104          249 :             Some(self.timeline_id),
    1105          249 :         )
    1106          133 :         .await;
    1107              : 
    1108              :         // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
    1109          249 :         self.last_record_lsn.shutdown();
    1110          249 : 
    1111          249 :         // now all writers to InMemory layer are gone, do the final flush if requested
    1112          249 :         match self.freeze_and_flush().await {
    1113              :             Ok(_) => {
    1114              :                 // drain the upload queue
    1115          205 :                 if let Some(client) = self.remote_client.as_ref() {
    1116              :                     // if we did not wait for completion here, it might be our shutdown process
    1117              :                     // didn't wait for remote uploads to complete at all, as new tasks can forever
    1118              :                     // be spawned.
    1119              :                     //
    1120              :                     // what is problematic is the shutting down of RemoteTimelineClient, because
    1121              :                     // obviously it does not make sense to stop while we wait for it, but what
    1122              :                     // about corner cases like s3 suddenly hanging up?
    1123          205 :                     if let Err(e) = client.shutdown().await {
    1124              :                         // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1125              :                         // we have some extra WAL replay to do next time the timeline starts.
    1126            0 :                         warn!("failed to flush to remote storage: {e:#}");
    1127          205 :                     }
    1128            0 :                 }
    1129              :             }
    1130           44 :             Err(e) => {
    1131              :                 // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1132              :                 // we have some extra WAL replay to do next time the timeline starts.
    1133           44 :                 warn!("failed to freeze and flush: {e:#}");
    1134              :             }
    1135              :         }
    1136              : 
    1137          422 :         self.shutdown().await;
    1138          249 :     }
    1139              : 
    1140              :     /// Shut down immediately, without waiting for any open layers to flush to disk.  This is a subset of
    1141              :     /// the graceful [`Timeline::flush_and_shutdown`] function.
    1142          577 :     pub(crate) async fn shutdown(&self) {
    1143          577 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1144              : 
    1145              :         // Signal any subscribers to our cancellation token to drop out
    1146            0 :         tracing::debug!("Cancelling CancellationToken");
    1147          577 :         self.cancel.cancel();
    1148          577 : 
    1149          577 :         // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
    1150          577 :         // while doing so.
    1151          577 :         self.last_record_lsn.shutdown();
    1152          577 : 
    1153          577 :         // Shut down the layer flush task before the remote client, as one depends on the other
    1154          577 :         task_mgr::shutdown_tasks(
    1155          577 :             Some(TaskKind::LayerFlushTask),
    1156          577 :             Some(self.tenant_shard_id),
    1157          577 :             Some(self.timeline_id),
    1158          577 :         )
    1159          491 :         .await;
    1160              : 
    1161              :         // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
    1162              :         // case our caller wants to use that for a deletion
    1163          577 :         if let Some(remote_client) = self.remote_client.as_ref() {
    1164          577 :             match remote_client.stop() {
    1165          577 :                 Ok(()) => {}
    1166            0 :                 Err(StopError::QueueUninitialized) => {
    1167            0 :                     // Shutting down during initialization is legal
    1168            0 :                 }
    1169              :             }
    1170            0 :         }
    1171              : 
    1172            0 :         tracing::debug!("Waiting for tasks...");
    1173              : 
    1174          701 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
    1175              : 
    1176              :         // Finally wait until any gate-holders are complete
    1177          577 :         self.gate.close().await;
    1178          577 :     }
    1179              : 
    1180         2243 :     pub(crate) fn set_state(&self, new_state: TimelineState) {
    1181         2243 :         match (self.current_state(), new_state) {
    1182         2243 :             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
    1183          126 :                 info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
    1184              :             }
    1185            0 :             (st, TimelineState::Loading) => {
    1186            0 :                 error!("ignoring transition from {st:?} into Loading state");
    1187              :             }
    1188            7 :             (TimelineState::Broken { .. }, new_state) => {
    1189            7 :                 error!("Ignoring state update {new_state:?} for broken timeline");
    1190              :             }
    1191              :             (TimelineState::Stopping, TimelineState::Active) => {
    1192            0 :                 error!("Not activating a Stopping timeline");
    1193              :             }
    1194         2110 :             (_, new_state) => {
    1195         2110 :                 self.state.send_replace(new_state);
    1196         2110 :             }
    1197              :         }
    1198         2243 :     }
    1199              : 
    1200           19 :     pub(crate) fn set_broken(&self, reason: String) {
    1201           19 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
    1202           19 :         let broken_state = TimelineState::Broken {
    1203           19 :             reason,
    1204           19 :             backtrace: backtrace_str,
    1205           19 :         };
    1206           19 :         self.set_state(broken_state);
    1207           19 : 
    1208           19 :         // Although the Broken state is not equivalent to shutdown() (shutdown will be called
    1209           19 :         // later when this tenant is detach or the process shuts down), firing the cancellation token
    1210           19 :         // here avoids the need for other tasks to watch for the Broken state explicitly.
    1211           19 :         self.cancel.cancel();
    1212           19 :     }
    1213              : 
    1214      2108734 :     pub(crate) fn current_state(&self) -> TimelineState {
    1215      2108734 :         self.state.borrow().clone()
    1216      2108734 :     }
    1217              : 
    1218          908 :     pub(crate) fn is_broken(&self) -> bool {
    1219          908 :         matches!(&*self.state.borrow(), TimelineState::Broken { .. })
    1220          908 :     }
    1221              : 
    1222      1373205 :     pub(crate) fn is_active(&self) -> bool {
    1223      1373205 :         self.current_state() == TimelineState::Active
    1224      1373205 :     }
    1225              : 
    1226         2638 :     pub(crate) fn is_stopping(&self) -> bool {
    1227         2638 :         self.current_state() == TimelineState::Stopping
    1228         2638 :     }
    1229              : 
    1230         1233 :     pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
    1231         1233 :         self.state.subscribe()
    1232         1233 :     }
    1233              : 
    1234      1299949 :     pub(crate) async fn wait_to_become_active(
    1235      1299949 :         &self,
    1236      1299949 :         _ctx: &RequestContext, // Prepare for use by cancellation
    1237      1299949 :     ) -> Result<(), TimelineState> {
    1238      1299949 :         let mut receiver = self.state.subscribe();
    1239      1299974 :         loop {
    1240      1299974 :             let current_state = receiver.borrow().clone();
    1241      1299974 :             match current_state {
    1242              :                 TimelineState::Loading => {
    1243           25 :                     receiver
    1244           25 :                         .changed()
    1245           25 :                         .await
    1246           25 :                         .expect("holding a reference to self");
    1247              :                 }
    1248              :                 TimelineState::Active { .. } => {
    1249      1299943 :                     return Ok(());
    1250              :                 }
    1251              :                 TimelineState::Broken { .. } | TimelineState::Stopping => {
    1252              :                     // There's no chance the timeline can transition back into ::Active
    1253            6 :                     return Err(current_state);
    1254              :                 }
    1255              :             }
    1256              :         }
    1257      1299949 :     }
    1258              : 
    1259           96 :     pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
    1260           96 :         let guard = self.layers.read().await;
    1261           96 :         let layer_map = guard.layer_map();
    1262           96 :         let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
    1263           96 :         if let Some(open_layer) = &layer_map.open_layer {
    1264            4 :             in_memory_layers.push(open_layer.info());
    1265           92 :         }
    1266           96 :         for frozen_layer in &layer_map.frozen_layers {
    1267            0 :             in_memory_layers.push(frozen_layer.info());
    1268            0 :         }
    1269              : 
    1270           96 :         let mut historic_layers = Vec::new();
    1271         3014 :         for historic_layer in layer_map.iter_historic_layers() {
    1272         3014 :             let historic_layer = guard.get_from_desc(&historic_layer);
    1273         3014 :             historic_layers.push(historic_layer.info(reset).await);
    1274              :         }
    1275              : 
    1276           96 :         LayerMapInfo {
    1277           96 :             in_memory_layers,
    1278           96 :             historic_layers,
    1279           96 :         }
    1280           96 :     }
    1281              : 
    1282            2 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    1283              :     pub(crate) async fn download_layer(
    1284              :         &self,
    1285              :         layer_file_name: &str,
    1286              :     ) -> anyhow::Result<Option<bool>> {
    1287              :         let Some(layer) = self.find_layer(layer_file_name).await else {
    1288              :             return Ok(None);
    1289              :         };
    1290              : 
    1291              :         if self.remote_client.is_none() {
    1292              :             return Ok(Some(false));
    1293              :         }
    1294              : 
    1295              :         layer.download().await?;
    1296              : 
    1297              :         Ok(Some(true))
    1298              :     }
    1299              : 
    1300              :     /// Evict just one layer.
    1301              :     ///
    1302              :     /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
    1303         2252 :     pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1304         2252 :         let _gate = self
    1305         2252 :             .gate
    1306         2252 :             .enter()
    1307         2252 :             .map_err(|_| anyhow::anyhow!("Shutting down"))?;
    1308              : 
    1309         2252 :         let Some(local_layer) = self.find_layer(layer_file_name).await else {
    1310            0 :             return Ok(None);
    1311              :         };
    1312              : 
    1313         2252 :         match local_layer.evict_and_wait().await {
    1314         2252 :             Ok(()) => Ok(Some(true)),
    1315            0 :             Err(EvictionError::NotFound) => Ok(Some(false)),
    1316            0 :             Err(EvictionError::Downloaded) => Ok(Some(false)),
    1317              :         }
    1318         2252 :     }
    1319              : }
    1320              : 
    1321              : /// Number of times we will compute partition within a checkpoint distance.
    1322              : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
    1323              : 
    1324              : // Private functions
    1325              : impl Timeline {
    1326          609 :     pub(crate) fn get_lazy_slru_download(&self) -> bool {
    1327          609 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1328          609 :         tenant_conf
    1329          609 :             .lazy_slru_download
    1330          609 :             .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
    1331          609 :     }
    1332              : 
    1333      2803551 :     fn get_checkpoint_distance(&self) -> u64 {
    1334      2803551 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1335      2803551 :         tenant_conf
    1336      2803551 :             .checkpoint_distance
    1337      2803551 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    1338      2803551 :     }
    1339              : 
    1340      1396934 :     fn get_checkpoint_timeout(&self) -> Duration {
    1341      1396934 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1342      1396934 :         tenant_conf
    1343      1396934 :             .checkpoint_timeout
    1344      1396934 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    1345      1396934 :     }
    1346              : 
    1347         1603 :     fn get_compaction_target_size(&self) -> u64 {
    1348         1603 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1349         1603 :         tenant_conf
    1350         1603 :             .compaction_target_size
    1351         1603 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    1352         1603 :     }
    1353              : 
    1354         1519 :     fn get_compaction_threshold(&self) -> usize {
    1355         1519 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1356         1519 :         tenant_conf
    1357         1519 :             .compaction_threshold
    1358         1519 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    1359         1519 :     }
    1360              : 
    1361        37180 :     fn get_image_creation_threshold(&self) -> usize {
    1362        37180 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1363        37180 :         tenant_conf
    1364        37180 :             .image_creation_threshold
    1365        37180 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    1366        37180 :     }
    1367              : 
    1368         2588 :     fn get_eviction_policy(&self) -> EvictionPolicy {
    1369         2588 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1370         2588 :         tenant_conf
    1371         2588 :             .eviction_policy
    1372         2588 :             .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
    1373         2588 :     }
    1374              : 
    1375         1621 :     fn get_evictions_low_residence_duration_metric_threshold(
    1376         1621 :         tenant_conf: &TenantConfOpt,
    1377         1621 :         default_tenant_conf: &TenantConf,
    1378         1621 :     ) -> Duration {
    1379         1621 :         tenant_conf
    1380         1621 :             .evictions_low_residence_duration_metric_threshold
    1381         1621 :             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
    1382         1621 :     }
    1383              : 
    1384        13566 :     fn get_gc_feedback(&self) -> bool {
    1385        13566 :         let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
    1386        13566 :         tenant_conf
    1387        13566 :             .gc_feedback
    1388        13566 :             .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
    1389        13566 :     }
    1390              : 
    1391           53 :     pub(super) fn tenant_conf_updated(&self) {
    1392           53 :         // NB: Most tenant conf options are read by background loops, so,
    1393           53 :         // changes will automatically be picked up.
    1394           53 : 
    1395           53 :         // The threshold is embedded in the metric. So, we need to update it.
    1396           53 :         {
    1397           53 :             let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
    1398           53 :                 &self.tenant_conf.read().unwrap().tenant_conf,
    1399           53 :                 &self.conf.default_tenant_conf,
    1400           53 :             );
    1401           53 : 
    1402           53 :             let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
    1403           53 :             let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
    1404           53 : 
    1405           53 :             let timeline_id_str = self.timeline_id.to_string();
    1406           53 :             self.metrics
    1407           53 :                 .evictions_with_low_residence_duration
    1408           53 :                 .write()
    1409           53 :                 .unwrap()
    1410           53 :                 .change_threshold(
    1411           53 :                     &tenant_id_str,
    1412           53 :                     &shard_id_str,
    1413           53 :                     &timeline_id_str,
    1414           53 :                     new_threshold,
    1415           53 :                 );
    1416           53 :         }
    1417           53 :     }
    1418              : 
    1419              :     /// Open a Timeline handle.
    1420              :     ///
    1421              :     /// Loads the metadata for the timeline into memory, but not the layer map.
    1422              :     #[allow(clippy::too_many_arguments)]
    1423         1568 :     pub(super) fn new(
    1424         1568 :         conf: &'static PageServerConf,
    1425         1568 :         tenant_conf: Arc<RwLock<AttachedTenantConf>>,
    1426         1568 :         metadata: &TimelineMetadata,
    1427         1568 :         ancestor: Option<Arc<Timeline>>,
    1428         1568 :         timeline_id: TimelineId,
    1429         1568 :         tenant_shard_id: TenantShardId,
    1430         1568 :         generation: Generation,
    1431         1568 :         shard_identity: ShardIdentity,
    1432         1568 :         walredo_mgr: Option<Arc<super::WalRedoManager>>,
    1433         1568 :         resources: TimelineResources,
    1434         1568 :         pg_version: u32,
    1435         1568 :         state: TimelineState,
    1436         1568 :         cancel: CancellationToken,
    1437         1568 :     ) -> Arc<Self> {
    1438         1568 :         let disk_consistent_lsn = metadata.disk_consistent_lsn();
    1439         1568 :         let (state, _) = watch::channel(state);
    1440         1568 : 
    1441         1568 :         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
    1442         1568 :         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
    1443         1568 : 
    1444         1568 :         let tenant_conf_guard = tenant_conf.read().unwrap();
    1445         1568 : 
    1446         1568 :         let evictions_low_residence_duration_metric_threshold =
    1447         1568 :             Self::get_evictions_low_residence_duration_metric_threshold(
    1448         1568 :                 &tenant_conf_guard.tenant_conf,
    1449         1568 :                 &conf.default_tenant_conf,
    1450         1568 :             );
    1451         1568 :         drop(tenant_conf_guard);
    1452         1568 : 
    1453         1568 :         Arc::new_cyclic(|myself| {
    1454         1568 :             let mut result = Timeline {
    1455         1568 :                 conf,
    1456         1568 :                 tenant_conf,
    1457         1568 :                 myself: myself.clone(),
    1458         1568 :                 timeline_id,
    1459         1568 :                 tenant_shard_id,
    1460         1568 :                 generation,
    1461         1568 :                 shard_identity,
    1462         1568 :                 pg_version,
    1463         1568 :                 layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
    1464         1568 :                 wanted_image_layers: Mutex::new(None),
    1465         1568 : 
    1466         1568 :                 walredo_mgr,
    1467         1568 :                 walreceiver: Mutex::new(None),
    1468         1568 : 
    1469         1568 :                 remote_client: resources.remote_client.map(Arc::new),
    1470         1568 : 
    1471         1568 :                 // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
    1472         1568 :                 last_record_lsn: SeqWait::new(RecordLsn {
    1473         1568 :                     last: disk_consistent_lsn,
    1474         1568 :                     prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
    1475         1568 :                 }),
    1476         1568 :                 disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
    1477         1568 : 
    1478         1568 :                 last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
    1479         1568 :                 last_freeze_ts: RwLock::new(Instant::now()),
    1480         1568 : 
    1481         1568 :                 loaded_at: (disk_consistent_lsn, SystemTime::now()),
    1482         1568 : 
    1483         1568 :                 ancestor_timeline: ancestor,
    1484         1568 :                 ancestor_lsn: metadata.ancestor_lsn(),
    1485         1568 : 
    1486         1568 :                 metrics: TimelineMetrics::new(
    1487         1568 :                     &tenant_shard_id,
    1488         1568 :                     &timeline_id,
    1489         1568 :                     crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
    1490         1568 :                         "mtime",
    1491         1568 :                         evictions_low_residence_duration_metric_threshold,
    1492         1568 :                     ),
    1493         1568 :                 ),
    1494         1568 : 
    1495         1568 :                 query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
    1496         1568 :                     &tenant_shard_id,
    1497         1568 :                     &timeline_id,
    1498         1568 :                 ),
    1499         1568 : 
    1500         1568 :                 flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
    1501         1568 : 
    1502         1568 :                 layer_flush_start_tx,
    1503         1568 :                 layer_flush_done_tx,
    1504         1568 : 
    1505         1568 :                 write_lock: tokio::sync::Mutex::new(()),
    1506         1568 : 
    1507         1568 :                 gc_info: std::sync::RwLock::new(GcInfo {
    1508         1568 :                     retain_lsns: Vec::new(),
    1509         1568 :                     horizon_cutoff: Lsn(0),
    1510         1568 :                     pitr_cutoff: Lsn(0),
    1511         1568 :                 }),
    1512         1568 : 
    1513         1568 :                 latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
    1514         1568 :                 initdb_lsn: metadata.initdb_lsn(),
    1515         1568 : 
    1516         1568 :                 current_logical_size: if disk_consistent_lsn.is_valid() {
    1517              :                     // we're creating timeline data with some layer files existing locally,
    1518              :                     // need to recalculate timeline's logical size based on data in the layers.
    1519          890 :                     LogicalSize::deferred_initial(disk_consistent_lsn)
    1520              :                 } else {
    1521              :                     // we're creating timeline data without any layers existing locally,
    1522              :                     // initial logical size is 0.
    1523          678 :                     LogicalSize::empty_initial()
    1524              :                 },
    1525         1568 :                 partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
    1526         1568 :                 repartition_threshold: 0,
    1527         1568 : 
    1528         1568 :                 last_received_wal: Mutex::new(None),
    1529         1568 :                 rel_size_cache: RwLock::new(HashMap::new()),
    1530         1568 : 
    1531         1568 :                 download_all_remote_layers_task_info: RwLock::new(None),
    1532         1568 : 
    1533         1568 :                 state,
    1534         1568 : 
    1535         1568 :                 eviction_task_timeline_state: tokio::sync::Mutex::new(
    1536         1568 :                     EvictionTaskTimelineState::default(),
    1537         1568 :                 ),
    1538         1568 :                 delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
    1539         1568 : 
    1540         1568 :                 cancel,
    1541         1568 :                 gate: Gate::default(),
    1542         1568 : 
    1543         1568 :                 compaction_lock: tokio::sync::Mutex::default(),
    1544         1568 :                 gc_lock: tokio::sync::Mutex::default(),
    1545         1568 :                 standby_horizon: AtomicLsn::new(0),
    1546         1568 :             };
    1547         1568 :             result.repartition_threshold =
    1548         1568 :                 result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
    1549         1568 :             result
    1550         1568 :                 .metrics
    1551         1568 :                 .last_record_gauge
    1552         1568 :                 .set(disk_consistent_lsn.0 as i64);
    1553         1568 :             result
    1554         1568 :         })
    1555         1568 :     }
    1556              : 
    1557         2218 :     pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
    1558         2218 :         let Ok(guard) = self.gate.enter() else {
    1559            0 :             info!("cannot start flush loop when the timeline gate has already been closed");
    1560            0 :             return;
    1561              :         };
    1562         2218 :         let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
    1563         2218 :         match *flush_loop_state {
    1564         1551 :             FlushLoopState::NotStarted => (),
    1565              :             FlushLoopState::Running { .. } => {
    1566          667 :                 info!(
    1567          667 :                     "skipping attempt to start flush_loop twice {}/{}",
    1568          667 :                     self.tenant_shard_id, self.timeline_id
    1569          667 :                 );
    1570          667 :                 return;
    1571              :             }
    1572              :             FlushLoopState::Exited => {
    1573            0 :                 warn!(
    1574            0 :                     "ignoring attempt to restart exited flush_loop {}/{}",
    1575            0 :                     self.tenant_shard_id, self.timeline_id
    1576            0 :                 );
    1577            0 :                 return;
    1578              :             }
    1579              :         }
    1580              : 
    1581         1551 :         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
    1582         1551 :         let self_clone = Arc::clone(self);
    1583         1551 : 
    1584         1551 :         debug!("spawning flush loop");
    1585         1551 :         *flush_loop_state = FlushLoopState::Running {
    1586         1551 :             #[cfg(test)]
    1587         1551 :             expect_initdb_optimization: false,
    1588         1551 :             #[cfg(test)]
    1589         1551 :             initdb_optimization_count: 0,
    1590         1551 :         };
    1591         1551 :         task_mgr::spawn(
    1592         1551 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1593         1551 :             task_mgr::TaskKind::LayerFlushTask,
    1594         1551 :             Some(self.tenant_shard_id),
    1595         1551 :             Some(self.timeline_id),
    1596         1551 :             "layer flush task",
    1597              :             false,
    1598         1551 :             async move {
    1599         1551 :                 let _guard = guard;
    1600         1551 :                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
    1601        27756 :                 self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
    1602          576 :                 let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
    1603          576 :                 assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
    1604          576 :                 *flush_loop_state  = FlushLoopState::Exited;
    1605          576 :                 Ok(())
    1606          576 :             }
    1607         1551 :             .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    1608              :         );
    1609         2218 :     }
    1610              : 
    1611              :     /// Creates and starts the wal receiver.
    1612              :     ///
    1613              :     /// This function is expected to be called at most once per Timeline's lifecycle
    1614              :     /// when the timeline is activated.
    1615         1235 :     fn launch_wal_receiver(
    1616         1235 :         self: &Arc<Self>,
    1617         1235 :         ctx: &RequestContext,
    1618         1235 :         broker_client: BrokerClientChannel,
    1619         1235 :     ) {
    1620         1235 :         info!(
    1621         1235 :             "launching WAL receiver for timeline {} of tenant {}",
    1622         1235 :             self.timeline_id, self.tenant_shard_id
    1623         1235 :         );
    1624              : 
    1625         1235 :         let tenant_conf_guard = self.tenant_conf.read().unwrap();
    1626         1235 :         let wal_connect_timeout = tenant_conf_guard
    1627         1235 :             .tenant_conf
    1628         1235 :             .walreceiver_connect_timeout
    1629         1235 :             .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
    1630         1235 :         let lagging_wal_timeout = tenant_conf_guard
    1631         1235 :             .tenant_conf
    1632         1235 :             .lagging_wal_timeout
    1633         1235 :             .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
    1634         1235 :         let max_lsn_wal_lag = tenant_conf_guard
    1635         1235 :             .tenant_conf
    1636         1235 :             .max_lsn_wal_lag
    1637         1235 :             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
    1638         1235 :         drop(tenant_conf_guard);
    1639         1235 : 
    1640         1235 :         let mut guard = self.walreceiver.lock().unwrap();
    1641         1235 :         assert!(
    1642         1235 :             guard.is_none(),
    1643            0 :             "multiple launches / re-launches of WAL receiver are not supported"
    1644              :         );
    1645         1235 :         *guard = Some(WalReceiver::start(
    1646         1235 :             Arc::clone(self),
    1647         1235 :             WalReceiverConf {
    1648         1235 :                 wal_connect_timeout,
    1649         1235 :                 lagging_wal_timeout,
    1650         1235 :                 max_lsn_wal_lag,
    1651         1235 :                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
    1652         1235 :                 availability_zone: self.conf.availability_zone.clone(),
    1653         1235 :                 ingest_batch_size: self.conf.ingest_batch_size,
    1654         1235 :             },
    1655         1235 :             broker_client,
    1656         1235 :             ctx,
    1657         1235 :         ));
    1658         1235 :     }
    1659              : 
    1660              :     /// Initialize with an empty layer map. Used when creating a new timeline.
    1661         1144 :     pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
    1662         1144 :         let mut layers = self.layers.try_write().expect(
    1663         1144 :             "in the context where we call this function, no other task has access to the object",
    1664         1144 :         );
    1665         1144 :         layers.initialize_empty(Lsn(start_lsn.0));
    1666         1144 :     }
    1667              : 
    1668              :     /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
    1669              :     /// files.
    1670          412 :     pub(super) async fn load_layer_map(
    1671          412 :         &self,
    1672          412 :         disk_consistent_lsn: Lsn,
    1673          412 :         index_part: Option<IndexPart>,
    1674          412 :     ) -> anyhow::Result<()> {
    1675              :         use init::{Decision::*, Discovered, DismissedLayer};
    1676              :         use LayerFileName::*;
    1677              : 
    1678          412 :         let mut guard = self.layers.write().await;
    1679              : 
    1680          412 :         let timer = self.metrics.load_layer_map_histo.start_timer();
    1681          412 : 
    1682          412 :         // Scan timeline directory and create ImageFileName and DeltaFilename
    1683          412 :         // structs representing all files on disk
    1684          412 :         let timeline_path = self
    1685          412 :             .conf
    1686          412 :             .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    1687          412 :         let conf = self.conf;
    1688          412 :         let span = tracing::Span::current();
    1689          412 : 
    1690          412 :         // Copy to move into the task we're about to spawn
    1691          412 :         let generation = self.generation;
    1692          412 :         let shard = self.get_shard_index();
    1693          412 :         let this = self.myself.upgrade().expect("&self method holds the arc");
    1694              : 
    1695          412 :         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
    1696          412 :             move || {
    1697          412 :                 let _g = span.entered();
    1698          412 :                 let discovered = init::scan_timeline_dir(&timeline_path)?;
    1699          412 :                 let mut discovered_layers = Vec::with_capacity(discovered.len());
    1700          412 :                 let mut unrecognized_files = Vec::new();
    1701          412 : 
    1702          412 :                 let mut path = timeline_path;
    1703              : 
    1704        14223 :                 for discovered in discovered {
    1705        13811 :                     let (name, kind) = match discovered {
    1706        13348 :                         Discovered::Layer(file_name, file_size) => {
    1707        13348 :                             discovered_layers.push((file_name, file_size));
    1708        13348 :                             continue;
    1709              :                         }
    1710              :                         Discovered::Metadata | Discovered::IgnoredBackup => {
    1711          412 :                             continue;
    1712              :                         }
    1713            0 :                         Discovered::Unknown(file_name) => {
    1714            0 :                             // we will later error if there are any
    1715            0 :                             unrecognized_files.push(file_name);
    1716            0 :                             continue;
    1717              :                         }
    1718           47 :                         Discovered::Ephemeral(name) => (name, "old ephemeral file"),
    1719            1 :                         Discovered::Temporary(name) => (name, "temporary timeline file"),
    1720            3 :                         Discovered::TemporaryDownload(name) => (name, "temporary download"),
    1721              :                     };
    1722           51 :                     path.push(Utf8Path::new(&name));
    1723           51 :                     init::cleanup(&path, kind)?;
    1724           51 :                     path.pop();
    1725              :                 }
    1726              : 
    1727          412 :                 if !unrecognized_files.is_empty() {
    1728              :                     // assume that if there are any there are many many.
    1729            0 :                     let n = unrecognized_files.len();
    1730            0 :                     let first = &unrecognized_files[..n.min(10)];
    1731            0 :                     anyhow::bail!(
    1732            0 :                         "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
    1733            0 :                     );
    1734          412 :                 }
    1735          412 : 
    1736          412 :                 let decided = init::reconcile(
    1737          412 :                     discovered_layers,
    1738          412 :                     index_part.as_ref(),
    1739          412 :                     disk_consistent_lsn,
    1740          412 :                     generation,
    1741          412 :                     shard,
    1742          412 :                 );
    1743          412 : 
    1744          412 :                 let mut loaded_layers = Vec::new();
    1745          412 :                 let mut needs_cleanup = Vec::new();
    1746          412 :                 let mut total_physical_size = 0;
    1747              : 
    1748        57877 :                 for (name, decision) in decided {
    1749        57465 :                     let decision = match decision {
    1750        12212 :                         Ok(UseRemote { local, remote }) => {
    1751        12212 :                             // Remote is authoritative, but we may still choose to retain
    1752        12212 :                             // the local file if the contents appear to match
    1753        12212 :                             if local.file_size() == remote.file_size() {
    1754              :                                 // Use the local file, but take the remote metadata so that we pick up
    1755              :                                 // the correct generation.
    1756        12211 :                                 UseLocal(remote)
    1757              :                             } else {
    1758            1 :                                 path.push(name.file_name());
    1759            1 :                                 init::cleanup_local_file_for_remote(&path, &local, &remote)?;
    1760            1 :                                 path.pop();
    1761            1 :                                 UseRemote { local, remote }
    1762              :                             }
    1763              :                         }
    1764        44616 :                         Ok(decision) => decision,
    1765            2 :                         Err(DismissedLayer::Future { local }) => {
    1766            2 :                             if local.is_some() {
    1767            1 :                                 path.push(name.file_name());
    1768            1 :                                 init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
    1769            1 :                                 path.pop();
    1770            1 :                             }
    1771            2 :                             needs_cleanup.push(name);
    1772            2 :                             continue;
    1773              :                         }
    1774          635 :                         Err(DismissedLayer::LocalOnly(local)) => {
    1775          635 :                             path.push(name.file_name());
    1776          635 :                             init::cleanup_local_only_file(&path, &name, &local)?;
    1777          635 :                             path.pop();
    1778          635 :                             // this file never existed remotely, we will have to do rework
    1779          635 :                             continue;
    1780              :                         }
    1781              :                     };
    1782              : 
    1783        56828 :                     match &name {
    1784        23503 :                         Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
    1785        33325 :                         Image(i) => assert!(i.lsn <= disk_consistent_lsn),
    1786              :                     }
    1787              : 
    1788        56828 :                     tracing::debug!(layer=%name, ?decision, "applied");
    1789              : 
    1790        56828 :                     let layer = match decision {
    1791        12711 :                         UseLocal(m) => {
    1792        12711 :                             total_physical_size += m.file_size();
    1793        12711 :                             Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
    1794              :                         }
    1795        44116 :                         Evicted(remote) | UseRemote { remote, .. } => {
    1796        44117 :                             Layer::for_evicted(conf, &this, name, remote)
    1797              :                         }
    1798              :                     };
    1799              : 
    1800        56828 :                     loaded_layers.push(layer);
    1801              :                 }
    1802          412 :                 Ok((loaded_layers, needs_cleanup, total_physical_size))
    1803          412 :             }
    1804          412 :         })
    1805          408 :         .await
    1806          412 :         .map_err(anyhow::Error::new)
    1807          412 :         .and_then(|x| x)?;
    1808              : 
    1809          412 :         let num_layers = loaded_layers.len();
    1810          412 : 
    1811          412 :         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
    1812              : 
    1813          412 :         if let Some(rtc) = self.remote_client.as_ref() {
    1814          412 :             rtc.schedule_layer_file_deletion(&needs_cleanup)?;
    1815          412 :             rtc.schedule_index_upload_for_file_changes()?;
    1816              :             // This barrier orders above DELETEs before any later operations.
    1817              :             // This is critical because code executing after the barrier might
    1818              :             // create again objects with the same key that we just scheduled for deletion.
    1819              :             // For example, if we just scheduled deletion of an image layer "from the future",
    1820              :             // later compaction might run again and re-create the same image layer.
    1821              :             // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
    1822              :             // "same" here means same key range and LSN.
    1823              :             //
    1824              :             // Without a barrier between above DELETEs and the re-creation's PUTs,
    1825              :             // the upload queue may execute the PUT first, then the DELETE.
    1826              :             // In our example, we will end up with an IndexPart referencing a non-existent object.
    1827              :             //
    1828              :             // 1. a future image layer is created and uploaded
    1829              :             // 2. ps restart
    1830              :             // 3. the future layer from (1) is deleted during load layer map
    1831              :             // 4. image layer is re-created and uploaded
    1832              :             // 5. deletion queue would like to delete (1) but actually deletes (4)
    1833              :             // 6. delete by name works as expected, but it now deletes the wrong (later) version
    1834              :             //
    1835              :             // See https://github.com/neondatabase/neon/issues/5878
    1836              :             //
    1837              :             // NB: generation numbers naturally protect against this because they disambiguate
    1838              :             //     (1) and (4)
    1839          412 :             rtc.schedule_barrier()?;
    1840              :             // Tenant::create_timeline will wait for these uploads to happen before returning, or
    1841              :             // on retry.
    1842            0 :         }
    1843              : 
    1844          412 :         info!(
    1845          412 :             "loaded layer map with {} layers at {}, total physical size: {}",
    1846          412 :             num_layers, disk_consistent_lsn, total_physical_size
    1847          412 :         );
    1848              : 
    1849          412 :         timer.stop_and_record();
    1850          412 :         Ok(())
    1851          412 :     }
    1852              : 
    1853              :     /// Retrieve current logical size of the timeline.
    1854              :     ///
    1855              :     /// The size could be lagging behind the actual number, in case
    1856              :     /// the initial size calculation has not been run (gets triggered on the first size access).
    1857              :     ///
    1858              :     /// return size and boolean flag that shows if the size is exact
    1859       710214 :     pub(crate) fn get_current_logical_size(
    1860       710214 :         self: &Arc<Self>,
    1861       710214 :         priority: GetLogicalSizePriority,
    1862       710214 :         ctx: &RequestContext,
    1863       710214 :     ) -> logical_size::CurrentLogicalSize {
    1864       710214 :         if !self.tenant_shard_id.is_zero() {
    1865              :             // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
    1866              :             // when HTTP API is serving a GET for timeline zero, return zero
    1867           70 :             return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
    1868       710144 :         }
    1869       710144 : 
    1870       710144 :         let current_size = self.current_logical_size.current_size();
    1871       710144 :         debug!("Current size: {current_size:?}");
    1872              : 
    1873       710144 :         match (current_size.accuracy(), priority) {
    1874       709383 :             (logical_size::Accuracy::Exact, _) => (), // nothing to do
    1875          156 :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
    1876          156 :                 // background task will eventually deliver an exact value, we're in no rush
    1877          156 :             }
    1878              :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
    1879              :                 // background task is not ready, but user is asking for it now;
    1880              :                 // => make the background task skip the line
    1881              :                 // (The alternative would be to calculate the size here, but,
    1882              :                 //  it can actually take a long time if the user has a lot of rels.
    1883              :                 //  And we'll inevitable need it again; So, let the background task do the work.)
    1884          605 :                 match self
    1885          605 :                     .current_logical_size
    1886          605 :                     .cancel_wait_for_background_loop_concurrency_limit_semaphore
    1887          605 :                     .get()
    1888              :                 {
    1889          605 :                     Some(cancel) => cancel.cancel(),
    1890              :                     None => {
    1891            0 :                         let state = self.current_state();
    1892            0 :                         if matches!(
    1893            0 :                             state,
    1894              :                             TimelineState::Broken { .. } | TimelineState::Stopping
    1895            0 :                         ) {
    1896            0 : 
    1897            0 :                             // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
    1898            0 :                             // Don't make noise.
    1899            0 :                         } else {
    1900            0 :                             warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
    1901              :                         }
    1902              :                     }
    1903              :                 };
    1904              :             }
    1905              :         }
    1906              : 
    1907       710144 :         if let CurrentLogicalSize::Approximate(_) = &current_size {
    1908          761 :             if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
    1909          327 :                 let first = self
    1910          327 :                     .current_logical_size
    1911          327 :                     .did_return_approximate_to_walreceiver
    1912          327 :                     .compare_exchange(
    1913          327 :                         false,
    1914          327 :                         true,
    1915          327 :                         AtomicOrdering::Relaxed,
    1916          327 :                         AtomicOrdering::Relaxed,
    1917          327 :                     )
    1918          327 :                     .is_ok();
    1919          327 :                 if first {
    1920           60 :                     crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
    1921          267 :                 }
    1922          434 :             }
    1923       709383 :         }
    1924              : 
    1925       710144 :         current_size
    1926       710214 :     }
    1927              : 
    1928         1205 :     fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
    1929         1205 :         let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
    1930              :             // nothing to do for freshly created timelines;
    1931          570 :             assert_eq!(
    1932          570 :                 self.current_logical_size.current_size().accuracy(),
    1933          570 :                 logical_size::Accuracy::Exact,
    1934          570 :             );
    1935          570 :             self.current_logical_size.initialized.add_permits(1);
    1936          570 :             return;
    1937              :         };
    1938              : 
    1939          635 :         let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
    1940          635 :         let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
    1941          635 :         self.current_logical_size
    1942          635 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
    1943          635 :             .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
    1944          635 : 
    1945          635 :         let self_clone = Arc::clone(self);
    1946          635 :         let background_ctx = ctx.detached_child(
    1947          635 :             TaskKind::InitialLogicalSizeCalculation,
    1948          635 :             DownloadBehavior::Download,
    1949          635 :         );
    1950          635 :         task_mgr::spawn(
    1951          635 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1952          635 :             task_mgr::TaskKind::InitialLogicalSizeCalculation,
    1953          635 :             Some(self.tenant_shard_id),
    1954          635 :             Some(self.timeline_id),
    1955          635 :             "initial size calculation",
    1956              :             false,
    1957              :             // NB: don't log errors here, task_mgr will do that.
    1958          635 :             async move {
    1959          635 :                 let cancel = task_mgr::shutdown_token();
    1960          635 :                 self_clone
    1961          635 :                     .initial_logical_size_calculation_task(
    1962          635 :                         initial_part_end,
    1963          635 :                         cancel_wait_for_background_loop_concurrency_limit_semaphore,
    1964          635 :                         cancel,
    1965          635 :                         background_ctx,
    1966          635 :                     )
    1967       103140 :                     .await;
    1968          631 :                 Ok(())
    1969          631 :             }
    1970          635 :             .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
    1971              :         );
    1972         1205 :     }
    1973              : 
    1974          635 :     async fn initial_logical_size_calculation_task(
    1975          635 :         self: Arc<Self>,
    1976          635 :         initial_part_end: Lsn,
    1977          635 :         skip_concurrency_limiter: CancellationToken,
    1978          635 :         cancel: CancellationToken,
    1979          635 :         background_ctx: RequestContext,
    1980          635 :     ) {
    1981          631 :         scopeguard::defer! {
    1982          631 :             // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
    1983          631 :             self.current_logical_size.initialized.add_permits(1);
    1984          631 :         }
    1985              : 
    1986              :         enum BackgroundCalculationError {
    1987              :             Cancelled,
    1988              :             Other(anyhow::Error),
    1989              :         }
    1990              : 
    1991          635 :         let try_once = |attempt: usize| {
    1992          635 :             let background_ctx = &background_ctx;
    1993          635 :             let self_ref = &self;
    1994          635 :             let skip_concurrency_limiter = &skip_concurrency_limiter;
    1995          635 :             async move {
    1996          635 :                 let cancel = task_mgr::shutdown_token();
    1997          635 :                 let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
    1998          635 :                     BackgroundLoopKind::InitialLogicalSizeCalculation,
    1999          635 :                     background_ctx,
    2000          635 :                 );
    2001              : 
    2002              :                 use crate::metrics::initial_logical_size::StartCircumstances;
    2003          635 :                 let (_maybe_permit, circumstances) = tokio::select! {
    2004          215 :                     permit = wait_for_permit => {
    2005              :                         (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
    2006              :                     }
    2007              :                     _ = self_ref.cancel.cancelled() => {
    2008              :                         return Err(BackgroundCalculationError::Cancelled);
    2009              :                     }
    2010              :                     _ = cancel.cancelled() => {
    2011              :                         return Err(BackgroundCalculationError::Cancelled);
    2012              :                     },
    2013              :                     () = skip_concurrency_limiter.cancelled() => {
    2014              :                         // Some action that is part of a end user interaction requested logical size
    2015              :                         // => break out of the rate limit
    2016              :                         // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
    2017              :                         // but then again what happens if they cancel; also, we should just be using
    2018              :                         // one runtime across the entire process, so, let's leave this for now.
    2019              :                         (None, StartCircumstances::SkippedConcurrencyLimiter)
    2020              :                     }
    2021              :                 };
    2022              : 
    2023          623 :                 let metrics_guard = if attempt == 1 {
    2024          623 :                     crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
    2025              :                 } else {
    2026            0 :                     crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
    2027              :                 };
    2028              : 
    2029          623 :                 match self_ref
    2030          623 :                     .logical_size_calculation_task(
    2031          623 :                         initial_part_end,
    2032          623 :                         LogicalSizeCalculationCause::Initial,
    2033          623 :                         background_ctx,
    2034          623 :                     )
    2035       102635 :                     .await
    2036              :                 {
    2037          580 :                     Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
    2038              :                     Err(CalculateLogicalSizeError::Cancelled) => {
    2039           31 :                         Err(BackgroundCalculationError::Cancelled)
    2040              :                     }
    2041            2 :                     Err(CalculateLogicalSizeError::Other(err)) => {
    2042            1 :                         if let Some(PageReconstructError::AncestorStopping(_)) =
    2043            2 :                             err.root_cause().downcast_ref()
    2044              :                         {
    2045            0 :                             Err(BackgroundCalculationError::Cancelled)
    2046              :                         } else {
    2047            2 :                             Err(BackgroundCalculationError::Other(err))
    2048              :                         }
    2049              :                     }
    2050              :                 }
    2051          625 :             }
    2052          635 :         };
    2053              : 
    2054          635 :         let retrying = async {
    2055          635 :             let mut attempt = 0;
    2056          635 :             loop {
    2057          635 :                 attempt += 1;
    2058          635 : 
    2059       103137 :                 match try_once(attempt).await {
    2060          580 :                     Ok(res) => return ControlFlow::Continue(res),
    2061           43 :                     Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
    2062            2 :                     Err(BackgroundCalculationError::Other(e)) => {
    2063            2 :                         warn!(attempt, "initial size calculation failed: {e:?}");
    2064              :                         // exponential back-off doesn't make sense at these long intervals;
    2065              :                         // use fixed retry interval with generous jitter instead
    2066            2 :                         let sleep_duration = Duration::from_secs(
    2067            2 :                             u64::try_from(
    2068            2 :                                 // 1hour base
    2069            2 :                                 (60_i64 * 60_i64)
    2070            2 :                                     // 10min jitter
    2071            2 :                                     + rand::thread_rng().gen_range(-10 * 60..10 * 60),
    2072            2 :                             )
    2073            2 :                             .expect("10min < 1hour"),
    2074            2 :                         );
    2075            2 :                         tokio::time::sleep(sleep_duration).await;
    2076              :                     }
    2077              :                 }
    2078              :             }
    2079          623 :         };
    2080              : 
    2081          635 :         let (calculated_size, metrics_guard) = tokio::select! {
    2082          623 :             res = retrying  => {
    2083              :                 match res {
    2084              :                     ControlFlow::Continue(calculated_size) => calculated_size,
    2085              :                     ControlFlow::Break(()) => return,
    2086              :                 }
    2087              :             }
    2088              :             _ = cancel.cancelled() => {
    2089              :                 return;
    2090              :             }
    2091              :         };
    2092              : 
    2093              :         // we cannot query current_logical_size.current_size() to know the current
    2094              :         // *negative* value, only truncated to u64.
    2095          580 :         let added = self
    2096          580 :             .current_logical_size
    2097          580 :             .size_added_after_initial
    2098          580 :             .load(AtomicOrdering::Relaxed);
    2099          580 : 
    2100          580 :         let sum = calculated_size.saturating_add_signed(added);
    2101          580 : 
    2102          580 :         // set the gauge value before it can be set in `update_current_logical_size`.
    2103          580 :         self.metrics.current_logical_size_gauge.set(sum);
    2104          580 : 
    2105          580 :         self.current_logical_size
    2106          580 :             .initial_logical_size
    2107          580 :             .set((calculated_size, metrics_guard.calculation_result_saved()))
    2108          580 :             .ok()
    2109          580 :             .expect("only this task sets it");
    2110          631 :     }
    2111              : 
    2112           35 :     pub(crate) fn spawn_ondemand_logical_size_calculation(
    2113           35 :         self: &Arc<Self>,
    2114           35 :         lsn: Lsn,
    2115           35 :         cause: LogicalSizeCalculationCause,
    2116           35 :         ctx: RequestContext,
    2117           35 :     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
    2118           35 :         let (sender, receiver) = oneshot::channel();
    2119           35 :         let self_clone = Arc::clone(self);
    2120           35 :         // XXX if our caller loses interest, i.e., ctx is cancelled,
    2121           35 :         // we should stop the size calculation work and return an error.
    2122           35 :         // That would require restructuring this function's API to
    2123           35 :         // return the result directly, instead of a Receiver for the result.
    2124           35 :         let ctx = ctx.detached_child(
    2125           35 :             TaskKind::OndemandLogicalSizeCalculation,
    2126           35 :             DownloadBehavior::Download,
    2127           35 :         );
    2128           35 :         task_mgr::spawn(
    2129           35 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    2130           35 :             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
    2131           35 :             Some(self.tenant_shard_id),
    2132           35 :             Some(self.timeline_id),
    2133           35 :             "ondemand logical size calculation",
    2134           35 :             false,
    2135           35 :             async move {
    2136           35 :                 let res = self_clone
    2137           35 :                     .logical_size_calculation_task(lsn, cause, &ctx)
    2138         2789 :                     .await;
    2139           35 :                 let _ = sender.send(res).ok();
    2140           35 :                 Ok(()) // Receiver is responsible for handling errors
    2141           35 :             }
    2142           35 :             .in_current_span(),
    2143           35 :         );
    2144           35 :         receiver
    2145           35 :     }
    2146              : 
    2147              :     /// # Cancel-Safety
    2148              :     ///
    2149              :     /// This method is cancellation-safe.
    2150            0 :     #[instrument(skip_all)]
    2151              :     async fn logical_size_calculation_task(
    2152              :         self: &Arc<Self>,
    2153              :         lsn: Lsn,
    2154              :         cause: LogicalSizeCalculationCause,
    2155              :         ctx: &RequestContext,
    2156              :     ) -> Result<u64, CalculateLogicalSizeError> {
    2157              :         crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
    2158              :         // We should never be calculating logical sizes on shard !=0, because these shards do not have
    2159              :         // accurate relation sizes, and they do not emit consumption metrics.
    2160              :         debug_assert!(self.tenant_shard_id.is_zero());
    2161              : 
    2162              :         let _guard = self.gate.enter();
    2163              : 
    2164              :         let self_calculation = Arc::clone(self);
    2165              : 
    2166          658 :         let mut calculation = pin!(async {
    2167          658 :             let ctx = ctx.attached_child();
    2168          658 :             self_calculation
    2169          658 :                 .calculate_logical_size(lsn, cause, &ctx)
    2170       105431 :                 .await
    2171          648 :         });
    2172              : 
    2173       106073 :         tokio::select! {
    2174          646 :             res = &mut calculation => { res }
    2175              :             _ = self.cancel.cancelled() => {
    2176            0 :                 debug!("cancelling logical size calculation for timeline shutdown");
    2177              :                 calculation.await
    2178              :             }
    2179              :             _ = task_mgr::shutdown_watcher() => {
    2180            0 :                 debug!("cancelling logical size calculation for task shutdown");
    2181              :                 calculation.await
    2182              :             }
    2183              :         }
    2184              :     }
    2185              : 
    2186              :     /// Calculate the logical size of the database at the latest LSN.
    2187              :     ///
    2188              :     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    2189              :     /// especially if we need to download remote layers.
    2190              :     ///
    2191              :     /// # Cancel-Safety
    2192              :     ///
    2193              :     /// This method is cancellation-safe.
    2194          665 :     async fn calculate_logical_size(
    2195          665 :         &self,
    2196          665 :         up_to_lsn: Lsn,
    2197          665 :         cause: LogicalSizeCalculationCause,
    2198          665 :         ctx: &RequestContext,
    2199          665 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2200          665 :         info!(
    2201          665 :             "Calculating logical size for timeline {} at {}",
    2202          665 :             self.timeline_id, up_to_lsn
    2203          665 :         );
    2204              :         // These failpoints are used by python tests to ensure that we don't delete
    2205              :         // the timeline while the logical size computation is ongoing.
    2206              :         // The first failpoint is used to make this function pause.
    2207              :         // Then the python test initiates timeline delete operation in a thread.
    2208              :         // It waits for a few seconds, then arms the second failpoint and disables
    2209              :         // the first failpoint. The second failpoint prints an error if the timeline
    2210              :         // delete code has deleted the on-disk state while we're still running here.
    2211              :         // It shouldn't do that. If it does it anyway, the error will be caught
    2212              :         // by the test suite, highlighting the problem.
    2213            0 :         fail::fail_point!("timeline-calculate-logical-size-pause");
    2214          665 :         fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
    2215            2 :             if !self
    2216            2 :                 .conf
    2217            2 :                 .metadata_path(&self.tenant_shard_id, &self.timeline_id)
    2218            2 :                 .exists()
    2219              :             {
    2220            0 :                 error!("timeline-calculate-logical-size-pre metadata file does not exist")
    2221            2 :             }
    2222              :             // need to return something
    2223            2 :             Ok(0)
    2224          665 :         });
    2225              :         // See if we've already done the work for initial size calculation.
    2226              :         // This is a short-cut for timelines that are mostly unused.
    2227          663 :         if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
    2228            5 :             return Ok(size);
    2229          658 :         }
    2230          658 :         let storage_time_metrics = match cause {
    2231              :             LogicalSizeCalculationCause::Initial
    2232              :             | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
    2233          645 :             | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
    2234              :             LogicalSizeCalculationCause::EvictionTaskImitation => {
    2235           13 :                 &self.metrics.imitate_logical_size_histo
    2236              :             }
    2237              :         };
    2238          658 :         let timer = storage_time_metrics.start_timer();
    2239          658 :         let logical_size = self
    2240          658 :             .get_current_logical_size_non_incremental(up_to_lsn, ctx)
    2241       106262 :             .await?;
    2242            0 :         debug!("calculated logical size: {logical_size}");
    2243          615 :         timer.stop_and_record();
    2244          615 :         Ok(logical_size)
    2245          655 :     }
    2246              : 
    2247              :     /// Update current logical size, adding `delta' to the old value.
    2248       642421 :     fn update_current_logical_size(&self, delta: i64) {
    2249       642421 :         let logical_size = &self.current_logical_size;
    2250       642421 :         logical_size.increment_size(delta);
    2251       642421 : 
    2252       642421 :         // Also set the value in the prometheus gauge. Note that
    2253       642421 :         // there is a race condition here: if this is is called by two
    2254       642421 :         // threads concurrently, the prometheus gauge might be set to
    2255       642421 :         // one value while current_logical_size is set to the
    2256       642421 :         // other.
    2257       642421 :         match logical_size.current_size() {
    2258       642359 :             CurrentLogicalSize::Exact(ref new_current_size) => self
    2259       642359 :                 .metrics
    2260       642359 :                 .current_logical_size_gauge
    2261       642359 :                 .set(new_current_size.into()),
    2262           62 :             CurrentLogicalSize::Approximate(_) => {
    2263           62 :                 // don't update the gauge yet, this allows us not to update the gauge back and
    2264           62 :                 // forth between the initial size calculation task.
    2265           62 :             }
    2266              :         }
    2267       642421 :     }
    2268              : 
    2269         2254 :     async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
    2270         2254 :         let guard = self.layers.read().await;
    2271       800061 :         for historic_layer in guard.layer_map().iter_historic_layers() {
    2272       800061 :             let historic_layer_name = historic_layer.filename().file_name();
    2273       800061 :             if layer_file_name == historic_layer_name {
    2274         2254 :                 return Some(guard.get_from_desc(&historic_layer));
    2275       797807 :             }
    2276              :         }
    2277              : 
    2278            0 :         None
    2279         2254 :     }
    2280              : 
    2281              :     /// The timeline heatmap is a hint to secondary locations from the primary location,
    2282              :     /// indicating which layers are currently on-disk on the primary.
    2283              :     ///
    2284              :     /// None is returned if the Timeline is in a state where uploading a heatmap
    2285              :     /// doesn't make sense, such as shutting down or initializing.  The caller
    2286              :     /// should treat this as a cue to simply skip doing any heatmap uploading
    2287              :     /// for this timeline.
    2288            8 :     pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
    2289           17 :         let eviction_info = self.get_local_layers_for_disk_usage_eviction().await;
    2290              : 
    2291            8 :         let remote_client = match &self.remote_client {
    2292            8 :             Some(c) => c,
    2293            0 :             None => return None,
    2294              :         };
    2295              : 
    2296            8 :         let layer_file_names = eviction_info
    2297            8 :             .resident_layers
    2298            8 :             .iter()
    2299         2493 :             .map(|l| l.layer.get_name())
    2300            8 :             .collect::<Vec<_>>();
    2301              : 
    2302            8 :         let decorated = match remote_client.get_layers_metadata(layer_file_names) {
    2303            8 :             Ok(d) => d,
    2304              :             Err(_) => {
    2305              :                 // Getting metadata only fails on Timeline in bad state.
    2306            0 :                 return None;
    2307              :             }
    2308              :         };
    2309              : 
    2310            8 :         let heatmap_layers = std::iter::zip(
    2311            8 :             eviction_info.resident_layers.into_iter(),
    2312            8 :             decorated.into_iter(),
    2313            8 :         )
    2314         2493 :         .filter_map(|(layer, remote_info)| {
    2315         2493 :             remote_info.map(|remote_info| {
    2316         2493 :                 HeatMapLayer::new(
    2317         2493 :                     layer.layer.get_name(),
    2318         2493 :                     IndexLayerMetadata::from(remote_info),
    2319         2493 :                     layer.last_activity_ts,
    2320         2493 :                 )
    2321         2493 :             })
    2322         2493 :         });
    2323            8 : 
    2324            8 :         Some(HeatMapTimeline::new(
    2325            8 :             self.timeline_id,
    2326            8 :             heatmap_layers.collect(),
    2327            8 :         ))
    2328            8 :     }
    2329              : }
    2330              : 
    2331              : type TraversalId = String;
    2332              : 
    2333              : trait TraversalLayerExt {
    2334              :     fn traversal_id(&self) -> TraversalId;
    2335              : }
    2336              : 
    2337              : impl TraversalLayerExt for Layer {
    2338         1051 :     fn traversal_id(&self) -> TraversalId {
    2339         1051 :         self.local_path().to_string()
    2340         1051 :     }
    2341              : }
    2342              : 
    2343              : impl TraversalLayerExt for Arc<InMemoryLayer> {
    2344          337 :     fn traversal_id(&self) -> TraversalId {
    2345          337 :         format!("timeline {} in-memory {self}", self.get_timeline_id())
    2346          337 :     }
    2347              : }
    2348              : 
    2349              : impl Timeline {
    2350              :     ///
    2351              :     /// Get a handle to a Layer for reading.
    2352              :     ///
    2353              :     /// The returned Layer might be from an ancestor timeline, if the
    2354              :     /// segment hasn't been updated on this timeline yet.
    2355              :     ///
    2356              :     /// This function takes the current timeline's locked LayerMap as an argument,
    2357              :     /// so callers can avoid potential race conditions.
    2358              :     ///
    2359              :     /// # Cancel-Safety
    2360              :     ///
    2361              :     /// This method is cancellation-safe.
    2362      8625177 :     async fn get_reconstruct_data(
    2363      8625177 :         &self,
    2364      8625177 :         key: Key,
    2365      8625177 :         request_lsn: Lsn,
    2366      8625177 :         reconstruct_state: &mut ValueReconstructState,
    2367      8625177 :         ctx: &RequestContext,
    2368      8625179 :     ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
    2369      8625179 :         // Start from the current timeline.
    2370      8625179 :         let mut timeline_owned;
    2371      8625179 :         let mut timeline = self;
    2372      8625179 : 
    2373      8625179 :         let mut read_count = scopeguard::guard(0, |cnt| {
    2374      8625175 :             crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
    2375      8625179 :         });
    2376      8625179 : 
    2377      8625179 :         // For debugging purposes, collect the path of layers that we traversed
    2378      8625179 :         // through. It's included in the error message if we fail to find the key.
    2379      8625179 :         let mut traversal_path = Vec::<TraversalPathItem>::new();
    2380              : 
    2381      8625179 :         let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
    2382      1776726 :             *cached_lsn
    2383              :         } else {
    2384      6848453 :             Lsn(0)
    2385              :         };
    2386              : 
    2387              :         // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
    2388              :         // to check that each iteration make some progress, to break infinite
    2389              :         // looping if something goes wrong.
    2390      8625179 :         let mut prev_lsn = Lsn(u64::MAX);
    2391      8625179 : 
    2392      8625179 :         let mut result = ValueReconstructResult::Continue;
    2393      8625179 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2394              : 
    2395     40959817 :         'outer: loop {
    2396     40959817 :             if self.cancel.is_cancelled() {
    2397           38 :                 return Err(PageReconstructError::Cancelled);
    2398     40959779 :             }
    2399     40959779 : 
    2400     40959779 :             // The function should have updated 'state'
    2401     40959779 :             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
    2402     40959779 :             match result {
    2403      6879484 :                 ValueReconstructResult::Complete => return Ok(traversal_path),
    2404              :                 ValueReconstructResult::Continue => {
    2405              :                     // If we reached an earlier cached page image, we're done.
    2406     34080290 :                     if cont_lsn == cached_lsn + 1 {
    2407      1744355 :                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
    2408      1744355 :                         return Ok(traversal_path);
    2409     32335935 :                     }
    2410     32335935 :                     if prev_lsn <= cont_lsn {
    2411              :                         // Didn't make any progress in last iteration. Error out to avoid
    2412              :                         // getting stuck in the loop.
    2413         1259 :                         return Err(layer_traversal_error(format!(
    2414         1259 :                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
    2415         1259 :                             key,
    2416         1259 :                             Lsn(cont_lsn.0 - 1),
    2417         1259 :                             request_lsn,
    2418         1259 :                             timeline.ancestor_lsn
    2419         1259 :                         ), traversal_path));
    2420     32334676 :                     }
    2421     32334676 :                     prev_lsn = cont_lsn;
    2422              :                 }
    2423              :                 ValueReconstructResult::Missing => {
    2424              :                     return Err(layer_traversal_error(
    2425            5 :                         if cfg!(test) {
    2426            4 :                             format!(
    2427            4 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
    2428            4 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
    2429            4 :                             )
    2430              :                         } else {
    2431            1 :                             format!(
    2432            1 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
    2433            1 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
    2434            1 :                             )
    2435              :                         },
    2436            5 :                         traversal_path,
    2437              :                     ));
    2438              :                 }
    2439              :             }
    2440              : 
    2441              :             // Recurse into ancestor if needed
    2442     32334676 :             if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
    2443            0 :                 trace!(
    2444            0 :                     "going into ancestor {}, cont_lsn is {}",
    2445            0 :                     timeline.ancestor_lsn,
    2446            0 :                     cont_lsn
    2447            0 :                 );
    2448              : 
    2449      1298714 :                 timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
    2450      1298710 :                 timeline = &*timeline_owned;
    2451      1298710 :                 prev_lsn = Lsn(u64::MAX);
    2452      1298710 :                 continue 'outer;
    2453     31035962 :             }
    2454              : 
    2455     31035962 :             let guard = timeline.layers.read().await;
    2456     31035962 :             let layers = guard.layer_map();
    2457              : 
    2458              :             // Check the open and frozen in-memory layers first, in order from newest
    2459              :             // to oldest.
    2460     31035962 :             if let Some(open_layer) = &layers.open_layer {
    2461     26023624 :                 let start_lsn = open_layer.get_lsn_range().start;
    2462     26023624 :                 if cont_lsn > start_lsn {
    2463              :                     //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
    2464              :                     // Get all the data needed to reconstruct the page version from this layer.
    2465              :                     // But if we have an older cached page image, no need to go past that.
    2466      6700533 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2467      6700533 :                     result = match open_layer
    2468      6700533 :                         .get_value_reconstruct_data(
    2469      6700533 :                             key,
    2470      6700533 :                             lsn_floor..cont_lsn,
    2471      6700533 :                             reconstruct_state,
    2472      6700533 :                             ctx,
    2473      6700533 :                         )
    2474       669114 :                         .await
    2475              :                     {
    2476      6700533 :                         Ok(result) => result,
    2477            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2478              :                     };
    2479      6700533 :                     cont_lsn = lsn_floor;
    2480      6700533 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2481      6700533 :                     traversal_path.push((
    2482      6700533 :                         result,
    2483      6700533 :                         cont_lsn,
    2484      6700533 :                         Box::new({
    2485      6700533 :                             let open_layer = Arc::clone(open_layer);
    2486      6700533 :                             move || open_layer.traversal_id()
    2487      6700533 :                         }),
    2488      6700533 :                     ));
    2489      6700533 :                     continue 'outer;
    2490     19323091 :                 }
    2491      5012338 :             }
    2492     24335429 :             for frozen_layer in layers.frozen_layers.iter().rev() {
    2493       941050 :                 let start_lsn = frozen_layer.get_lsn_range().start;
    2494       941050 :                 if cont_lsn > start_lsn {
    2495              :                     //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
    2496       261809 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2497       261809 :                     result = match frozen_layer
    2498       261809 :                         .get_value_reconstruct_data(
    2499       261809 :                             key,
    2500       261809 :                             lsn_floor..cont_lsn,
    2501       261809 :                             reconstruct_state,
    2502       261809 :                             ctx,
    2503       261809 :                         )
    2504        18969 :                         .await
    2505              :                     {
    2506       261809 :                         Ok(result) => result,
    2507            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2508              :                     };
    2509       261809 :                     cont_lsn = lsn_floor;
    2510       261809 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2511       261809 :                     traversal_path.push((
    2512       261809 :                         result,
    2513       261809 :                         cont_lsn,
    2514       261809 :                         Box::new({
    2515       261809 :                             let frozen_layer = Arc::clone(frozen_layer);
    2516       261809 :                             move || frozen_layer.traversal_id()
    2517       261809 :                         }),
    2518       261809 :                     ));
    2519       261809 :                     continue 'outer;
    2520       679241 :                 }
    2521              :             }
    2522              : 
    2523     24073620 :             if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
    2524     23934553 :                 let layer = guard.get_from_desc(&layer);
    2525     23934553 :                 // Get all the data needed to reconstruct the page version from this layer.
    2526     23934553 :                 // But if we have an older cached page image, no need to go past that.
    2527     23934553 :                 let lsn_floor = max(cached_lsn + 1, lsn_floor);
    2528     23934553 :                 result = match layer
    2529     23934553 :                     .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
    2530      1207064 :                     .await
    2531              :                 {
    2532     23934519 :                     Ok(result) => result,
    2533           22 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2534              :                 };
    2535     23934519 :                 cont_lsn = lsn_floor;
    2536     23934519 :                 *read_count += 1;
    2537     23934519 :                 traversal_path.push((
    2538     23934519 :                     result,
    2539     23934519 :                     cont_lsn,
    2540     23934519 :                     Box::new({
    2541     23934519 :                         let layer = layer.to_owned();
    2542     23934519 :                         move || layer.traversal_id()
    2543     23934519 :                     }),
    2544     23934519 :                 ));
    2545     23934519 :                 continue 'outer;
    2546       139067 :             } else if timeline.ancestor_timeline.is_some() {
    2547              :                 // Nothing on this timeline. Traverse to parent
    2548       139066 :                 result = ValueReconstructResult::Continue;
    2549       139066 :                 cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2550       139066 :                 continue 'outer;
    2551              :             } else {
    2552              :                 // Nothing found
    2553            1 :                 result = ValueReconstructResult::Missing;
    2554            1 :                 continue 'outer;
    2555              :             }
    2556              :         }
    2557      8625167 :     }
    2558              : 
    2559              :     /// # Cancel-safety
    2560              :     ///
    2561              :     /// This method is cancellation-safe.
    2562      8625194 :     async fn lookup_cached_page(
    2563      8625194 :         &self,
    2564      8625194 :         key: &Key,
    2565      8625194 :         lsn: Lsn,
    2566      8625194 :         ctx: &RequestContext,
    2567      8625196 :     ) -> Option<(Lsn, Bytes)> {
    2568      8625196 :         let cache = page_cache::get();
    2569              : 
    2570              :         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
    2571              :         // We should look at the key to determine if it's a cacheable object
    2572      8625196 :         let (lsn, read_guard) = cache
    2573      8625196 :             .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
    2574      6848453 :             .await?;
    2575      1776743 :         let img = Bytes::from(read_guard.to_vec());
    2576      1776743 :         Some((lsn, img))
    2577      8625196 :     }
    2578              : 
    2579      1298714 :     async fn get_ready_ancestor_timeline(
    2580      1298714 :         &self,
    2581      1298714 :         ctx: &RequestContext,
    2582      1298714 :     ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
    2583      1298714 :         let ancestor = match self.get_ancestor_timeline() {
    2584      1298714 :             Ok(timeline) => timeline,
    2585            0 :             Err(e) => return Err(GetReadyAncestorError::from(e)),
    2586              :         };
    2587              : 
    2588              :         // It's possible that the ancestor timeline isn't active yet, or
    2589              :         // is active but hasn't yet caught up to the branch point. Wait
    2590              :         // for it.
    2591              :         //
    2592              :         // This cannot happen while the pageserver is running normally,
    2593              :         // because you cannot create a branch from a point that isn't
    2594              :         // present in the pageserver yet. However, we don't wait for the
    2595              :         // branch point to be uploaded to cloud storage before creating
    2596              :         // a branch. I.e., the branch LSN need not be remote consistent
    2597              :         // for the branching operation to succeed.
    2598              :         //
    2599              :         // Hence, if we try to load a tenant in such a state where
    2600              :         // 1. the existence of the branch was persisted (in IndexPart and/or locally)
    2601              :         // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
    2602              :         // then we will need to wait for the ancestor timeline to
    2603              :         // re-stream WAL up to branch_lsn before we access it.
    2604              :         //
    2605              :         // How can a tenant get in such a state?
    2606              :         // - ungraceful pageserver process exit
    2607              :         // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
    2608              :         //
    2609              :         // NB: this could be avoided by requiring
    2610              :         //   branch_lsn >= remote_consistent_lsn
    2611              :         // during branch creation.
    2612      1298714 :         match ancestor.wait_to_become_active(ctx).await {
    2613      1298710 :             Ok(()) => {}
    2614              :             Err(TimelineState::Stopping) => {
    2615            2 :                 return Err(GetReadyAncestorError::AncestorStopping(
    2616            2 :                     ancestor.timeline_id,
    2617            2 :                 ));
    2618              :             }
    2619            2 :             Err(state) => {
    2620            2 :                 return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
    2621            2 :                     "Timeline {} will not become active. Current state: {:?}",
    2622            2 :                     ancestor.timeline_id,
    2623            2 :                     &state,
    2624            2 :                 )));
    2625              :             }
    2626              :         }
    2627      1298710 :         ancestor
    2628      1298710 :             .wait_lsn(self.ancestor_lsn, ctx)
    2629            6 :             .await
    2630      1298710 :             .map_err(|e| match e {
    2631            0 :                 e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
    2632            0 :                 WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
    2633            0 :                 e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
    2634      1298710 :             })?;
    2635              : 
    2636      1298710 :         Ok(ancestor)
    2637      1298714 :     }
    2638              : 
    2639      1298714 :     fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
    2640      1298714 :         let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
    2641            0 :             format!(
    2642            0 :                 "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
    2643            0 :                 self.timeline_id,
    2644            0 :                 self.get_ancestor_timeline_id(),
    2645            0 :             )
    2646      1298714 :         })?;
    2647      1298714 :         Ok(Arc::clone(ancestor))
    2648      1298714 :     }
    2649              : 
    2650      1073801 :     pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
    2651      1073801 :         &self.shard_identity
    2652      1073801 :     }
    2653              : 
    2654              :     ///
    2655              :     /// Get a handle to the latest layer for appending.
    2656              :     ///
    2657      2951969 :     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    2658      2951969 :         let mut guard = self.layers.write().await;
    2659      2951969 :         let layer = guard
    2660      2951969 :             .get_layer_for_write(
    2661      2951969 :                 lsn,
    2662      2951969 :                 self.get_last_record_lsn(),
    2663      2951969 :                 self.conf,
    2664      2951969 :                 self.timeline_id,
    2665      2951969 :                 self.tenant_shard_id,
    2666      2951969 :             )
    2667          269 :             .await?;
    2668      2951969 :         Ok(layer)
    2669      2951969 :     }
    2670              : 
    2671      1214102 :     async fn put_value(
    2672      1214102 :         &self,
    2673      1214102 :         key: Key,
    2674      1214102 :         lsn: Lsn,
    2675      1214102 :         val: &Value,
    2676      1214102 :         ctx: &RequestContext,
    2677      1214102 :     ) -> anyhow::Result<()> {
    2678              :         //info!("PUT: key {} at {}", key, lsn);
    2679      1214102 :         let layer = self.get_layer_for_write(lsn).await?;
    2680      1214102 :         layer.put_value(key, lsn, val, ctx).await?;
    2681      1214102 :         Ok(())
    2682      1214102 :     }
    2683              : 
    2684      1718651 :     async fn put_values(
    2685      1718651 :         &self,
    2686      1718651 :         values: &HashMap<Key, Vec<(Lsn, Value)>>,
    2687      1718651 :         ctx: &RequestContext,
    2688      1718651 :     ) -> anyhow::Result<()> {
    2689              :         // Pick the first LSN in the batch to get the layer to write to.
    2690      1718651 :         for lsns in values.values() {
    2691      1718651 :             if let Some((lsn, _)) = lsns.first() {
    2692      1718651 :                 let layer = self.get_layer_for_write(*lsn).await?;
    2693      1718651 :                 layer.put_values(values, ctx).await?;
    2694      1718650 :                 break;
    2695            0 :             }
    2696              :         }
    2697      1718650 :         Ok(())
    2698      1718650 :     }
    2699              : 
    2700        19216 :     async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    2701        19216 :         if let Some((_, lsn)) = tombstones.first() {
    2702        19216 :             let layer = self.get_layer_for_write(*lsn).await?;
    2703        19216 :             layer.put_tombstones(tombstones).await?;
    2704            0 :         }
    2705        19216 :         Ok(())
    2706        19216 :     }
    2707              : 
    2708     76570323 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    2709     76570323 :         assert!(new_lsn.is_aligned());
    2710              : 
    2711     76570323 :         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
    2712     76570323 :         self.last_record_lsn.advance(new_lsn);
    2713     76570323 :     }
    2714              : 
    2715         5489 :     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
    2716              :         // Freeze the current open in-memory layer. It will be written to disk on next
    2717              :         // iteration.
    2718         5489 :         let _write_guard = if write_lock_held {
    2719         3688 :             None
    2720              :         } else {
    2721         1801 :             Some(self.write_lock.lock().await)
    2722              :         };
    2723         5489 :         let mut guard = self.layers.write().await;
    2724         5489 :         guard
    2725         5489 :             .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
    2726            5 :             .await;
    2727         5489 :     }
    2728              : 
    2729              :     /// Layer flusher task's main loop.
    2730         1551 :     async fn flush_loop(
    2731         1551 :         self: &Arc<Self>,
    2732         1551 :         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
    2733         1551 :         ctx: &RequestContext,
    2734         1551 :     ) {
    2735         1551 :         info!("started flush loop");
    2736              :         loop {
    2737         6673 :             tokio::select! {
    2738        12020 :                 _ = self.cancel.cancelled() => {
    2739        12020 :                     info!("shutting down layer flush task");
    2740        12020 :                     break;
    2741        12020 :                 },
    2742        12020 :                 _ = task_mgr::shutdown_watcher() => {
    2743        12020 :                     info!("shutting down layer flush task");
    2744        12020 :                     break;
    2745        12020 :                 },
    2746        12020 :                 _ = layer_flush_start_rx.changed() => {}
    2747        12020 :             }
    2748              : 
    2749            0 :             trace!("waking up");
    2750         5129 :             let timer = self.metrics.flush_time_histo.start_timer();
    2751         5129 :             let flush_counter = *layer_flush_start_rx.borrow();
    2752        10388 :             let result = loop {
    2753        10388 :                 if self.cancel.is_cancelled() {
    2754            0 :                     info!("dropping out of flush loop for timeline shutdown");
    2755              :                     // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
    2756              :                     // anyone waiting on that will respect self.cancel as well: they will stop
    2757              :                     // waiting at the same time we as drop out of this loop.
    2758            0 :                     return;
    2759        10388 :                 }
    2760              : 
    2761        10388 :                 let layer_to_flush = {
    2762        10388 :                     let guard = self.layers.read().await;
    2763        10388 :                     guard.layer_map().frozen_layers.front().cloned()
    2764              :                     // drop 'layers' lock to allow concurrent reads and writes
    2765              :                 };
    2766        10388 :                 let Some(layer_to_flush) = layer_to_flush else {
    2767         5122 :                     break Ok(());
    2768              :                 };
    2769        22252 :                 match self.flush_frozen_layer(layer_to_flush, ctx).await {
    2770         5259 :                     Ok(()) => {}
    2771              :                     Err(FlushLayerError::Cancelled) => {
    2772            1 :                         info!("dropping out of flush loop for timeline shutdown");
    2773            1 :                         return;
    2774              :                     }
    2775            0 :                     err @ Err(
    2776              :                         FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
    2777              :                     ) => {
    2778            0 :                         error!("could not flush frozen layer: {err:?}");
    2779            0 :                         break err;
    2780              :                     }
    2781              :                 }
    2782              :             };
    2783              :             // Notify any listeners that we're done
    2784         5122 :             let _ = self
    2785         5122 :                 .layer_flush_done_tx
    2786         5122 :                 .send_replace((flush_counter, result));
    2787         5122 : 
    2788         5122 :             timer.stop_and_record();
    2789              :         }
    2790          576 :     }
    2791              : 
    2792         1801 :     async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
    2793         1801 :         let mut rx = self.layer_flush_done_tx.subscribe();
    2794         1801 : 
    2795         1801 :         // Increment the flush cycle counter and wake up the flush task.
    2796         1801 :         // Remember the new value, so that when we listen for the flush
    2797         1801 :         // to finish, we know when the flush that we initiated has
    2798         1801 :         // finished, instead of some other flush that was started earlier.
    2799         1801 :         let mut my_flush_request = 0;
    2800         1801 : 
    2801         1801 :         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
    2802         1801 :         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
    2803           44 :             anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
    2804         1757 :         }
    2805         1757 : 
    2806         1757 :         self.layer_flush_start_tx.send_modify(|counter| {
    2807         1757 :             my_flush_request = *counter + 1;
    2808         1757 :             *counter = my_flush_request;
    2809         1757 :         });
    2810              : 
    2811         3513 :         loop {
    2812         3513 :             {
    2813         3513 :                 let (last_result_counter, last_result) = &*rx.borrow();
    2814         3513 :                 if *last_result_counter >= my_flush_request {
    2815         1756 :                     if let Err(_err) = last_result {
    2816              :                         // We already logged the original error in
    2817              :                         // flush_loop. We cannot propagate it to the caller
    2818              :                         // here, because it might not be Cloneable
    2819            0 :                         anyhow::bail!(
    2820            0 :                             "Could not flush frozen layer. Request id: {}",
    2821            0 :                             my_flush_request
    2822            0 :                         );
    2823              :                     } else {
    2824         1756 :                         return Ok(());
    2825              :                     }
    2826         1757 :                 }
    2827              :             }
    2828            0 :             trace!("waiting for flush to complete");
    2829         1757 :             tokio::select! {
    2830         1756 :                 rx_e = rx.changed() => {
    2831              :                     rx_e?;
    2832              :                 },
    2833              :                 // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
    2834              :                 // the notification from [`flush_loop`] that it completed.
    2835              :                 _ = self.cancel.cancelled() => {
    2836            1 :                     tracing::info!("Cancelled layer flush due on timeline shutdown");
    2837              :                     return Ok(())
    2838              :                 }
    2839              :             };
    2840            0 :             trace!("done")
    2841              :         }
    2842         1801 :     }
    2843              : 
    2844         3688 :     fn flush_frozen_layers(&self) {
    2845         3688 :         self.layer_flush_start_tx.send_modify(|val| *val += 1);
    2846         3688 :     }
    2847              : 
    2848              :     /// Flush one frozen in-memory layer to disk, as a new delta layer.
    2849         5184 :     #[instrument(skip_all, fields(layer=%frozen_layer))]
    2850              :     async fn flush_frozen_layer(
    2851              :         self: &Arc<Self>,
    2852              :         frozen_layer: Arc<InMemoryLayer>,
    2853              :         ctx: &RequestContext,
    2854              :     ) -> Result<(), FlushLayerError> {
    2855              :         debug_assert_current_span_has_tenant_and_timeline_id();
    2856              :         // As a special case, when we have just imported an image into the repository,
    2857              :         // instead of writing out a L0 delta layer, we directly write out image layer
    2858              :         // files instead. This is possible as long as *all* the data imported into the
    2859              :         // repository have the same LSN.
    2860              :         let lsn_range = frozen_layer.get_lsn_range();
    2861              :         let (layers_to_upload, delta_layer_to_add) =
    2862              :             if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
    2863              :                 #[cfg(test)]
    2864              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2865              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2866              :                         panic!("flush loop not running")
    2867              :                     }
    2868              :                     FlushLoopState::Running {
    2869              :                         initdb_optimization_count,
    2870              :                         ..
    2871              :                     } => {
    2872              :                         *initdb_optimization_count += 1;
    2873              :                     }
    2874              :                 }
    2875              :                 // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
    2876              :                 // require downloading anything during initial import.
    2877              :                 let (partitioning, _lsn) = self
    2878              :                     .repartition(
    2879              :                         self.initdb_lsn,
    2880              :                         self.get_compaction_target_size(),
    2881              :                         EnumSet::empty(),
    2882              :                         ctx,
    2883              :                     )
    2884              :                     .await?;
    2885              : 
    2886              :                 if self.cancel.is_cancelled() {
    2887              :                     return Err(FlushLayerError::Cancelled);
    2888              :                 }
    2889              : 
    2890              :                 // For image layers, we add them immediately into the layer map.
    2891              :                 (
    2892              :                     self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
    2893              :                         .await?,
    2894              :                     None,
    2895              :                 )
    2896              :             } else {
    2897              :                 #[cfg(test)]
    2898              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2899              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2900              :                         panic!("flush loop not running")
    2901              :                     }
    2902              :                     FlushLoopState::Running {
    2903              :                         expect_initdb_optimization,
    2904              :                         ..
    2905              :                     } => {
    2906              :                         assert!(!*expect_initdb_optimization, "expected initdb optimization");
    2907              :                     }
    2908              :                 }
    2909              :                 // Normal case, write out a L0 delta layer file.
    2910              :                 // `create_delta_layer` will not modify the layer map.
    2911              :                 // We will remove frozen layer and add delta layer in one atomic operation later.
    2912              :                 let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
    2913              :                 (
    2914              :                     // FIXME: even though we have a single image and single delta layer assumption
    2915              :                     // we push them to vec
    2916              :                     vec![layer.clone()],
    2917              :                     Some(layer),
    2918              :                 )
    2919              :             };
    2920              : 
    2921         5264 :         pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
    2922              : 
    2923              :         if self.cancel.is_cancelled() {
    2924              :             return Err(FlushLayerError::Cancelled);
    2925              :         }
    2926              : 
    2927              :         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
    2928              :         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
    2929              : 
    2930              :         // The new on-disk layers are now in the layer map. We can remove the
    2931              :         // in-memory layer from the map now. The flushed layer is stored in
    2932              :         // the mapping in `create_delta_layer`.
    2933              :         let metadata = {
    2934              :             let mut guard = self.layers.write().await;
    2935              : 
    2936              :             if self.cancel.is_cancelled() {
    2937              :                 return Err(FlushLayerError::Cancelled);
    2938              :             }
    2939              : 
    2940              :             guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
    2941              : 
    2942              :             if disk_consistent_lsn != old_disk_consistent_lsn {
    2943              :                 assert!(disk_consistent_lsn > old_disk_consistent_lsn);
    2944              :                 self.disk_consistent_lsn.store(disk_consistent_lsn);
    2945              : 
    2946              :                 // Schedule remote uploads that will reflect our new disk_consistent_lsn
    2947              :                 Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
    2948              :             } else {
    2949              :                 None
    2950              :             }
    2951              :             // release lock on 'layers'
    2952              :         };
    2953              : 
    2954              :         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
    2955              :         // a compaction can delete the file and then it won't be available for uploads any more.
    2956              :         // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
    2957              :         // race situation.
    2958              :         // See https://github.com/neondatabase/neon/issues/4526
    2959         5263 :         pausable_failpoint!("flush-frozen-pausable");
    2960              : 
    2961              :         // This failpoint is used by another test case `test_pageserver_recovery`.
    2962            0 :         fail_point!("flush-frozen-exit");
    2963              : 
    2964              :         // Update the metadata file, with new 'disk_consistent_lsn'
    2965              :         //
    2966              :         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
    2967              :         // *all* the layers, to avoid fsyncing the file multiple times.
    2968              : 
    2969              :         // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
    2970              :         if let Some(metadata) = metadata {
    2971              :             save_metadata(
    2972              :                 self.conf,
    2973              :                 &self.tenant_shard_id,
    2974              :                 &self.timeline_id,
    2975              :                 &metadata,
    2976              :             )
    2977              :             .await
    2978              :             .context("save_metadata")?;
    2979              :         }
    2980              :         Ok(())
    2981              :     }
    2982              : 
    2983              :     /// Update metadata file
    2984         5283 :     fn schedule_uploads(
    2985         5283 :         &self,
    2986         5283 :         disk_consistent_lsn: Lsn,
    2987         5283 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    2988         5283 :     ) -> anyhow::Result<TimelineMetadata> {
    2989         5283 :         // We can only save a valid 'prev_record_lsn' value on disk if we
    2990         5283 :         // flushed *all* in-memory changes to disk. We only track
    2991         5283 :         // 'prev_record_lsn' in memory for the latest processed record, so we
    2992         5283 :         // don't remember what the correct value that corresponds to some old
    2993         5283 :         // LSN is. But if we flush everything, then the value corresponding
    2994         5283 :         // current 'last_record_lsn' is correct and we can store it on disk.
    2995         5283 :         let RecordLsn {
    2996         5283 :             last: last_record_lsn,
    2997         5283 :             prev: prev_record_lsn,
    2998         5283 :         } = self.last_record_lsn.load();
    2999         5283 :         let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
    3000         1528 :             Some(prev_record_lsn)
    3001              :         } else {
    3002         3755 :             None
    3003              :         };
    3004              : 
    3005         5283 :         let ancestor_timeline_id = self
    3006         5283 :             .ancestor_timeline
    3007         5283 :             .as_ref()
    3008         5283 :             .map(|ancestor| ancestor.timeline_id);
    3009         5283 : 
    3010         5283 :         let metadata = TimelineMetadata::new(
    3011         5283 :             disk_consistent_lsn,
    3012         5283 :             ondisk_prev_record_lsn,
    3013         5283 :             ancestor_timeline_id,
    3014         5283 :             self.ancestor_lsn,
    3015         5283 :             *self.latest_gc_cutoff_lsn.read(),
    3016         5283 :             self.initdb_lsn,
    3017         5283 :             self.pg_version,
    3018         5283 :         );
    3019         5283 : 
    3020         5283 :         fail_point!("checkpoint-before-saving-metadata", |x| bail!(
    3021            0 :             "{}",
    3022            0 :             x.unwrap()
    3023         5283 :         ));
    3024              : 
    3025         5283 :         if let Some(remote_client) = &self.remote_client {
    3026        10546 :             for layer in layers_to_upload {
    3027         5263 :                 remote_client.schedule_layer_file_upload(layer)?;
    3028              :             }
    3029         5283 :             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
    3030            0 :         }
    3031              : 
    3032         5283 :         Ok(metadata)
    3033         5283 :     }
    3034              : 
    3035           20 :     async fn update_metadata_file(
    3036           20 :         &self,
    3037           20 :         disk_consistent_lsn: Lsn,
    3038           20 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    3039           20 :     ) -> anyhow::Result<()> {
    3040           20 :         let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
    3041              : 
    3042           20 :         save_metadata(
    3043           20 :             self.conf,
    3044           20 :             &self.tenant_shard_id,
    3045           20 :             &self.timeline_id,
    3046           20 :             &metadata,
    3047           20 :         )
    3048            0 :         .await
    3049           20 :         .context("save_metadata")?;
    3050              : 
    3051           20 :         Ok(())
    3052           20 :     }
    3053              : 
    3054            2 :     pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
    3055            2 :         if let Some(remote_client) = &self.remote_client {
    3056            2 :             remote_client
    3057            2 :                 .preserve_initdb_archive(
    3058            2 :                     &self.tenant_shard_id.tenant_id,
    3059            2 :                     &self.timeline_id,
    3060            2 :                     &self.cancel,
    3061            2 :                 )
    3062            2 :                 .await?;
    3063              :         } else {
    3064            0 :             bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
    3065              :         }
    3066            2 :         Ok(())
    3067            2 :     }
    3068              : 
    3069              :     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
    3070              :     // in layer map immediately. The caller is responsible to put it into the layer map.
    3071         5186 :     async fn create_delta_layer(
    3072         5186 :         self: &Arc<Self>,
    3073         5186 :         frozen_layer: &Arc<InMemoryLayer>,
    3074         5186 :         ctx: &RequestContext,
    3075         5186 :     ) -> anyhow::Result<ResidentLayer> {
    3076         5186 :         let span = tracing::info_span!("blocking");
    3077         5186 :         let new_delta: ResidentLayer = tokio::task::spawn_blocking({
    3078         5186 :             let self_clone = Arc::clone(self);
    3079         5186 :             let frozen_layer = Arc::clone(frozen_layer);
    3080         5186 :             let ctx = ctx.attached_child();
    3081         5186 :             move || {
    3082         5186 :                 // Write it out
    3083         5186 :                 // Keep this inside `spawn_blocking` and `Handle::current`
    3084         5186 :                 // as long as the write path is still sync and the read impl
    3085         5186 :                 // is still not fully async. Otherwise executor threads would
    3086         5186 :                 // be blocked.
    3087         5186 :                 let _g = span.entered();
    3088         5186 :                 let new_delta =
    3089         5186 :                     Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
    3090         5186 :                 let new_delta_path = new_delta.local_path().to_owned();
    3091         5186 : 
    3092         5186 :                 // Sync it to disk.
    3093         5186 :                 //
    3094         5186 :                 // We must also fsync the timeline dir to ensure the directory entries for
    3095         5186 :                 // new layer files are durable.
    3096         5186 :                 //
    3097         5186 :                 // NB: timeline dir must be synced _after_ the file contents are durable.
    3098         5186 :                 // So, two separate fsyncs are required, they mustn't be batched.
    3099         5186 :                 //
    3100         5186 :                 // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
    3101         5186 :                 // files to flush, the fsync overhead can be reduces as follows:
    3102         5186 :                 // 1. write them all to temporary file names
    3103         5186 :                 // 2. fsync them
    3104         5186 :                 // 3. rename to the final name
    3105         5186 :                 // 4. fsync the parent directory.
    3106         5186 :                 // Note that (1),(2),(3) today happen inside write_to_disk().
    3107         5186 :                 //
    3108         5186 :                 // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3109         5186 :                 par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
    3110         5186 :                 par_fsync::par_fsync(&[self_clone
    3111         5186 :                     .conf
    3112         5186 :                     .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
    3113         5186 :                 .context("fsync of timeline dir")?;
    3114              : 
    3115         5184 :                 anyhow::Ok(new_delta)
    3116         5186 :             }
    3117         5186 :         })
    3118         5184 :         .await
    3119         5184 :         .context("spawn_blocking")
    3120         5184 :         .and_then(|x| x)?;
    3121              : 
    3122         5184 :         Ok(new_delta)
    3123         5184 :     }
    3124              : 
    3125         1603 :     async fn repartition(
    3126         1603 :         &self,
    3127         1603 :         lsn: Lsn,
    3128         1603 :         partition_size: u64,
    3129         1603 :         flags: EnumSet<CompactFlags>,
    3130         1603 :         ctx: &RequestContext,
    3131         1603 :     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
    3132         1603 :         {
    3133         1603 :             let partitioning_guard = self.partitioning.lock().unwrap();
    3134         1603 :             let distance = lsn.0 - partitioning_guard.1 .0;
    3135         1603 :             if partitioning_guard.1 != Lsn(0)
    3136          900 :                 && distance <= self.repartition_threshold
    3137          719 :                 && !flags.contains(CompactFlags::ForceRepartition)
    3138              :             {
    3139            0 :                 debug!(
    3140            0 :                     distance,
    3141            0 :                     threshold = self.repartition_threshold,
    3142            0 :                     "no repartitioning needed"
    3143            0 :                 );
    3144          717 :                 return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
    3145          886 :             }
    3146              :         }
    3147       213408 :         let keyspace = self.collect_keyspace(lsn, ctx).await?;
    3148          882 :         let partitioning = keyspace.partition(partition_size);
    3149          882 : 
    3150          882 :         let mut partitioning_guard = self.partitioning.lock().unwrap();
    3151          882 :         if lsn > partitioning_guard.1 {
    3152          882 :             *partitioning_guard = (partitioning, lsn);
    3153          882 :         } else {
    3154            0 :             warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
    3155              :         }
    3156          882 :         Ok((partitioning_guard.0.clone(), partitioning_guard.1))
    3157         1602 :     }
    3158              : 
    3159              :     // Is it time to create a new image layer for the given partition?
    3160        37180 :     async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
    3161        37180 :         let threshold = self.get_image_creation_threshold();
    3162              : 
    3163        37180 :         let guard = self.layers.read().await;
    3164        37180 :         let layers = guard.layer_map();
    3165        37180 : 
    3166        37180 :         let mut max_deltas = 0;
    3167        37180 :         {
    3168        37180 :             let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
    3169        37180 :             if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
    3170         6220 :                 let img_range =
    3171         6220 :                     partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
    3172         6220 :                 if wanted.overlaps(&img_range) {
    3173              :                     //
    3174              :                     // gc_timeline only pays attention to image layers that are older than the GC cutoff,
    3175              :                     // but create_image_layers creates image layers at last-record-lsn.
    3176              :                     // So it's possible that gc_timeline wants a new image layer to be created for a key range,
    3177              :                     // but the range is already covered by image layers at more recent LSNs. Before we
    3178              :                     // create a new image layer, check if the range is already covered at more recent LSNs.
    3179            0 :                     if !layers
    3180            0 :                         .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
    3181              :                     {
    3182            0 :                         debug!(
    3183            0 :                             "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
    3184            0 :                             img_range.start, img_range.end, cutoff_lsn, lsn
    3185            0 :                         );
    3186            0 :                         return true;
    3187            0 :                     }
    3188         6220 :                 }
    3189        30960 :             }
    3190              :         }
    3191              : 
    3192      2066684 :         for part_range in &partition.ranges {
    3193      2035454 :             let image_coverage = layers.image_coverage(part_range, lsn);
    3194      3884589 :             for (img_range, last_img) in image_coverage {
    3195      1855085 :                 let img_lsn = if let Some(last_img) = last_img {
    3196       365293 :                     last_img.get_lsn_range().end
    3197              :                 } else {
    3198      1489792 :                     Lsn(0)
    3199              :                 };
    3200              :                 // Let's consider an example:
    3201              :                 //
    3202              :                 // delta layer with LSN range 71-81
    3203              :                 // delta layer with LSN range 81-91
    3204              :                 // delta layer with LSN range 91-101
    3205              :                 // image layer at LSN 100
    3206              :                 //
    3207              :                 // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
    3208              :                 // there's no need to create a new one. We check this case explicitly, to avoid passing
    3209              :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    3210              :                 // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
    3211              :                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
    3212      1855085 :                 if img_lsn < lsn {
    3213      1820118 :                     let num_deltas =
    3214      1820118 :                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
    3215      1820118 : 
    3216      1820118 :                     max_deltas = max_deltas.max(num_deltas);
    3217      1820118 :                     if num_deltas >= threshold {
    3218            0 :                         debug!(
    3219            0 :                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
    3220            0 :                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
    3221            0 :                         );
    3222         5949 :                         return true;
    3223      1814168 :                     }
    3224        34967 :                 }
    3225              :             }
    3226              :         }
    3227              : 
    3228            0 :         debug!(
    3229            0 :             max_deltas,
    3230            0 :             "none of the partitioned ranges had >= {threshold} deltas"
    3231            0 :         );
    3232        31230 :         false
    3233        37179 :     }
    3234              : 
    3235            0 :     #[tracing::instrument(skip_all, fields(%lsn, %force))]
    3236              :     async fn create_image_layers(
    3237              :         self: &Arc<Timeline>,
    3238              :         partitioning: &KeyPartitioning,
    3239              :         lsn: Lsn,
    3240              :         force: bool,
    3241              :         ctx: &RequestContext,
    3242              :     ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
    3243              :         let timer = self.metrics.create_images_time_histo.start_timer();
    3244              :         let mut image_layers = Vec::new();
    3245              : 
    3246              :         // We need to avoid holes between generated image layers.
    3247              :         // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
    3248              :         // image layer with hole between them. In this case such layer can not be utilized by GC.
    3249              :         //
    3250              :         // How such hole between partitions can appear?
    3251              :         // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
    3252              :         // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
    3253              :         // If there is delta layer <100000000..300000000> then it never be garbage collected because
    3254              :         // image layers  <100000000..100000099> and <200000000..200000199> are not completely covering it.
    3255              :         let mut start = Key::MIN;
    3256              : 
    3257              :         for partition in partitioning.parts.iter() {
    3258              :             let img_range = start..partition.ranges.last().unwrap().end;
    3259              :             start = img_range.end;
    3260              :             if force || self.time_for_new_image_layer(partition, lsn).await {
    3261              :                 let mut image_layer_writer = ImageLayerWriter::new(
    3262              :                     self.conf,
    3263              :                     self.timeline_id,
    3264              :                     self.tenant_shard_id,
    3265              :                     &img_range,
    3266              :                     lsn,
    3267              :                 )
    3268              :                 .await?;
    3269              : 
    3270            0 :                 fail_point!("image-layer-writer-fail-before-finish", |_| {
    3271            0 :                     Err(CreateImageLayersError::Other(anyhow::anyhow!(
    3272            0 :                         "failpoint image-layer-writer-fail-before-finish"
    3273            0 :                     )))
    3274            0 :                 });
    3275              : 
    3276              :                 let mut key_request_accum = KeySpaceAccum::new();
    3277              :                 for range in &partition.ranges {
    3278              :                     let mut key = range.start;
    3279              :                     while key < range.end {
    3280              :                         if self.shard_identity.is_key_disposable(&key) {
    3281            0 :                             debug!(
    3282            0 :                                 "Dropping key {} during compaction (it belongs on shard {:?})",
    3283            0 :                                 key,
    3284            0 :                                 self.shard_identity.get_shard_number(&key)
    3285            0 :                             );
    3286              :                             key = key.next();
    3287              :                             continue;
    3288              :                         }
    3289              : 
    3290              :                         key_request_accum.add_key(key);
    3291              :                         if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
    3292              :                             || key.next() == range.end
    3293              :                         {
    3294              :                             let results = self
    3295              :                                 .get_vectored(
    3296              :                                     &key_request_accum.consume_keyspace().ranges,
    3297              :                                     lsn,
    3298              :                                     ctx,
    3299              :                                 )
    3300              :                                 .await?;
    3301              : 
    3302              :                             for (img_key, img) in results {
    3303              :                                 let img = match img {
    3304              :                                     Ok(img) => img,
    3305              :                                     Err(err) => {
    3306              :                                         // If we fail to reconstruct a VM or FSM page, we can zero the
    3307              :                                         // page without losing any actual user data. That seems better
    3308              :                                         // than failing repeatedly and getting stuck.
    3309              :                                         //
    3310              :                                         // We had a bug at one point, where we truncated the FSM and VM
    3311              :                                         // in the pageserver, but the Postgres didn't know about that
    3312              :                                         // and continued to generate incremental WAL records for pages
    3313              :                                         // that didn't exist in the pageserver. Trying to replay those
    3314              :                                         // WAL records failed to find the previous image of the page.
    3315              :                                         // This special case allows us to recover from that situation.
    3316              :                                         // See https://github.com/neondatabase/neon/issues/2601.
    3317              :                                         //
    3318              :                                         // Unfortunately we cannot do this for the main fork, or for
    3319              :                                         // any metadata keys, keys, as that would lead to actual data
    3320              :                                         // loss.
    3321              :                                         if is_rel_fsm_block_key(img_key)
    3322              :                                             || is_rel_vm_block_key(img_key)
    3323              :                                         {
    3324            0 :                                             warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
    3325              :                                             ZERO_PAGE.clone()
    3326              :                                         } else {
    3327              :                                             return Err(
    3328              :                                                 CreateImageLayersError::PageReconstructError(err),
    3329              :                                             );
    3330              :                                         }
    3331              :                                     }
    3332              :                                 };
    3333              : 
    3334              :                                 image_layer_writer.put_image(img_key, &img).await?;
    3335              :                             }
    3336              :                         }
    3337              : 
    3338              :                         key = key.next();
    3339              :                     }
    3340              :                 }
    3341              :                 let image_layer = image_layer_writer.finish(self).await?;
    3342              :                 image_layers.push(image_layer);
    3343              :             }
    3344              :         }
    3345              :         // All layers that the GC wanted us to create have now been created.
    3346              :         //
    3347              :         // It's possible that another GC cycle happened while we were compacting, and added
    3348              :         // something new to wanted_image_layers, and we now clear that before processing it.
    3349              :         // That's OK, because the next GC iteration will put it back in.
    3350              :         *self.wanted_image_layers.lock().unwrap() = None;
    3351              : 
    3352              :         // Sync the new layer to disk before adding it to the layer map, to make sure
    3353              :         // we don't garbage collect something based on the new layer, before it has
    3354              :         // reached the disk.
    3355              :         //
    3356              :         // We must also fsync the timeline dir to ensure the directory entries for
    3357              :         // new layer files are durable
    3358              :         //
    3359              :         // Compaction creates multiple image layers. It would be better to create them all
    3360              :         // and fsync them all in parallel.
    3361              :         let all_paths = image_layers
    3362              :             .iter()
    3363         6029 :             .map(|layer| layer.local_path().to_owned())
    3364              :             .collect::<Vec<_>>();
    3365              : 
    3366              :         par_fsync::par_fsync_async(&all_paths)
    3367              :             .await
    3368              :             .context("fsync of newly created layer files")?;
    3369              : 
    3370              :         if !all_paths.is_empty() {
    3371              :             par_fsync::par_fsync_async(&[self
    3372              :                 .conf
    3373              :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
    3374              :             .await
    3375              :             .context("fsync of timeline dir")?;
    3376              :         }
    3377              : 
    3378              :         let mut guard = self.layers.write().await;
    3379              : 
    3380              :         // FIXME: we could add the images to be uploaded *before* returning from here, but right
    3381              :         // now they are being scheduled outside of write lock
    3382              :         guard.track_new_image_layers(&image_layers, &self.metrics);
    3383              :         drop_wlock(guard);
    3384              :         timer.stop_and_record();
    3385              : 
    3386              :         Ok(image_layers)
    3387              :     }
    3388              : 
    3389              :     /// Wait until the background initial logical size calculation is complete, or
    3390              :     /// this Timeline is shut down.  Calling this function will cause the initial
    3391              :     /// logical size calculation to skip waiting for the background jobs barrier.
    3392          245 :     pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
    3393          245 :         if let Some(await_bg_cancel) = self
    3394          245 :             .current_logical_size
    3395          245 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore
    3396          245 :             .get()
    3397          239 :         {
    3398          239 :             await_bg_cancel.cancel();
    3399          239 :         } else {
    3400              :             // We should not wait if we were not able to explicitly instruct
    3401              :             // the logical size cancellation to skip the concurrency limit semaphore.
    3402              :             // TODO: this is an unexpected case.  We should restructure so that it
    3403              :             // can't happen.
    3404            6 :             tracing::info!(
    3405            6 :                 "await_initial_logical_size: can't get semaphore cancel token, skipping"
    3406            6 :             );
    3407              :         }
    3408              : 
    3409          245 :         tokio::select!(
    3410          476 :             _ = self.current_logical_size.initialized.acquire() => {},
    3411          476 :             _ = self.cancel.cancelled() => {}
    3412          476 :         )
    3413          237 :     }
    3414              : }
    3415              : 
    3416         1223 : #[derive(Default)]
    3417              : struct CompactLevel0Phase1Result {
    3418              :     new_layers: Vec<ResidentLayer>,
    3419              :     deltas_to_compact: Vec<Layer>,
    3420              : }
    3421              : 
    3422              : /// Top-level failure to compact.
    3423            0 : #[derive(Debug, thiserror::Error)]
    3424              : pub(crate) enum CompactionError {
    3425              :     #[error("The timeline or pageserver is shutting down")]
    3426              :     ShuttingDown,
    3427              :     /// Compaction cannot be done right now; page reconstruction and so on.
    3428              :     #[error(transparent)]
    3429              :     Other(#[from] anyhow::Error),
    3430              : }
    3431              : 
    3432              : #[serde_as]
    3433         4088 : #[derive(serde::Serialize)]
    3434              : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
    3435              : 
    3436        10633 : #[derive(Default)]
    3437              : enum DurationRecorder {
    3438              :     #[default]
    3439              :     NotStarted,
    3440              :     Recorded(RecordedDuration, tokio::time::Instant),
    3441              : }
    3442              : 
    3443              : impl DurationRecorder {
    3444         2983 :     fn till_now(&self) -> DurationRecorder {
    3445         2983 :         match self {
    3446              :             DurationRecorder::NotStarted => {
    3447            0 :                 panic!("must only call on recorded measurements")
    3448              :             }
    3449         2983 :             DurationRecorder::Recorded(_, ended) => {
    3450         2983 :                 let now = tokio::time::Instant::now();
    3451         2983 :                 DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
    3452         2983 :             }
    3453         2983 :         }
    3454         2983 :     }
    3455         2044 :     fn into_recorded(self) -> Option<RecordedDuration> {
    3456         2044 :         match self {
    3457            0 :             DurationRecorder::NotStarted => None,
    3458         2044 :             DurationRecorder::Recorded(recorded, _) => Some(recorded),
    3459              :         }
    3460         2044 :     }
    3461              : }
    3462              : 
    3463         1519 : #[derive(Default)]
    3464              : struct CompactLevel0Phase1StatsBuilder {
    3465              :     version: Option<u64>,
    3466              :     tenant_id: Option<TenantShardId>,
    3467              :     timeline_id: Option<TimelineId>,
    3468              :     read_lock_acquisition_micros: DurationRecorder,
    3469              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    3470              :     read_lock_held_key_sort_micros: DurationRecorder,
    3471              :     read_lock_held_prerequisites_micros: DurationRecorder,
    3472              :     read_lock_held_compute_holes_micros: DurationRecorder,
    3473              :     read_lock_drop_micros: DurationRecorder,
    3474              :     write_layer_files_micros: DurationRecorder,
    3475              :     level0_deltas_count: Option<usize>,
    3476              :     new_deltas_count: Option<usize>,
    3477              :     new_deltas_size: Option<u64>,
    3478              : }
    3479              : 
    3480          292 : #[derive(serde::Serialize)]
    3481              : struct CompactLevel0Phase1Stats {
    3482              :     version: u64,
    3483              :     tenant_id: TenantShardId,
    3484              :     timeline_id: TimelineId,
    3485              :     read_lock_acquisition_micros: RecordedDuration,
    3486              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    3487              :     read_lock_held_key_sort_micros: RecordedDuration,
    3488              :     read_lock_held_prerequisites_micros: RecordedDuration,
    3489              :     read_lock_held_compute_holes_micros: RecordedDuration,
    3490              :     read_lock_drop_micros: RecordedDuration,
    3491              :     write_layer_files_micros: RecordedDuration,
    3492              :     level0_deltas_count: usize,
    3493              :     new_deltas_count: usize,
    3494              :     new_deltas_size: u64,
    3495              : }
    3496              : 
    3497              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    3498              :     type Error = anyhow::Error;
    3499              : 
    3500          292 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    3501          292 :         Ok(Self {
    3502          292 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    3503          292 :             tenant_id: value
    3504          292 :                 .tenant_id
    3505          292 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    3506          292 :             timeline_id: value
    3507          292 :                 .timeline_id
    3508          292 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    3509          292 :             read_lock_acquisition_micros: value
    3510          292 :                 .read_lock_acquisition_micros
    3511          292 :                 .into_recorded()
    3512          292 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    3513          292 :             read_lock_held_spawn_blocking_startup_micros: value
    3514          292 :                 .read_lock_held_spawn_blocking_startup_micros
    3515          292 :                 .into_recorded()
    3516          292 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    3517          292 :             read_lock_held_key_sort_micros: value
    3518          292 :                 .read_lock_held_key_sort_micros
    3519          292 :                 .into_recorded()
    3520          292 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    3521          292 :             read_lock_held_prerequisites_micros: value
    3522          292 :                 .read_lock_held_prerequisites_micros
    3523          292 :                 .into_recorded()
    3524          292 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    3525          292 :             read_lock_held_compute_holes_micros: value
    3526          292 :                 .read_lock_held_compute_holes_micros
    3527          292 :                 .into_recorded()
    3528          292 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    3529          292 :             read_lock_drop_micros: value
    3530          292 :                 .read_lock_drop_micros
    3531          292 :                 .into_recorded()
    3532          292 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    3533          292 :             write_layer_files_micros: value
    3534          292 :                 .write_layer_files_micros
    3535          292 :                 .into_recorded()
    3536          292 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    3537          292 :             level0_deltas_count: value
    3538          292 :                 .level0_deltas_count
    3539          292 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    3540          292 :             new_deltas_count: value
    3541          292 :                 .new_deltas_count
    3542          292 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    3543          292 :             new_deltas_size: value
    3544          292 :                 .new_deltas_size
    3545          292 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    3546              :         })
    3547          292 :     }
    3548              : }
    3549              : 
    3550              : impl Timeline {
    3551              :     /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
    3552         1519 :     async fn compact_level0_phase1(
    3553         1519 :         self: &Arc<Self>,
    3554         1519 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
    3555         1519 :         mut stats: CompactLevel0Phase1StatsBuilder,
    3556         1519 :         target_file_size: u64,
    3557         1519 :         ctx: &RequestContext,
    3558         1519 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    3559         1519 :         stats.read_lock_held_spawn_blocking_startup_micros =
    3560         1519 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    3561         1519 :         let layers = guard.layer_map();
    3562         1519 :         let level0_deltas = layers.get_level0_deltas()?;
    3563         1519 :         let mut level0_deltas = level0_deltas
    3564         1519 :             .into_iter()
    3565         6971 :             .map(|x| guard.get_from_desc(&x))
    3566         1519 :             .collect_vec();
    3567         1519 :         stats.level0_deltas_count = Some(level0_deltas.len());
    3568         1519 :         // Only compact if enough layers have accumulated.
    3569         1519 :         let threshold = self.get_compaction_threshold();
    3570         1519 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    3571            0 :             debug!(
    3572            0 :                 level0_deltas = level0_deltas.len(),
    3573            0 :                 threshold, "too few deltas to compact"
    3574            0 :             );
    3575         1223 :             return Ok(CompactLevel0Phase1Result::default());
    3576          296 :         }
    3577          296 : 
    3578          296 :         // This failpoint is used together with `test_duplicate_layers` integration test.
    3579          296 :         // It returns the compaction result exactly the same layers as input to compaction.
    3580          296 :         // We want to ensure that this will not cause any problem when updating the layer map
    3581          296 :         // after the compaction is finished.
    3582          296 :         //
    3583          296 :         // Currently, there are two rare edge cases that will cause duplicated layers being
    3584          296 :         // inserted.
    3585          296 :         // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
    3586          296 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
    3587          296 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
    3588          296 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
    3589          296 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
    3590          296 :         //    layer replace instead of the normal remove / upload process.
    3591          296 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
    3592          296 :         //    size length. Compaction will likely create the same set of n files afterwards.
    3593          296 :         //
    3594          296 :         // This failpoint is a superset of both of the cases.
    3595          296 :         if cfg!(feature = "testing") {
    3596          296 :             let active = (|| {
    3597          296 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
    3598          296 :                 false
    3599          296 :             })();
    3600          296 : 
    3601          296 :             if active {
    3602            3 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
    3603           43 :                 for delta in &level0_deltas {
    3604              :                     // we are just faking these layers as being produced again for this failpoint
    3605           40 :                     new_layers.push(
    3606           40 :                         delta
    3607           40 :                             .download_and_keep_resident()
    3608            0 :                             .await
    3609           40 :                             .context("download layer for failpoint")?,
    3610              :                     );
    3611              :                 }
    3612            3 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
    3613            3 :                 return Ok(CompactLevel0Phase1Result {
    3614            3 :                     new_layers,
    3615            3 :                     deltas_to_compact: level0_deltas,
    3616            3 :                 });
    3617          293 :             }
    3618            0 :         }
    3619              : 
    3620              :         // Gather the files to compact in this iteration.
    3621              :         //
    3622              :         // Start with the oldest Level 0 delta file, and collect any other
    3623              :         // level 0 files that form a contiguous sequence, such that the end
    3624              :         // LSN of previous file matches the start LSN of the next file.
    3625              :         //
    3626              :         // Note that if the files don't form such a sequence, we might
    3627              :         // "compact" just a single file. That's a bit pointless, but it allows
    3628              :         // us to get rid of the level 0 file, and compact the other files on
    3629              :         // the next iteration. This could probably made smarter, but such
    3630              :         // "gaps" in the sequence of level 0 files should only happen in case
    3631              :         // of a crash, partial download from cloud storage, or something like
    3632              :         // that, so it's not a big deal in practice.
    3633         8190 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    3634          293 :         let mut level0_deltas_iter = level0_deltas.iter();
    3635          293 : 
    3636          293 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    3637          293 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    3638          293 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    3639          293 : 
    3640          293 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
    3641         4228 :         for l in level0_deltas_iter {
    3642         3935 :             let lsn_range = &l.layer_desc().lsn_range;
    3643         3935 : 
    3644         3935 :             if lsn_range.start != prev_lsn_end {
    3645            0 :                 break;
    3646         3935 :             }
    3647         3935 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
    3648         3935 :             prev_lsn_end = lsn_range.end;
    3649              :         }
    3650          293 :         let lsn_range = Range {
    3651          293 :             start: deltas_to_compact
    3652          293 :                 .first()
    3653          293 :                 .unwrap()
    3654          293 :                 .layer_desc()
    3655          293 :                 .lsn_range
    3656          293 :                 .start,
    3657          293 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    3658          293 :         };
    3659              : 
    3660          293 :         info!(
    3661          293 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    3662          293 :             lsn_range.start,
    3663          293 :             lsn_range.end,
    3664          293 :             deltas_to_compact.len(),
    3665          293 :             level0_deltas.len()
    3666          293 :         );
    3667              : 
    3668         4228 :         for l in deltas_to_compact.iter() {
    3669         4228 :             info!("compact includes {l}");
    3670              :         }
    3671              : 
    3672              :         // We don't need the original list of layers anymore. Drop it so that
    3673              :         // we don't accidentally use it later in the function.
    3674          293 :         drop(level0_deltas);
    3675          293 : 
    3676          293 :         stats.read_lock_held_prerequisites_micros = stats
    3677          293 :             .read_lock_held_spawn_blocking_startup_micros
    3678          293 :             .till_now();
    3679          293 : 
    3680          293 :         // Determine N largest holes where N is number of compacted layers.
    3681          293 :         let max_holes = deltas_to_compact.len();
    3682          293 :         let last_record_lsn = self.get_last_record_lsn();
    3683          293 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    3684          293 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
    3685          293 : 
    3686          293 :         // min-heap (reserve space for one more element added before eviction)
    3687          293 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    3688          293 :         let mut prev: Option<Key> = None;
    3689          293 : 
    3690          293 :         let mut all_keys = Vec::new();
    3691              : 
    3692         4228 :         for l in deltas_to_compact.iter() {
    3693         4228 :             all_keys.extend(l.load_keys(ctx).await?);
    3694              :         }
    3695              : 
    3696              :         // FIXME: should spawn_blocking the rest of this function
    3697              : 
    3698              :         // The current stdlib sorting implementation is designed in a way where it is
    3699              :         // particularly fast where the slice is made up of sorted sub-ranges.
    3700    156655146 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3701          293 : 
    3702          293 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    3703              : 
    3704     17514710 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    3705     17514710 :             if let Some(prev_key) = prev {
    3706              :                 // just first fast filter
    3707     17514417 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
    3708       203121 :                     let key_range = prev_key..next_key;
    3709       203121 :                     // Measuring hole by just subtraction of i128 representation of key range boundaries
    3710       203121 :                     // has not so much sense, because largest holes will corresponds field1/field2 changes.
    3711       203121 :                     // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
    3712       203121 :                     // That is why it is better to measure size of hole as number of covering image layers.
    3713       203121 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
    3714       203121 :                     if coverage_size >= min_hole_coverage_size {
    3715          211 :                         heap.push(Hole {
    3716          211 :                             key_range,
    3717          211 :                             coverage_size,
    3718          211 :                         });
    3719          211 :                         if heap.len() > max_holes {
    3720          123 :                             heap.pop(); // remove smallest hole
    3721          123 :                         }
    3722       202910 :                     }
    3723     17311296 :                 }
    3724          293 :             }
    3725     17514710 :             prev = Some(next_key.next());
    3726              :         }
    3727          293 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    3728          293 :         drop_rlock(guard);
    3729          293 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    3730          293 :         let mut holes = heap.into_vec();
    3731          293 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
    3732          293 :         let mut next_hole = 0; // index of next hole in holes vector
    3733          293 : 
    3734          293 :         // This iterator walks through all key-value pairs from all the layers
    3735          293 :         // we're compacting, in key, LSN order.
    3736          293 :         let all_values_iter = all_keys.iter();
    3737          293 : 
    3738          293 :         // This iterator walks through all keys and is needed to calculate size used by each key
    3739          293 :         let mut all_keys_iter = all_keys
    3740          293 :             .iter()
    3741     17503159 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    3742     17502866 :             .coalesce(|mut prev, cur| {
    3743     17502866 :                 // Coalesce keys that belong to the same key pair.
    3744     17502866 :                 // This ensures that compaction doesn't put them
    3745     17502866 :                 // into different layer files.
    3746     17502866 :                 // Still limit this by the target file size,
    3747     17502866 :                 // so that we keep the size of the files in
    3748     17502866 :                 // check.
    3749     17502866 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    3750     15129234 :                     prev.2 += cur.2;
    3751     15129234 :                     Ok(prev)
    3752              :                 } else {
    3753      2373632 :                     Err((prev, cur))
    3754              :                 }
    3755     17502866 :             });
    3756          293 : 
    3757          293 :         // Merge the contents of all the input delta layers into a new set
    3758          293 :         // of delta layers, based on the current partitioning.
    3759          293 :         //
    3760          293 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    3761          293 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    3762          293 :         // would be too large. In that case, we also split on the LSN dimension.
    3763          293 :         //
    3764          293 :         // LSN
    3765          293 :         //  ^
    3766          293 :         //  |
    3767          293 :         //  | +-----------+            +--+--+--+--+
    3768          293 :         //  | |           |            |  |  |  |  |
    3769          293 :         //  | +-----------+            |  |  |  |  |
    3770          293 :         //  | |           |            |  |  |  |  |
    3771          293 :         //  | +-----------+     ==>    |  |  |  |  |
    3772          293 :         //  | |           |            |  |  |  |  |
    3773          293 :         //  | +-----------+            |  |  |  |  |
    3774          293 :         //  | |           |            |  |  |  |  |
    3775          293 :         //  | +-----------+            +--+--+--+--+
    3776          293 :         //  |
    3777          293 :         //  +--------------> key
    3778          293 :         //
    3779          293 :         //
    3780          293 :         // If one key (X) has a lot of page versions:
    3781          293 :         //
    3782          293 :         // LSN
    3783          293 :         //  ^
    3784          293 :         //  |                                 (X)
    3785          293 :         //  | +-----------+            +--+--+--+--+
    3786          293 :         //  | |           |            |  |  |  |  |
    3787          293 :         //  | +-----------+            |  |  +--+  |
    3788          293 :         //  | |           |            |  |  |  |  |
    3789          293 :         //  | +-----------+     ==>    |  |  |  |  |
    3790          293 :         //  | |           |            |  |  +--+  |
    3791          293 :         //  | +-----------+            |  |  |  |  |
    3792          293 :         //  | |           |            |  |  |  |  |
    3793          293 :         //  | +-----------+            +--+--+--+--+
    3794          293 :         //  |
    3795          293 :         //  +--------------> key
    3796          293 :         // TODO: this actually divides the layers into fixed-size chunks, not
    3797          293 :         // based on the partitioning.
    3798          293 :         //
    3799          293 :         // TODO: we should also opportunistically materialize and
    3800          293 :         // garbage collect what we can.
    3801          293 :         let mut new_layers = Vec::new();
    3802          293 :         let mut prev_key: Option<Key> = None;
    3803          293 :         let mut writer: Option<DeltaLayerWriter> = None;
    3804          293 :         let mut key_values_total_size = 0u64;
    3805          293 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    3806          293 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    3807              : 
    3808              :         for &DeltaEntry {
    3809     17503158 :             key, lsn, ref val, ..
    3810     17503450 :         } in all_values_iter
    3811              :         {
    3812     17503158 :             let value = val.load(ctx).await?;
    3813     17503157 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    3814     17503157 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    3815     17503157 :             if !same_key || lsn == dup_end_lsn {
    3816      2373923 :                 let mut next_key_size = 0u64;
    3817      2373923 :                 let is_dup_layer = dup_end_lsn.is_valid();
    3818      2373923 :                 dup_start_lsn = Lsn::INVALID;
    3819      2373923 :                 if !same_key {
    3820      2373885 :                     dup_end_lsn = Lsn::INVALID;
    3821      2373885 :                 }
    3822              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    3823      2373924 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    3824      2373924 :                     next_key_size = next_size;
    3825      2373924 :                     if key != next_key {
    3826      2373593 :                         if dup_end_lsn.is_valid() {
    3827           14 :                             // We are writting segment with duplicates:
    3828           14 :                             // place all remaining values of this key in separate segment
    3829           14 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    3830           14 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    3831      2373579 :                         }
    3832      2373593 :                         break;
    3833          331 :                     }
    3834          331 :                     key_values_total_size += next_size;
    3835          331 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    3836          331 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    3837          331 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    3838              :                         // Split key between multiple layers: such layer can contain only single key
    3839           38 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    3840           24 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    3841              :                         } else {
    3842           14 :                             lsn // start with the first LSN for this key
    3843              :                         };
    3844           38 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    3845           38 :                         break;
    3846          293 :                     }
    3847              :                 }
    3848              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    3849      2373923 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    3850            0 :                     dup_start_lsn = dup_end_lsn;
    3851            0 :                     dup_end_lsn = lsn_range.end;
    3852      2373923 :                 }
    3853      2373923 :                 if writer.is_some() {
    3854      2373630 :                     let written_size = writer.as_mut().unwrap().size();
    3855      2373630 :                     let contains_hole =
    3856      2373630 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    3857              :                     // check if key cause layer overflow or contains hole...
    3858      2373630 :                     if is_dup_layer
    3859      2373578 :                         || dup_end_lsn.is_valid()
    3860      2373567 :                         || written_size + key_values_total_size > target_file_size
    3861      2363277 :                         || contains_hole
    3862              :                     {
    3863              :                         // ... if so, flush previous layer and prepare to write new one
    3864        10439 :                         new_layers.push(
    3865        10439 :                             writer
    3866        10439 :                                 .take()
    3867        10439 :                                 .unwrap()
    3868        10439 :                                 .finish(prev_key.unwrap().next(), self)
    3869         1190 :                                 .await?,
    3870              :                         );
    3871        10439 :                         writer = None;
    3872        10439 : 
    3873        10439 :                         if contains_hole {
    3874           88 :                             // skip hole
    3875           88 :                             next_hole += 1;
    3876        10351 :                         }
    3877      2363191 :                     }
    3878          293 :                 }
    3879              :                 // Remember size of key value because at next iteration we will access next item
    3880      2373923 :                 key_values_total_size = next_key_size;
    3881     15129234 :             }
    3882     17503157 :             if writer.is_none() {
    3883        10732 :                 // Create writer if not initiaized yet
    3884        10732 :                 writer = Some(
    3885              :                     DeltaLayerWriter::new(
    3886        10732 :                         self.conf,
    3887        10732 :                         self.timeline_id,
    3888        10732 :                         self.tenant_shard_id,
    3889        10732 :                         key,
    3890        10732 :                         if dup_end_lsn.is_valid() {
    3891              :                             // this is a layer containing slice of values of the same key
    3892            0 :                             debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    3893           52 :                             dup_start_lsn..dup_end_lsn
    3894              :                         } else {
    3895            0 :                             debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3896        10680 :                             lsn_range.clone()
    3897              :                         },
    3898              :                     )
    3899           10 :                     .await?,
    3900              :                 );
    3901     17492425 :             }
    3902              : 
    3903     17503157 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3904            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    3905            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    3906            0 :                 )))
    3907     17503157 :             });
    3908              : 
    3909     17503157 :             if !self.shard_identity.is_key_disposable(&key) {
    3910     17503157 :                 writer.as_mut().unwrap().put_value(key, lsn, value).await?;
    3911              :             } else {
    3912            0 :                 debug!(
    3913            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    3914            0 :                     key,
    3915            0 :                     self.shard_identity.get_shard_number(&key)
    3916            0 :                 );
    3917              :             }
    3918              : 
    3919     17503157 :             if !new_layers.is_empty() {
    3920     14171587 :                 fail_point!("after-timeline-compacted-first-L1");
    3921     14171587 :             }
    3922              : 
    3923     17503157 :             prev_key = Some(key);
    3924              :         }
    3925          292 :         if let Some(writer) = writer {
    3926          292 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
    3927            0 :         }
    3928              : 
    3929              :         // Sync layers
    3930          292 :         if !new_layers.is_empty() {
    3931              :             // Print a warning if the created layer is larger than double the target size
    3932              :             // Add two pages for potential overhead. This should in theory be already
    3933              :             // accounted for in the target calculation, but for very small targets,
    3934              :             // we still might easily hit the limit otherwise.
    3935          292 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    3936        10730 :             for layer in new_layers.iter() {
    3937        10730 :                 if layer.layer_desc().file_size > warn_limit {
    3938            0 :                     warn!(
    3939            0 :                         %layer,
    3940            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    3941            0 :                     );
    3942        10730 :                 }
    3943              :             }
    3944              : 
    3945              :             // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3946          292 :             let layer_paths: Vec<Utf8PathBuf> = new_layers
    3947          292 :                 .iter()
    3948        10730 :                 .map(|l| l.local_path().to_owned())
    3949          292 :                 .collect();
    3950          292 : 
    3951          292 :             // Fsync all the layer files and directory using multiple threads to
    3952          292 :             // minimize latency.
    3953          292 :             par_fsync::par_fsync_async(&layer_paths)
    3954          527 :                 .await
    3955          292 :                 .context("fsync all new layers")?;
    3956              : 
    3957          292 :             let timeline_dir = self
    3958          292 :                 .conf
    3959          292 :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    3960          292 : 
    3961          292 :             par_fsync::par_fsync_async(&[timeline_dir])
    3962          323 :                 .await
    3963          292 :                 .context("fsync of timeline dir")?;
    3964            0 :         }
    3965              : 
    3966          292 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    3967          292 :         stats.new_deltas_count = Some(new_layers.len());
    3968        10730 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    3969          292 : 
    3970          292 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    3971          292 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    3972              :         {
    3973          292 :             Ok(stats_json) => {
    3974          292 :                 info!(
    3975          292 :                     stats_json = stats_json.as_str(),
    3976          292 :                     "compact_level0_phase1 stats available"
    3977          292 :                 )
    3978              :             }
    3979            0 :             Err(e) => {
    3980            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    3981              :             }
    3982              :         }
    3983              : 
    3984          292 :         Ok(CompactLevel0Phase1Result {
    3985          292 :             new_layers,
    3986          292 :             deltas_to_compact: deltas_to_compact
    3987          292 :                 .into_iter()
    3988         4213 :                 .map(|x| x.drop_eviction_guard())
    3989          292 :                 .collect::<Vec<_>>(),
    3990          292 :         })
    3991         1518 :     }
    3992              : 
    3993              :     ///
    3994              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
    3995              :     /// as Level 1 files.
    3996              :     ///
    3997         1519 :     async fn compact_level0(
    3998         1519 :         self: &Arc<Self>,
    3999         1519 :         target_file_size: u64,
    4000         1519 :         ctx: &RequestContext,
    4001         1519 :     ) -> Result<(), CompactionError> {
    4002              :         let CompactLevel0Phase1Result {
    4003         1518 :             new_layers,
    4004         1518 :             deltas_to_compact,
    4005              :         } = {
    4006         1519 :             let phase1_span = info_span!("compact_level0_phase1");
    4007         1519 :             let ctx = ctx.attached_child();
    4008         1519 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    4009         1519 :                 version: Some(2),
    4010         1519 :                 tenant_id: Some(self.tenant_shard_id),
    4011         1519 :                 timeline_id: Some(self.timeline_id),
    4012         1519 :                 ..Default::default()
    4013         1519 :             };
    4014         1519 : 
    4015         1519 :             let begin = tokio::time::Instant::now();
    4016         1519 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
    4017         1519 :             let now = tokio::time::Instant::now();
    4018         1519 :             stats.read_lock_acquisition_micros =
    4019         1519 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    4020         1519 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
    4021         1519 :                 .instrument(phase1_span)
    4022       331540 :                 .await?
    4023              :         };
    4024              : 
    4025         1518 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    4026              :             // nothing to do
    4027         1223 :             return Ok(());
    4028          295 :         }
    4029              : 
    4030          295 :         let mut guard = self.layers.write().await;
    4031              : 
    4032          295 :         let mut duplicated_layers = HashSet::new();
    4033          295 : 
    4034          295 :         let mut insert_layers = Vec::with_capacity(new_layers.len());
    4035              : 
    4036        11065 :         for l in &new_layers {
    4037        10770 :             if guard.contains(l.as_ref()) {
    4038              :                 // expected in tests
    4039           40 :                 tracing::error!(layer=%l, "duplicated L1 layer");
    4040              : 
    4041              :                 // good ways to cause a duplicate: we repeatedly error after taking the writelock
    4042              :                 // `guard`  on self.layers. as of writing this, there are no error returns except
    4043              :                 // for compact_level0_phase1 creating an L0, which does not happen in practice
    4044              :                 // because we have not implemented L0 => L0 compaction.
    4045           40 :                 duplicated_layers.insert(l.layer_desc().key());
    4046        10730 :             } else if LayerMap::is_l0(l.layer_desc()) {
    4047            0 :                 return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
    4048        10730 :             } else {
    4049        10730 :                 insert_layers.push(l.clone());
    4050        10730 :             }
    4051              :         }
    4052              : 
    4053          295 :         let remove_layers = {
    4054          295 :             let mut deltas_to_compact = deltas_to_compact;
    4055          295 :             // only remove those inputs which were not outputs
    4056         4253 :             deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
    4057          295 :             deltas_to_compact
    4058          295 :         };
    4059          295 : 
    4060          295 :         // deletion will happen later, the layer file manager calls garbage_collect_on_drop
    4061          295 :         guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
    4062              : 
    4063          295 :         if let Some(remote_client) = self.remote_client.as_ref() {
    4064          295 :             remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
    4065            0 :         }
    4066              : 
    4067          295 :         drop_wlock(guard);
    4068          295 : 
    4069          295 :         Ok(())
    4070         1518 :     }
    4071              : 
    4072              :     /// Update information about which layer files need to be retained on
    4073              :     /// garbage collection. This is separate from actually performing the GC,
    4074              :     /// and is updated more frequently, so that compaction can remove obsolete
    4075              :     /// page versions more aggressively.
    4076              :     ///
    4077              :     /// TODO: that's wishful thinking, compaction doesn't actually do that
    4078              :     /// currently.
    4079              :     ///
    4080              :     /// The caller specifies how much history is needed with the 3 arguments:
    4081              :     ///
    4082              :     /// retain_lsns: keep a version of each page at these LSNs
    4083              :     /// cutoff_horizon: also keep everything newer than this LSN
    4084              :     /// pitr: the time duration required to keep data for PITR
    4085              :     ///
    4086              :     /// The 'retain_lsns' list is currently used to prevent removing files that
    4087              :     /// are needed by child timelines. In the future, the user might be able to
    4088              :     /// name additional points in time to retain. The caller is responsible for
    4089              :     /// collecting that information.
    4090              :     ///
    4091              :     /// The 'cutoff_horizon' point is used to retain recent versions that might still be
    4092              :     /// needed by read-only nodes. (As of this writing, the caller just passes
    4093              :     /// the latest LSN subtracted by a constant, and doesn't do anything smart
    4094              :     /// to figure out what read-only nodes might actually need.)
    4095              :     ///
    4096              :     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
    4097              :     /// whether a record is needed for PITR.
    4098              :     ///
    4099              :     /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
    4100              :     /// field, so that the three values passed as argument are stored
    4101              :     /// atomically. But the caller is responsible for ensuring that no new
    4102              :     /// branches are created that would need to be included in 'retain_lsns',
    4103              :     /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
    4104              :     /// that.
    4105              :     ///
    4106            0 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
    4107              :     pub(super) async fn update_gc_info(
    4108              :         &self,
    4109              :         retain_lsns: Vec<Lsn>,
    4110              :         cutoff_horizon: Lsn,
    4111              :         pitr: Duration,
    4112              :         cancel: &CancellationToken,
    4113              :         ctx: &RequestContext,
    4114              :     ) -> anyhow::Result<()> {
    4115              :         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
    4116              :         //
    4117              :         // Some unit tests depend on garbage-collection working even when
    4118              :         // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
    4119              :         // work, so avoid calling it altogether if time-based retention is not
    4120              :         // configured. It would be pointless anyway.
    4121              :         let pitr_cutoff = if pitr != Duration::ZERO {
    4122              :             let now = SystemTime::now();
    4123              :             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
    4124              :                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
    4125              : 
    4126              :                 match self
    4127              :                     .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
    4128              :                     .await?
    4129              :                 {
    4130              :                     LsnForTimestamp::Present(lsn) => lsn,
    4131              :                     LsnForTimestamp::Future(lsn) => {
    4132              :                         // The timestamp is in the future. That sounds impossible,
    4133              :                         // but what it really means is that there hasn't been
    4134              :                         // any commits since the cutoff timestamp.
    4135            0 :                         debug!("future({})", lsn);
    4136              :                         cutoff_horizon
    4137              :                     }
    4138              :                     LsnForTimestamp::Past(lsn) => {
    4139            0 :                         debug!("past({})", lsn);
    4140              :                         // conservative, safe default is to remove nothing, when we
    4141              :                         // have no commit timestamp data available
    4142              :                         *self.get_latest_gc_cutoff_lsn()
    4143              :                     }
    4144              :                     LsnForTimestamp::NoData(lsn) => {
    4145            0 :                         debug!("nodata({})", lsn);
    4146              :                         // conservative, safe default is to remove nothing, when we
    4147              :                         // have no commit timestamp data available
    4148              :                         *self.get_latest_gc_cutoff_lsn()
    4149              :                     }
    4150              :                 }
    4151              :             } else {
    4152              :                 // If we don't have enough data to convert to LSN,
    4153              :                 // play safe and don't remove any layers.
    4154              :                 *self.get_latest_gc_cutoff_lsn()
    4155              :             }
    4156              :         } else {
    4157              :             // No time-based retention was configured. Set time-based cutoff to
    4158              :             // same as LSN based.
    4159              :             cutoff_horizon
    4160              :         };
    4161              : 
    4162              :         // Grab the lock and update the values
    4163              :         *self.gc_info.write().unwrap() = GcInfo {
    4164              :             retain_lsns,
    4165              :             horizon_cutoff: cutoff_horizon,
    4166              :             pitr_cutoff,
    4167              :         };
    4168              : 
    4169              :         Ok(())
    4170              :     }
    4171              : 
    4172              :     /// Garbage collect layer files on a timeline that are no longer needed.
    4173              :     ///
    4174              :     /// Currently, we don't make any attempt at removing unneeded page versions
    4175              :     /// within a layer file. We can only remove the whole file if it's fully
    4176              :     /// obsolete.
    4177          720 :     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
    4178          720 :         // this is most likely the background tasks, but it might be the spawned task from
    4179          720 :         // immediate_gc
    4180          720 :         let cancel = crate::task_mgr::shutdown_token();
    4181          720 :         let _g = tokio::select! {
    4182          719 :             guard = self.gc_lock.lock() => guard,
    4183              :             _ = self.cancel.cancelled() => return Ok(GcResult::default()),
    4184              :             _ = cancel.cancelled() => return Ok(GcResult::default()),
    4185              :         };
    4186          719 :         let timer = self.metrics.garbage_collect_histo.start_timer();
    4187              : 
    4188            0 :         fail_point!("before-timeline-gc");
    4189              : 
    4190              :         // Is the timeline being deleted?
    4191          719 :         if self.is_stopping() {
    4192            0 :             anyhow::bail!("timeline is Stopping");
    4193          719 :         }
    4194          719 : 
    4195          719 :         let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
    4196          719 :             let gc_info = self.gc_info.read().unwrap();
    4197          719 : 
    4198          719 :             let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
    4199          719 :             let pitr_cutoff = gc_info.pitr_cutoff;
    4200          719 :             let retain_lsns = gc_info.retain_lsns.clone();
    4201          719 :             (horizon_cutoff, pitr_cutoff, retain_lsns)
    4202          719 :         };
    4203          719 : 
    4204          719 :         let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
    4205          719 :         let standby_horizon = self.standby_horizon.load();
    4206          719 :         let new_gc_cutoff = if standby_horizon != Lsn::INVALID {
    4207           10 :             Lsn::min(standby_horizon, new_gc_cutoff)
    4208              :         } else {
    4209          709 :             new_gc_cutoff
    4210              :         };
    4211              : 
    4212              :         // Reset standby horizon to ignore  it if it is not updated till next GC
    4213          719 :         self.standby_horizon.store(Lsn::INVALID);
    4214              : 
    4215          719 :         let res = self
    4216          719 :             .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
    4217          719 :             .instrument(
    4218          719 :                 info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
    4219              :             )
    4220           99 :             .await?;
    4221              : 
    4222              :         // only record successes
    4223          719 :         timer.stop_and_record();
    4224          719 : 
    4225          719 :         Ok(res)
    4226          720 :     }
    4227              : 
    4228          719 :     async fn gc_timeline(
    4229          719 :         &self,
    4230          719 :         horizon_cutoff: Lsn,
    4231          719 :         pitr_cutoff: Lsn,
    4232          719 :         retain_lsns: Vec<Lsn>,
    4233          719 :         new_gc_cutoff: Lsn,
    4234          719 :     ) -> anyhow::Result<GcResult> {
    4235          719 :         let now = SystemTime::now();
    4236          719 :         let mut result: GcResult = GcResult::default();
    4237          719 : 
    4238          719 :         // Nothing to GC. Return early.
    4239          719 :         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    4240          719 :         if latest_gc_cutoff >= new_gc_cutoff {
    4241           98 :             info!(
    4242           98 :                 "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
    4243           98 :             );
    4244           98 :             return Ok(result);
    4245          621 :         }
    4246              : 
    4247              :         // We need to ensure that no one tries to read page versions or create
    4248              :         // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
    4249              :         // for details. This will block until the old value is no longer in use.
    4250              :         //
    4251              :         // The GC cutoff should only ever move forwards.
    4252          621 :         let waitlist = {
    4253          621 :             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
    4254          621 :             ensure!(
    4255          621 :                 *write_guard <= new_gc_cutoff,
    4256            0 :                 "Cannot move GC cutoff LSN backwards (was {}, new {})",
    4257            0 :                 *write_guard,
    4258              :                 new_gc_cutoff
    4259              :             );
    4260          621 :             write_guard.store_and_unlock(new_gc_cutoff)
    4261          621 :         };
    4262          621 :         waitlist.wait().await;
    4263              : 
    4264          621 :         info!("GC starting");
    4265              : 
    4266            0 :         debug!("retain_lsns: {:?}", retain_lsns);
    4267              : 
    4268          621 :         let mut layers_to_remove = Vec::new();
    4269          621 :         let mut wanted_image_layers = KeySpaceRandomAccum::default();
    4270              : 
    4271              :         // Scan all layers in the timeline (remote or on-disk).
    4272              :         //
    4273              :         // Garbage collect the layer if all conditions are satisfied:
    4274              :         // 1. it is older than cutoff LSN;
    4275              :         // 2. it is older than PITR interval;
    4276              :         // 3. it doesn't need to be retained for 'retain_lsns';
    4277              :         // 4. newer on-disk image layers cover the layer's whole key range
    4278              :         //
    4279              :         // TODO holding a write lock is too agressive and avoidable
    4280          621 :         let mut guard = self.layers.write().await;
    4281          621 :         let layers = guard.layer_map();
    4282        16897 :         'outer: for l in layers.iter_historic_layers() {
    4283        16897 :             result.layers_total += 1;
    4284        16897 : 
    4285        16897 :             // 1. Is it newer than GC horizon cutoff point?
    4286        16897 :             if l.get_lsn_range().end > horizon_cutoff {
    4287            0 :                 debug!(
    4288            0 :                     "keeping {} because it's newer than horizon_cutoff {}",
    4289            0 :                     l.filename(),
    4290            0 :                     horizon_cutoff,
    4291            0 :                 );
    4292         1660 :                 result.layers_needed_by_cutoff += 1;
    4293         1660 :                 continue 'outer;
    4294        15237 :             }
    4295        15237 : 
    4296        15237 :             // 2. It is newer than PiTR cutoff point?
    4297        15237 :             if l.get_lsn_range().end > pitr_cutoff {
    4298            0 :                 debug!(
    4299            0 :                     "keeping {} because it's newer than pitr_cutoff {}",
    4300            0 :                     l.filename(),
    4301            0 :                     pitr_cutoff,
    4302            0 :                 );
    4303           40 :                 result.layers_needed_by_pitr += 1;
    4304           40 :                 continue 'outer;
    4305        15197 :             }
    4306              : 
    4307              :             // 3. Is it needed by a child branch?
    4308              :             // NOTE With that we would keep data that
    4309              :             // might be referenced by child branches forever.
    4310              :             // We can track this in child timeline GC and delete parent layers when
    4311              :             // they are no longer needed. This might be complicated with long inheritance chains.
    4312              :             //
    4313              :             // TODO Vec is not a great choice for `retain_lsns`
    4314        15221 :             for retain_lsn in &retain_lsns {
    4315              :                 // start_lsn is inclusive
    4316          458 :                 if &l.get_lsn_range().start <= retain_lsn {
    4317            0 :                     debug!(
    4318            0 :                         "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
    4319            0 :                         l.filename(),
    4320            0 :                         retain_lsn,
    4321            0 :                         l.is_incremental(),
    4322            0 :                     );
    4323          434 :                     result.layers_needed_by_branches += 1;
    4324          434 :                     continue 'outer;
    4325           24 :                 }
    4326              :             }
    4327              : 
    4328              :             // 4. Is there a later on-disk layer for this relation?
    4329              :             //
    4330              :             // The end-LSN is exclusive, while disk_consistent_lsn is
    4331              :             // inclusive. For example, if disk_consistent_lsn is 100, it is
    4332              :             // OK for a delta layer to have end LSN 101, but if the end LSN
    4333              :             // is 102, then it might not have been fully flushed to disk
    4334              :             // before crash.
    4335              :             //
    4336              :             // For example, imagine that the following layers exist:
    4337              :             //
    4338              :             // 1000      - image (A)
    4339              :             // 1000-2000 - delta (B)
    4340              :             // 2000      - image (C)
    4341              :             // 2000-3000 - delta (D)
    4342              :             // 3000      - image (E)
    4343              :             //
    4344              :             // If GC horizon is at 2500, we can remove layers A and B, but
    4345              :             // we cannot remove C, even though it's older than 2500, because
    4346              :             // the delta layer 2000-3000 depends on it.
    4347        14763 :             if !layers
    4348        14763 :                 .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
    4349              :             {
    4350            0 :                 debug!("keeping {} because it is the latest layer", l.filename());
    4351              :                 // Collect delta key ranges that need image layers to allow garbage
    4352              :                 // collecting the layers.
    4353              :                 // It is not so obvious whether we need to propagate information only about
    4354              :                 // delta layers. Image layers can form "stairs" preventing old image from been deleted.
    4355              :                 // But image layers are in any case less sparse than delta layers. Also we need some
    4356              :                 // protection from replacing recent image layers with new one after each GC iteration.
    4357        13566 :                 if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
    4358            0 :                     wanted_image_layers.add_range(l.get_key_range());
    4359        13566 :                 }
    4360        13566 :                 result.layers_not_updated += 1;
    4361        13566 :                 continue 'outer;
    4362         1197 :             }
    4363              : 
    4364              :             // We didn't find any reason to keep this file, so remove it.
    4365            0 :             debug!(
    4366            0 :                 "garbage collecting {} is_dropped: xx is_incremental: {}",
    4367            0 :                 l.filename(),
    4368            0 :                 l.is_incremental(),
    4369            0 :             );
    4370         1197 :             layers_to_remove.push(l);
    4371              :         }
    4372          621 :         self.wanted_image_layers
    4373          621 :             .lock()
    4374          621 :             .unwrap()
    4375          621 :             .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
    4376          621 : 
    4377          621 :         if !layers_to_remove.is_empty() {
    4378              :             // Persist the new GC cutoff value in the metadata file, before
    4379              :             // we actually remove anything.
    4380              :             //
    4381              :             // This does not in fact have any effect as we no longer consider local metadata unless
    4382              :             // running without remote storage.
    4383              :             //
    4384              :             // This unconditionally schedules also an index_part.json update, even though, we will
    4385              :             // be doing one a bit later with the unlinked gc'd layers.
    4386              :             //
    4387              :             // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
    4388           20 :             self.update_metadata_file(self.disk_consistent_lsn.load(), None)
    4389            0 :                 .await?;
    4390              : 
    4391           20 :             let gc_layers = layers_to_remove
    4392           20 :                 .iter()
    4393         1197 :                 .map(|x| guard.get_from_desc(x))
    4394           20 :                 .collect::<Vec<Layer>>();
    4395           20 : 
    4396           20 :             result.layers_removed = gc_layers.len() as u64;
    4397              : 
    4398           20 :             if let Some(remote_client) = self.remote_client.as_ref() {
    4399           20 :                 remote_client.schedule_gc_update(&gc_layers)?;
    4400            0 :             }
    4401              : 
    4402           20 :             guard.finish_gc_timeline(&gc_layers);
    4403           20 : 
    4404           20 :             #[cfg(feature = "testing")]
    4405           20 :             {
    4406           20 :                 result.doomed_layers = gc_layers;
    4407           20 :             }
    4408          601 :         }
    4409              : 
    4410          621 :         info!(
    4411          621 :             "GC completed removing {} layers, cutoff {}",
    4412          621 :             result.layers_removed, new_gc_cutoff
    4413          621 :         );
    4414              : 
    4415          621 :         result.elapsed = now.elapsed()?;
    4416          621 :         Ok(result)
    4417          719 :     }
    4418              : 
    4419              :     /// Reconstruct a value, using the given base image and WAL records in 'data'.
    4420      8623837 :     async fn reconstruct_value(
    4421      8623837 :         &self,
    4422      8623837 :         key: Key,
    4423      8623837 :         request_lsn: Lsn,
    4424      8623837 :         mut data: ValueReconstructState,
    4425      8623839 :     ) -> Result<Bytes, PageReconstructError> {
    4426      8623839 :         // Perform WAL redo if needed
    4427      8623839 :         data.records.reverse();
    4428      8623839 : 
    4429      8623839 :         // If we have a page image, and no WAL, we're all set
    4430      8623839 :         if data.records.is_empty() {
    4431      5762922 :             if let Some((img_lsn, img)) = &data.img {
    4432            0 :                 trace!(
    4433            0 :                     "found page image for key {} at {}, no WAL redo required, req LSN {}",
    4434            0 :                     key,
    4435            0 :                     img_lsn,
    4436            0 :                     request_lsn,
    4437            0 :                 );
    4438      5762922 :                 Ok(img.clone())
    4439              :             } else {
    4440            0 :                 Err(PageReconstructError::from(anyhow!(
    4441            0 :                     "base image for {key} at {request_lsn} not found"
    4442            0 :                 )))
    4443              :             }
    4444              :         } else {
    4445              :             // We need to do WAL redo.
    4446              :             //
    4447              :             // If we don't have a base image, then the oldest WAL record better initialize
    4448              :             // the page
    4449      2860917 :             if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
    4450            0 :                 Err(PageReconstructError::from(anyhow!(
    4451            0 :                     "Base image for {} at {} not found, but got {} WAL records",
    4452            0 :                     key,
    4453            0 :                     request_lsn,
    4454            0 :                     data.records.len()
    4455            0 :                 )))
    4456              :             } else {
    4457      2860917 :                 if data.img.is_some() {
    4458            0 :                     trace!(
    4459            0 :                         "found {} WAL records and a base image for {} at {}, performing WAL redo",
    4460            0 :                         data.records.len(),
    4461            0 :                         key,
    4462            0 :                         request_lsn
    4463            0 :                     );
    4464              :                 } else {
    4465            0 :                     trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
    4466              :                 };
    4467              : 
    4468      2860917 :                 let last_rec_lsn = data.records.last().unwrap().0;
    4469              : 
    4470      2860917 :                 let img = match self
    4471      2860917 :                     .walredo_mgr
    4472      2860917 :                     .as_ref()
    4473      2860917 :                     .context("timeline has no walredo manager")
    4474      2860917 :                     .map_err(PageReconstructError::WalRedo)?
    4475      2860917 :                     .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
    4476            0 :                     .await
    4477      2860917 :                     .context("reconstruct a page image")
    4478              :                 {
    4479      2860917 :                     Ok(img) => img,
    4480            0 :                     Err(e) => return Err(PageReconstructError::WalRedo(e)),
    4481              :                 };
    4482              : 
    4483      2860917 :                 if img.len() == page_cache::PAGE_SZ {
    4484      2857268 :                     let cache = page_cache::get();
    4485      2857268 :                     if let Err(e) = cache
    4486      2857268 :                         .memorize_materialized_page(
    4487      2857268 :                             self.tenant_shard_id,
    4488      2857268 :                             self.timeline_id,
    4489      2857268 :                             key,
    4490      2857268 :                             last_rec_lsn,
    4491      2857268 :                             &img,
    4492      2857268 :                         )
    4493        10099 :                         .await
    4494      2857268 :                         .context("Materialized page memoization failed")
    4495              :                     {
    4496            0 :                         return Err(PageReconstructError::from(e));
    4497      2857268 :                     }
    4498         3649 :                 }
    4499              : 
    4500      2860917 :                 Ok(img)
    4501              :             }
    4502              :         }
    4503      8623839 :     }
    4504              : 
    4505            2 :     pub(crate) async fn spawn_download_all_remote_layers(
    4506            2 :         self: Arc<Self>,
    4507            2 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4508            2 :     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
    4509            2 :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4510            2 : 
    4511            2 :         // this is not really needed anymore; it has tests which really check the return value from
    4512            2 :         // http api. it would be better not to maintain this anymore.
    4513            2 : 
    4514            2 :         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
    4515            2 :         if let Some(st) = &*status_guard {
    4516            1 :             match &st.state {
    4517              :                 DownloadRemoteLayersTaskState::Running => {
    4518            0 :                     return Err(st.clone());
    4519              :                 }
    4520              :                 DownloadRemoteLayersTaskState::ShutDown
    4521            1 :                 | DownloadRemoteLayersTaskState::Completed => {
    4522            1 :                     *status_guard = None;
    4523            1 :                 }
    4524              :             }
    4525            1 :         }
    4526              : 
    4527            2 :         let self_clone = Arc::clone(&self);
    4528            2 :         let task_id = task_mgr::spawn(
    4529            2 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    4530            2 :             task_mgr::TaskKind::DownloadAllRemoteLayers,
    4531            2 :             Some(self.tenant_shard_id),
    4532            2 :             Some(self.timeline_id),
    4533            2 :             "download all remote layers task",
    4534              :             false,
    4535            2 :             async move {
    4536            9 :                 self_clone.download_all_remote_layers(request).await;
    4537            2 :                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
    4538            2 :                  match &mut *status_guard {
    4539              :                     None => {
    4540            0 :                         warn!("tasks status is supposed to be Some(), since we are running");
    4541              :                     }
    4542            2 :                     Some(st) => {
    4543            2 :                         let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
    4544            2 :                         if st.task_id != exp_task_id {
    4545            0 :                             warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
    4546            2 :                         } else {
    4547            2 :                             st.state = DownloadRemoteLayersTaskState::Completed;
    4548            2 :                         }
    4549              :                     }
    4550              :                 };
    4551            2 :                 Ok(())
    4552            2 :             }
    4553            2 :             .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    4554              :         );
    4555              : 
    4556            2 :         let initial_info = DownloadRemoteLayersTaskInfo {
    4557            2 :             task_id: format!("{task_id}"),
    4558            2 :             state: DownloadRemoteLayersTaskState::Running,
    4559            2 :             total_layer_count: 0,
    4560            2 :             successful_download_count: 0,
    4561            2 :             failed_download_count: 0,
    4562            2 :         };
    4563            2 :         *status_guard = Some(initial_info.clone());
    4564            2 : 
    4565            2 :         Ok(initial_info)
    4566            2 :     }
    4567              : 
    4568            2 :     async fn download_all_remote_layers(
    4569            2 :         self: &Arc<Self>,
    4570            2 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4571            2 :     ) {
    4572              :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4573              : 
    4574            2 :         let remaining = {
    4575            2 :             let guard = self.layers.read().await;
    4576            2 :             guard
    4577            2 :                 .layer_map()
    4578            2 :                 .iter_historic_layers()
    4579           10 :                 .map(|desc| guard.get_from_desc(&desc))
    4580            2 :                 .collect::<Vec<_>>()
    4581            2 :         };
    4582            2 :         let total_layer_count = remaining.len();
    4583            2 : 
    4584            2 :         macro_rules! lock_status {
    4585            2 :             ($st:ident) => {
    4586            2 :                 let mut st = self.download_all_remote_layers_task_info.write().unwrap();
    4587            2 :                 let st = st
    4588            2 :                     .as_mut()
    4589            2 :                     .expect("this function is only called after the task has been spawned");
    4590            2 :                 assert_eq!(
    4591            2 :                     st.task_id,
    4592            2 :                     format!(
    4593            2 :                         "{}",
    4594            2 :                         task_mgr::current_task_id().expect("we run inside a task_mgr task")
    4595            2 :                     )
    4596            2 :                 );
    4597            2 :                 let $st = st;
    4598            2 :             };
    4599            2 :         }
    4600            2 : 
    4601            2 :         {
    4602            2 :             lock_status!(st);
    4603            2 :             st.total_layer_count = total_layer_count as u64;
    4604            2 :         }
    4605            2 : 
    4606            2 :         let mut remaining = remaining.into_iter();
    4607            2 :         let mut have_remaining = true;
    4608            2 :         let mut js = tokio::task::JoinSet::new();
    4609            2 : 
    4610            2 :         let cancel = task_mgr::shutdown_token();
    4611            2 : 
    4612            2 :         let limit = request.max_concurrent_downloads;
    4613              : 
    4614              :         loop {
    4615           12 :             while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
    4616           12 :                 let Some(next) = remaining.next() else {
    4617            2 :                     have_remaining = false;
    4618            2 :                     break;
    4619              :                 };
    4620              : 
    4621           10 :                 let span = tracing::info_span!("download", layer = %next);
    4622              : 
    4623           10 :                 js.spawn(
    4624           10 :                     async move {
    4625           26 :                         let res = next.download().await;
    4626           10 :                         (next, res)
    4627           10 :                     }
    4628           10 :                     .instrument(span),
    4629           10 :                 );
    4630              :             }
    4631              : 
    4632           12 :             while let Some(res) = js.join_next().await {
    4633           10 :                 match res {
    4634              :                     Ok((_, Ok(_))) => {
    4635            5 :                         lock_status!(st);
    4636            5 :                         st.successful_download_count += 1;
    4637              :                     }
    4638            5 :                     Ok((layer, Err(e))) => {
    4639            5 :                         tracing::error!(%layer, "download failed: {e:#}");
    4640            5 :                         lock_status!(st);
    4641            5 :                         st.failed_download_count += 1;
    4642              :                     }
    4643            0 :                     Err(je) if je.is_cancelled() => unreachable!("not used here"),
    4644            0 :                     Err(je) if je.is_panic() => {
    4645            0 :                         lock_status!(st);
    4646            0 :                         st.failed_download_count += 1;
    4647              :                     }
    4648            0 :                     Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
    4649              :                 }
    4650              :             }
    4651              : 
    4652            2 :             if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
    4653            2 :                 break;
    4654            0 :             }
    4655              :         }
    4656              : 
    4657              :         {
    4658            2 :             lock_status!(st);
    4659            2 :             st.state = DownloadRemoteLayersTaskState::Completed;
    4660            2 :         }
    4661            2 :     }
    4662              : 
    4663           50 :     pub(crate) fn get_download_all_remote_layers_task_info(
    4664           50 :         &self,
    4665           50 :     ) -> Option<DownloadRemoteLayersTaskInfo> {
    4666           50 :         self.download_all_remote_layers_task_info
    4667           50 :             .read()
    4668           50 :             .unwrap()
    4669           50 :             .clone()
    4670           50 :     }
    4671              : }
    4672              : 
    4673              : impl Timeline {
    4674              :     /// Returns non-remote layers for eviction.
    4675           43 :     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
    4676           43 :         let guard = self.layers.read().await;
    4677           43 :         let layers = guard.layer_map();
    4678           43 : 
    4679           43 :         let mut max_layer_size: Option<u64> = None;
    4680           43 :         let mut resident_layers = Vec::new();
    4681              : 
    4682         3018 :         for l in layers.iter_historic_layers() {
    4683         3018 :             let file_size = l.file_size();
    4684         3018 :             max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
    4685         3018 : 
    4686         3018 :             let l = guard.get_from_desc(&l);
    4687              : 
    4688         3018 :             let l = match l.keep_resident().await {
    4689         3017 :                 Ok(Some(l)) => l,
    4690            1 :                 Ok(None) => continue,
    4691            0 :                 Err(e) => {
    4692              :                     // these should not happen, but we cannot make them statically impossible right
    4693              :                     // now.
    4694            0 :                     tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}");
    4695            0 :                     continue;
    4696              :                 }
    4697              :             };
    4698              : 
    4699         3017 :             let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
    4700            0 :                 // We only use this fallback if there's an implementation error.
    4701            0 :                 // `latest_activity` already does rate-limited warn!() log.
    4702            0 :                 debug!(layer=%l, "last_activity returns None, using SystemTime::now");
    4703            0 :                 SystemTime::now()
    4704         3017 :             });
    4705         3017 : 
    4706         3017 :             resident_layers.push(EvictionCandidate {
    4707         3017 :                 layer: l.drop_eviction_guard().into(),
    4708         3017 :                 last_activity_ts,
    4709         3017 :                 relative_last_activity: finite_f32::FiniteF32::ZERO,
    4710         3017 :             });
    4711              :         }
    4712              : 
    4713           43 :         DiskUsageEvictionInfo {
    4714           43 :             max_layer_size,
    4715           43 :             resident_layers,
    4716           43 :         }
    4717           43 :     }
    4718              : 
    4719        22356 :     pub(crate) fn get_shard_index(&self) -> ShardIndex {
    4720        22356 :         ShardIndex {
    4721        22356 :             shard_number: self.tenant_shard_id.shard_number,
    4722        22356 :             shard_count: self.tenant_shard_id.shard_count,
    4723        22356 :         }
    4724        22356 :     }
    4725              : }
    4726              : 
    4727              : type TraversalPathItem = (
    4728              :     ValueReconstructResult,
    4729              :     Lsn,
    4730              :     Box<dyn Send + FnOnce() -> TraversalId>,
    4731              : );
    4732              : 
    4733              : /// Helper function for get_reconstruct_data() to add the path of layers traversed
    4734              : /// to an error, as anyhow context information.
    4735         1264 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
    4736         1264 :     // We want the original 'msg' to be the outermost context. The outermost context
    4737         1264 :     // is the most high-level information, which also gets propagated to the client.
    4738         1264 :     let mut msg_iter = path
    4739         1264 :         .into_iter()
    4740         1388 :         .map(|(r, c, l)| {
    4741         1388 :             format!(
    4742         1388 :                 "layer traversal: result {:?}, cont_lsn {}, layer: {}",
    4743         1388 :                 r,
    4744         1388 :                 c,
    4745         1388 :                 l(),
    4746         1388 :             )
    4747         1388 :         })
    4748         1264 :         .chain(std::iter::once(msg));
    4749         1264 :     // Construct initial message from the first traversed layer
    4750         1264 :     let err = anyhow!(msg_iter.next().unwrap());
    4751         1264 : 
    4752         1264 :     // Append all subsequent traversals, and the error message 'msg', as contexts.
    4753         1388 :     let msg = msg_iter.fold(err, |err, msg| err.context(msg));
    4754         1264 :     PageReconstructError::from(msg)
    4755         1264 : }
    4756              : 
    4757              : /// Various functions to mutate the timeline.
    4758              : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
    4759              : // This is probably considered a bad practice in Rust and should be fixed eventually,
    4760              : // but will cause large code changes.
    4761              : pub(crate) struct TimelineWriter<'a> {
    4762              :     tl: &'a Timeline,
    4763              :     _write_guard: tokio::sync::MutexGuard<'a, ()>,
    4764              : }
    4765              : 
    4766              : impl Deref for TimelineWriter<'_> {
    4767              :     type Target = Timeline;
    4768              : 
    4769            0 :     fn deref(&self) -> &Self::Target {
    4770            0 :         self.tl
    4771            0 :     }
    4772              : }
    4773              : 
    4774              : impl<'a> TimelineWriter<'a> {
    4775              :     /// Put a new page version that can be constructed from a WAL record
    4776              :     ///
    4777              :     /// This will implicitly extend the relation, if the page is beyond the
    4778              :     /// current end-of-file.
    4779      1214102 :     pub(crate) async fn put(
    4780      1214102 :         &self,
    4781      1214102 :         key: Key,
    4782      1214102 :         lsn: Lsn,
    4783      1214102 :         value: &Value,
    4784      1214102 :         ctx: &RequestContext,
    4785      1214102 :     ) -> anyhow::Result<()> {
    4786      1214102 :         self.tl.put_value(key, lsn, value, ctx).await
    4787      1214102 :     }
    4788              : 
    4789      1718651 :     pub(crate) async fn put_batch(
    4790      1718651 :         &self,
    4791      1718651 :         batch: &HashMap<Key, Vec<(Lsn, Value)>>,
    4792      1718651 :         ctx: &RequestContext,
    4793      1718651 :     ) -> anyhow::Result<()> {
    4794      1718651 :         self.tl.put_values(batch, ctx).await
    4795      1718650 :     }
    4796              : 
    4797        19216 :     pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    4798        19216 :         self.tl.put_tombstones(batch).await
    4799        19216 :     }
    4800              : 
    4801              :     /// Track the end of the latest digested WAL record.
    4802              :     /// Remember the (end of) last valid WAL record remembered in the timeline.
    4803              :     ///
    4804              :     /// Call this after you have finished writing all the WAL up to 'lsn'.
    4805              :     ///
    4806              :     /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
    4807              :     /// the 'lsn' or anything older. The previous last record LSN is stored alongside
    4808              :     /// the latest and can be read.
    4809     76570323 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    4810     76570323 :         self.tl.finish_write(new_lsn);
    4811     76570323 :     }
    4812              : 
    4813       642421 :     pub(crate) fn update_current_logical_size(&self, delta: i64) {
    4814       642421 :         self.tl.update_current_logical_size(delta)
    4815       642421 :     }
    4816              : }
    4817              : 
    4818              : // We need TimelineWriter to be send in upcoming conversion of
    4819              : // Timeline::layers to tokio::sync::RwLock.
    4820            2 : #[test]
    4821            2 : fn is_send() {
    4822            2 :     fn _assert_send<T: Send>() {}
    4823            2 :     _assert_send::<TimelineWriter<'_>>();
    4824            2 : }
    4825              : 
    4826              : /// Add a suffix to a layer file's name: .{num}.old
    4827              : /// Uses the first available num (starts at 0)
    4828            1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
    4829            1 :     let filename = path
    4830            1 :         .file_name()
    4831            1 :         .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
    4832            1 :     let mut new_path = path.to_owned();
    4833              : 
    4834            1 :     for i in 0u32.. {
    4835            1 :         new_path.set_file_name(format!("{filename}.{i}.old"));
    4836            1 :         if !new_path.exists() {
    4837            1 :             std::fs::rename(path, &new_path)
    4838            1 :                 .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
    4839            1 :             return Ok(());
    4840            0 :         }
    4841              :     }
    4842              : 
    4843            0 :     bail!("couldn't find an unused backup number for {:?}", path)
    4844            1 : }
    4845              : 
    4846              : #[cfg(test)]
    4847              : mod tests {
    4848              :     use utils::{id::TimelineId, lsn::Lsn};
    4849              : 
    4850              :     use crate::tenant::{
    4851              :         harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
    4852              :     };
    4853              : 
    4854            2 :     #[tokio::test]
    4855            2 :     async fn two_layer_eviction_attempts_at_the_same_time() {
    4856            2 :         let harness =
    4857            2 :             TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
    4858            2 : 
    4859            2 :         let ctx = any_context();
    4860            2 :         let tenant = harness.try_load(&ctx).await.unwrap();
    4861            2 :         let timeline = tenant
    4862            2 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    4863            7 :             .await
    4864            2 :             .unwrap();
    4865              : 
    4866            2 :         let layer = find_some_layer(&timeline).await;
    4867            2 :         let layer = layer
    4868            2 :             .keep_resident()
    4869            0 :             .await
    4870            2 :             .expect("no download => no downloading errors")
    4871            2 :             .expect("should had been resident")
    4872            2 :             .drop_eviction_guard();
    4873            2 : 
    4874            2 :         let first = async { layer.evict_and_wait().await };
    4875            2 :         let second = async { layer.evict_and_wait().await };
    4876              : 
    4877            3 :         let (first, second) = tokio::join!(first, second);
    4878              : 
    4879            2 :         let res = layer.keep_resident().await;
    4880            2 :         assert!(matches!(res, Ok(None)), "{res:?}");
    4881              : 
    4882            2 :         match (first, second) {
    4883            0 :             (Ok(()), Ok(())) => {
    4884            0 :                 // because there are no more timeline locks being taken on eviction path, we can
    4885            0 :                 // witness all three outcomes here.
    4886            0 :             }
    4887            2 :             (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
    4888            2 :                 // if one completes before the other, this is fine just as well.
    4889            2 :             }
    4890            0 :             other => unreachable!("unexpected {:?}", other),
    4891              :         }
    4892              :     }
    4893              : 
    4894            2 :     fn any_context() -> crate::context::RequestContext {
    4895            2 :         use crate::context::*;
    4896            2 :         use crate::task_mgr::*;
    4897            2 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    4898            2 :     }
    4899              : 
    4900            2 :     async fn find_some_layer(timeline: &Timeline) -> Layer {
    4901            2 :         let layers = timeline.layers.read().await;
    4902            2 :         let desc = layers
    4903            2 :             .layer_map()
    4904            2 :             .iter_historic_layers()
    4905            2 :             .next()
    4906            2 :             .expect("must find one layer to evict");
    4907            2 : 
    4908            2 :         layers.get_from_desc(&desc)
    4909            2 :     }
    4910              : }
        

Generated by: LCOV version 2.1-beta