LCOV - code coverage report
Current view: top level - pageserver/src/tenant - timeline.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 88.8 % 2955 2623
Test Date: 2023-09-06 10:18:01 Functions: 71.8 % 372 267

            Line data    Source code
       1              : pub mod delete;
       2              : mod eviction_task;
       3              : mod init;
       4              : pub mod layer_manager;
       5              : mod logical_size;
       6              : pub mod span;
       7              : pub mod uninit;
       8              : mod walreceiver;
       9              : 
      10              : use anyhow::{anyhow, bail, ensure, Context, Result};
      11              : use bytes::Bytes;
      12              : use fail::fail_point;
      13              : use futures::StreamExt;
      14              : use itertools::Itertools;
      15              : use pageserver_api::models::{
      16              :     DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
      17              :     DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
      18              :     TimelineState,
      19              : };
      20              : use remote_storage::GenericRemoteStorage;
      21              : use serde_with::serde_as;
      22              : use storage_broker::BrokerClientChannel;
      23              : use tokio::runtime::Handle;
      24              : use tokio::sync::{oneshot, watch, TryAcquireError};
      25              : use tokio_util::sync::CancellationToken;
      26              : use tracing::*;
      27              : use utils::id::TenantTimelineId;
      28              : 
      29              : use std::cmp::{max, min, Ordering};
      30              : use std::collections::{BinaryHeap, HashMap, HashSet};
      31              : use std::ops::{Deref, Range};
      32              : use std::path::{Path, PathBuf};
      33              : use std::pin::pin;
      34              : use std::sync::atomic::Ordering as AtomicOrdering;
      35              : use std::sync::{Arc, Mutex, RwLock, Weak};
      36              : use std::time::{Duration, Instant, SystemTime};
      37              : 
      38              : use crate::context::{
      39              :     AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
      40              : };
      41              : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
      42              : use crate::tenant::storage_layer::delta_layer::DeltaEntry;
      43              : use crate::tenant::storage_layer::{
      44              :     DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer,
      45              : };
      46              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      47              : use crate::tenant::{
      48              :     layer_map::{LayerMap, SearchResult},
      49              :     metadata::{save_metadata, TimelineMetadata},
      50              :     par_fsync,
      51              :     storage_layer::{PersistentLayer, ValueReconstructResult, ValueReconstructState},
      52              : };
      53              : 
      54              : use crate::config::PageServerConf;
      55              : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
      56              : use crate::metrics::{
      57              :     TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
      58              :     RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS,
      59              : };
      60              : use crate::pgdatadir_mapping::LsnForTimestamp;
      61              : use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
      62              : use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
      63              : use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
      64              : use pageserver_api::reltag::RelTag;
      65              : 
      66              : use postgres_connection::PgConnectionConfig;
      67              : use postgres_ffi::to_pg_timestamp;
      68              : use utils::{
      69              :     completion,
      70              :     generation::Generation,
      71              :     id::{TenantId, TimelineId},
      72              :     lsn::{AtomicLsn, Lsn, RecordLsn},
      73              :     seqwait::SeqWait,
      74              :     simple_rcu::{Rcu, RcuReadGuard},
      75              : };
      76              : 
      77              : use crate::page_cache;
      78              : use crate::repository::GcResult;
      79              : use crate::repository::{Key, Value};
      80              : use crate::task_mgr;
      81              : use crate::task_mgr::TaskKind;
      82              : use crate::walredo::WalRedoManager;
      83              : use crate::ZERO_PAGE;
      84              : 
      85              : use self::delete::DeleteTimelineFlow;
      86              : pub(super) use self::eviction_task::EvictionTaskTenantState;
      87              : use self::eviction_task::EvictionTaskTimelineState;
      88              : use self::layer_manager::LayerManager;
      89              : use self::logical_size::LogicalSize;
      90              : use self::walreceiver::{WalReceiver, WalReceiverConf};
      91              : 
      92              : use super::config::TenantConf;
      93              : use super::debug_assert_current_span_has_tenant_and_timeline_id;
      94              : use super::remote_timeline_client::index::IndexPart;
      95              : use super::remote_timeline_client::RemoteTimelineClient;
      96              : use super::storage_layer::{
      97              :     AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc,
      98              : };
      99              : 
     100           78 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     101              : pub(super) enum FlushLoopState {
     102              :     NotStarted,
     103              :     Running {
     104              :         #[cfg(test)]
     105              :         expect_initdb_optimization: bool,
     106              :         #[cfg(test)]
     107              :         initdb_optimization_count: usize,
     108              :     },
     109              :     Exited,
     110              : }
     111              : 
     112              : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
     113            0 : #[derive(Debug, Clone, PartialEq, Eq)]
     114              : pub struct Hole {
     115              :     key_range: Range<Key>,
     116              :     coverage_size: usize,
     117              : }
     118              : 
     119              : impl Ord for Hole {
     120            4 :     fn cmp(&self, other: &Self) -> Ordering {
     121            4 :         other.coverage_size.cmp(&self.coverage_size) // inverse order
     122            4 :     }
     123              : }
     124              : 
     125              : impl PartialOrd for Hole {
     126            4 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     127            4 :         Some(self.cmp(other))
     128            4 :     }
     129              : }
     130              : 
     131              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     132              : /// Can be removed after all refactors are done.
     133          290 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
     134          290 :     drop(rlock)
     135          290 : }
     136              : 
     137              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     138              : /// Can be removed after all refactors are done.
     139         2831 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
     140         2831 :     drop(rlock)
     141         2831 : }
     142              : 
     143              : /// The outward-facing resources required to build a Timeline
     144              : pub struct TimelineResources {
     145              :     pub remote_client: Option<RemoteTimelineClient>,
     146              : }
     147              : 
     148              : pub struct Timeline {
     149              :     conf: &'static PageServerConf,
     150              :     tenant_conf: Arc<RwLock<TenantConfOpt>>,
     151              : 
     152              :     myself: Weak<Self>,
     153              : 
     154              :     pub tenant_id: TenantId,
     155              :     pub timeline_id: TimelineId,
     156              : 
     157              :     /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
     158              :     /// Never changes for the lifetime of this [`Timeline`] object.
     159              :     generation: Generation,
     160              : 
     161              :     pub pg_version: u32,
     162              : 
     163              :     /// The tuple has two elements.
     164              :     /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
     165              :     /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
     166              :     ///
     167              :     /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
     168              :     /// We describe these rectangles through the `PersistentLayerDesc` struct.
     169              :     ///
     170              :     /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
     171              :     /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
     172              :     /// `PersistentLayerDesc`'s.
     173              :     ///
     174              :     /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
     175              :     /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
     176              :     /// runtime, e.g., during page reconstruction.
     177              :     ///
     178              :     /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
     179              :     /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
     180              :     pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
     181              : 
     182              :     /// Set of key ranges which should be covered by image layers to
     183              :     /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
     184              :     /// It is used by compaction task when it checks if new image layer should be created.
     185              :     /// Newly created image layer doesn't help to remove the delta layer, until the
     186              :     /// newly created image layer falls off the PITR horizon. So on next GC cycle,
     187              :     /// gc_timeline may still want the new image layer to be created. To avoid redundant
     188              :     /// image layers creation we should check if image layer exists but beyond PITR horizon.
     189              :     /// This is why we need remember GC cutoff LSN.
     190              :     ///
     191              :     wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
     192              : 
     193              :     last_freeze_at: AtomicLsn,
     194              :     // Atomic would be more appropriate here.
     195              :     last_freeze_ts: RwLock<Instant>,
     196              : 
     197              :     // WAL redo manager
     198              :     walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
     199              : 
     200              :     /// Remote storage client.
     201              :     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
     202              :     pub remote_client: Option<Arc<RemoteTimelineClient>>,
     203              : 
     204              :     // What page versions do we hold in the repository? If we get a
     205              :     // request > last_record_lsn, we need to wait until we receive all
     206              :     // the WAL up to the request. The SeqWait provides functions for
     207              :     // that. TODO: If we get a request for an old LSN, such that the
     208              :     // versions have already been garbage collected away, we should
     209              :     // throw an error, but we don't track that currently.
     210              :     //
     211              :     // last_record_lsn.load().last points to the end of last processed WAL record.
     212              :     //
     213              :     // We also remember the starting point of the previous record in
     214              :     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     215              :     // first WAL record when the node is started up. But here, we just
     216              :     // keep track of it.
     217              :     last_record_lsn: SeqWait<RecordLsn, Lsn>,
     218              : 
     219              :     // All WAL records have been processed and stored durably on files on
     220              :     // local disk, up to this LSN. On crash and restart, we need to re-process
     221              :     // the WAL starting from this point.
     222              :     //
     223              :     // Some later WAL records might have been processed and also flushed to disk
     224              :     // already, so don't be surprised to see some, but there's no guarantee on
     225              :     // them yet.
     226              :     disk_consistent_lsn: AtomicLsn,
     227              : 
     228              :     // Parent timeline that this timeline was branched from, and the LSN
     229              :     // of the branch point.
     230              :     ancestor_timeline: Option<Arc<Timeline>>,
     231              :     ancestor_lsn: Lsn,
     232              : 
     233              :     pub(super) metrics: TimelineMetrics,
     234              : 
     235              :     /// Ensures layers aren't frozen by checkpointer between
     236              :     /// [`Timeline::get_layer_for_write`] and layer reads.
     237              :     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     238              :     /// Must always be acquired before the layer map/individual layer lock
     239              :     /// to avoid deadlock.
     240              :     write_lock: tokio::sync::Mutex<()>,
     241              : 
     242              :     /// Used to avoid multiple `flush_loop` tasks running
     243              :     pub(super) flush_loop_state: Mutex<FlushLoopState>,
     244              : 
     245              :     /// layer_flush_start_tx can be used to wake up the layer-flushing task.
     246              :     /// The value is a counter, incremented every time a new flush cycle is requested.
     247              :     /// The flush cycle counter is sent back on the layer_flush_done channel when
     248              :     /// the flush finishes. You can use that to wait for the flush to finish.
     249              :     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     250              :     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
     251              :     layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
     252              : 
     253              :     /// Layer removal lock.
     254              :     /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
     255              :     /// This lock is acquired in [`Timeline::gc`] and [`Timeline::compact`].
     256              :     /// This is an `Arc<Mutex>` lock because we need an owned
     257              :     /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
     258              :     /// Note that [`DeleteTimelineFlow`] uses `delete_progress` field.
     259              :     pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
     260              : 
     261              :     // Needed to ensure that we can't create a branch at a point that was already garbage collected
     262              :     pub latest_gc_cutoff_lsn: Rcu<Lsn>,
     263              : 
     264              :     // List of child timelines and their branch points. This is needed to avoid
     265              :     // garbage collecting data that is still needed by the child timelines.
     266              :     pub gc_info: std::sync::RwLock<GcInfo>,
     267              : 
     268              :     // It may change across major versions so for simplicity
     269              :     // keep it after running initdb for a timeline.
     270              :     // It is needed in checks when we want to error on some operations
     271              :     // when they are requested for pre-initdb lsn.
     272              :     // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
     273              :     // though let's keep them both for better error visibility.
     274              :     pub initdb_lsn: Lsn,
     275              : 
     276              :     /// When did we last calculate the partitioning?
     277              :     partitioning: Mutex<(KeyPartitioning, Lsn)>,
     278              : 
     279              :     /// Configuration: how often should the partitioning be recalculated.
     280              :     repartition_threshold: u64,
     281              : 
     282              :     /// Current logical size of the "datadir", at the last LSN.
     283              :     current_logical_size: LogicalSize,
     284              : 
     285              :     /// Information about the last processed message by the WAL receiver,
     286              :     /// or None if WAL receiver has not received anything for this timeline
     287              :     /// yet.
     288              :     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
     289              :     pub walreceiver: Mutex<Option<WalReceiver>>,
     290              : 
     291              :     /// Relation size cache
     292              :     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
     293              : 
     294              :     download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
     295              : 
     296              :     state: watch::Sender<TimelineState>,
     297              : 
     298              :     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     299              :     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     300              :     pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
     301              : 
     302              :     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
     303              : 
     304              :     /// Barrier to wait before doing initial logical size calculation. Used only during startup.
     305              :     initial_logical_size_can_start: Option<completion::Barrier>,
     306              : 
     307              :     /// Completion shared between all timelines loaded during startup; used to delay heavier
     308              :     /// background tasks until some logical sizes have been calculated.
     309              :     initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
     310              : 
     311              :     /// Load or creation time information about the disk_consistent_lsn and when the loading
     312              :     /// happened. Used for consumption metrics.
     313              :     pub(crate) loaded_at: (Lsn, SystemTime),
     314              : }
     315              : 
     316              : pub struct WalReceiverInfo {
     317              :     pub wal_source_connconf: PgConnectionConfig,
     318              :     pub last_received_msg_lsn: Lsn,
     319              :     pub last_received_msg_ts: u128,
     320              : }
     321              : 
     322              : ///
     323              : /// Information about how much history needs to be retained, needed by
     324              : /// Garbage Collection.
     325              : ///
     326              : pub struct GcInfo {
     327              :     /// Specific LSNs that are needed.
     328              :     ///
     329              :     /// Currently, this includes all points where child branches have
     330              :     /// been forked off from. In the future, could also include
     331              :     /// explicit user-defined snapshot points.
     332              :     pub retain_lsns: Vec<Lsn>,
     333              : 
     334              :     /// In addition to 'retain_lsns', keep everything newer than this
     335              :     /// point.
     336              :     ///
     337              :     /// This is calculated by subtracting 'gc_horizon' setting from
     338              :     /// last-record LSN
     339              :     ///
     340              :     /// FIXME: is this inclusive or exclusive?
     341              :     pub horizon_cutoff: Lsn,
     342              : 
     343              :     /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
     344              :     /// point.
     345              :     ///
     346              :     /// This is calculated by finding a number such that a record is needed for PITR
     347              :     /// if only if its LSN is larger than 'pitr_cutoff'.
     348              :     pub pitr_cutoff: Lsn,
     349              : }
     350              : 
     351              : /// An error happened in a get() operation.
     352           29 : #[derive(thiserror::Error)]
     353              : pub enum PageReconstructError {
     354              :     #[error(transparent)]
     355              :     Other(#[from] anyhow::Error),
     356              : 
     357              :     /// The operation would require downloading a layer that is missing locally.
     358              :     NeedsDownload(TenantTimelineId, LayerFileName),
     359              : 
     360              :     /// The operation was cancelled
     361              :     Cancelled,
     362              : 
     363              :     /// The ancestor of this is being stopped
     364              :     AncestorStopping(TimelineId),
     365              : 
     366              :     /// An error happened replaying WAL records
     367              :     #[error(transparent)]
     368              :     WalRedo(#[from] crate::walredo::WalRedoError),
     369              : }
     370              : 
     371              : impl std::fmt::Debug for PageReconstructError {
     372            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
     373            0 :         match self {
     374            0 :             Self::Other(err) => err.fmt(f),
     375            0 :             Self::NeedsDownload(tenant_timeline_id, layer_file_name) => {
     376            0 :                 write!(
     377            0 :                     f,
     378            0 :                     "layer {}/{} needs download",
     379            0 :                     tenant_timeline_id,
     380            0 :                     layer_file_name.file_name()
     381            0 :                 )
     382              :             }
     383            0 :             Self::Cancelled => write!(f, "cancelled"),
     384            0 :             Self::AncestorStopping(timeline_id) => {
     385            0 :                 write!(f, "ancestor timeline {timeline_id} is being stopped")
     386              :             }
     387            0 :             Self::WalRedo(err) => err.fmt(f),
     388              :         }
     389            0 :     }
     390              : }
     391              : 
     392              : impl std::fmt::Display for PageReconstructError {
     393           18 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
     394           18 :         match self {
     395           17 :             Self::Other(err) => err.fmt(f),
     396            0 :             Self::NeedsDownload(tenant_timeline_id, layer_file_name) => {
     397            0 :                 write!(
     398            0 :                     f,
     399            0 :                     "layer {}/{} needs download",
     400            0 :                     tenant_timeline_id,
     401            0 :                     layer_file_name.file_name()
     402            0 :                 )
     403              :             }
     404            0 :             Self::Cancelled => write!(f, "cancelled"),
     405            1 :             Self::AncestorStopping(timeline_id) => {
     406            1 :                 write!(f, "ancestor timeline {timeline_id} is being stopped")
     407              :             }
     408            0 :             Self::WalRedo(err) => err.fmt(f),
     409              :         }
     410           18 :     }
     411              : }
     412              : 
     413            0 : #[derive(Clone, Copy)]
     414              : pub enum LogicalSizeCalculationCause {
     415              :     Initial,
     416              :     ConsumptionMetricsSyntheticSize,
     417              :     EvictionTaskImitation,
     418              :     TenantSizeHandler,
     419              : }
     420              : 
     421              : /// Public interface functions
     422              : impl Timeline {
     423              :     /// Get the LSN where this branch was created
     424         2820 :     pub fn get_ancestor_lsn(&self) -> Lsn {
     425         2820 :         self.ancestor_lsn
     426         2820 :     }
     427              : 
     428              :     /// Get the ancestor's timeline id
     429         5176 :     pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
     430         5176 :         self.ancestor_timeline
     431         5176 :             .as_ref()
     432         5176 :             .map(|ancestor| ancestor.timeline_id)
     433         5176 :     }
     434              : 
     435              :     /// Lock and get timeline's GC cuttof
     436      4604171 :     pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
     437      4604171 :         self.latest_gc_cutoff_lsn.read()
     438      4604171 :     }
     439              : 
     440              :     /// Look up given page version.
     441              :     ///
     442              :     /// If a remote layer file is needed, it is downloaded as part of this
     443              :     /// call.
     444              :     ///
     445              :     /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
     446              :     /// abstraction above this needs to store suitable metadata to track what
     447              :     /// data exists with what keys, in separate metadata entries. If a
     448              :     /// non-existent key is requested, we may incorrectly return a value from
     449              :     /// an ancestor branch, for example, or waste a lot of cycles chasing the
     450              :     /// non-existing key.
     451              :     ///
     452      7262298 :     pub async fn get(
     453      7262298 :         &self,
     454      7262298 :         key: Key,
     455      7262298 :         lsn: Lsn,
     456      7262298 :         ctx: &RequestContext,
     457      7262300 :     ) -> Result<Bytes, PageReconstructError> {
     458      7262300 :         if !lsn.is_valid() {
     459            0 :             return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
     460      7262300 :         }
     461              : 
     462              :         // XXX: structured stats collection for layer eviction here.
     463            0 :         trace!(
     464            0 :             "get page request for {}@{} from task kind {:?}",
     465            0 :             key,
     466            0 :             lsn,
     467            0 :             ctx.task_kind()
     468            0 :         );
     469              : 
     470              :         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
     471              :         // The cached image can be returned directly if there is no WAL between the cached image
     472              :         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
     473              :         // for redo.
     474      7262300 :         let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
     475      1740445 :             Some((cached_lsn, cached_img)) => {
     476      1740445 :                 match cached_lsn.cmp(&lsn) {
     477      1740405 :                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
     478              :                     Ordering::Equal => {
     479           40 :                         MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
     480           40 :                         return Ok(cached_img); // exact LSN match, return the image
     481              :                     }
     482              :                     Ordering::Greater => {
     483            0 :                         unreachable!("the returned lsn should never be after the requested lsn")
     484              :                     }
     485              :                 }
     486      1740405 :                 Some((cached_lsn, cached_img))
     487              :             }
     488      5521855 :             None => None,
     489              :         };
     490              : 
     491      7262260 :         let mut reconstruct_state = ValueReconstructState {
     492      7262260 :             records: Vec::new(),
     493      7262260 :             img: cached_page_img,
     494      7262260 :         };
     495      7262260 : 
     496      7262260 :         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
     497      7262260 :         self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
     498      9754894 :             .await?;
     499      7262204 :         timer.stop_and_record();
     500      7262204 : 
     501      7262204 :         RECONSTRUCT_TIME
     502      7262204 :             .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
     503          734 :             .await
     504      7262263 :     }
     505              : 
     506              :     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     507     89040437 :     pub fn get_last_record_lsn(&self) -> Lsn {
     508     89040437 :         self.last_record_lsn.load().last
     509     89040437 :     }
     510              : 
     511         2337 :     pub fn get_prev_record_lsn(&self) -> Lsn {
     512         2337 :         self.last_record_lsn.load().prev
     513         2337 :     }
     514              : 
     515              :     /// Atomically get both last and prev.
     516         1029 :     pub fn get_last_record_rlsn(&self) -> RecordLsn {
     517         1029 :         self.last_record_lsn.load()
     518         1029 :     }
     519              : 
     520       738701 :     pub fn get_disk_consistent_lsn(&self) -> Lsn {
     521       738701 :         self.disk_consistent_lsn.load()
     522       738701 :     }
     523              : 
     524              :     pub fn get_remote_consistent_lsn(&self) -> Option<Lsn> {
     525       736499 :         if let Some(remote_client) = &self.remote_client {
     526       116151 :             remote_client.last_uploaded_consistent_lsn()
     527              :         } else {
     528       620348 :             None
     529              :         }
     530       736499 :     }
     531              : 
     532              :     /// The sum of the file size of all historic layers in the layer map.
     533              :     /// This method makes no distinction between local and remote layers.
     534              :     /// Hence, the result **does not represent local filesystem usage**.
     535         2781 :     pub async fn layer_size_sum(&self) -> u64 {
     536         2781 :         let guard = self.layers.read().await;
     537         2781 :         let layer_map = guard.layer_map();
     538         2781 :         let mut size = 0;
     539        64575 :         for l in layer_map.iter_historic_layers() {
     540        64575 :             size += l.file_size();
     541        64575 :         }
     542         2781 :         size
     543         2781 :     }
     544              : 
     545           46 :     pub fn resident_physical_size(&self) -> u64 {
     546           46 :         self.metrics.resident_physical_size_gauge.get()
     547           46 :     }
     548              : 
     549              :     ///
     550              :     /// Wait until WAL has been received and processed up to this LSN.
     551              :     ///
     552              :     /// You should call this before any of the other get_* or list_* functions. Calling
     553              :     /// those functions with an LSN that has been processed yet is an error.
     554              :     ///
     555      1295744 :     pub async fn wait_lsn(
     556      1295744 :         &self,
     557      1295744 :         lsn: Lsn,
     558      1295744 :         _ctx: &RequestContext, /* Prepare for use by cancellation */
     559      1295745 :     ) -> anyhow::Result<()> {
     560      1295745 :         anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
     561              : 
     562              :         // This should never be called from the WAL receiver, because that could lead
     563              :         // to a deadlock.
     564      1295745 :         anyhow::ensure!(
     565      1295745 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
     566            0 :             "wait_lsn cannot be called in WAL receiver"
     567              :         );
     568      1295745 :         anyhow::ensure!(
     569      1295745 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
     570            0 :             "wait_lsn cannot be called in WAL receiver"
     571              :         );
     572      1295745 :         anyhow::ensure!(
     573      1295745 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
     574            0 :             "wait_lsn cannot be called in WAL receiver"
     575              :         );
     576              : 
     577      1295745 :         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
     578      1295745 : 
     579      1295745 :         match self
     580      1295745 :             .last_record_lsn
     581      1295745 :             .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
     582       169869 :             .await
     583              :         {
     584      1295741 :             Ok(()) => Ok(()),
     585            4 :             Err(e) => {
     586            4 :                 // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
     587            4 :                 drop(_timer);
     588            4 :                 let walreceiver_status = {
     589            4 :                     match &*self.walreceiver.lock().unwrap() {
     590            0 :                         None => "stopping or stopped".to_string(),
     591            4 :                         Some(walreceiver) => match walreceiver.status() {
     592            3 :                             Some(status) => status.to_human_readable_string(),
     593            1 :                             None => "Not active".to_string(),
     594              :                         },
     595              :                     }
     596              :                 };
     597            4 :                 Err(anyhow::Error::new(e).context({
     598            4 :                     format!(
     599            4 :                         "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
     600            4 :                         lsn,
     601            4 :                         self.get_last_record_lsn(),
     602            4 :                         self.get_disk_consistent_lsn(),
     603            4 :                         walreceiver_status,
     604            4 :                     )
     605            4 :                 }))
     606              :             }
     607              :         }
     608      1295745 :     }
     609              : 
     610              :     /// Check that it is valid to request operations with that lsn.
     611          582 :     pub fn check_lsn_is_in_scope(
     612          582 :         &self,
     613          582 :         lsn: Lsn,
     614          582 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     615          582 :     ) -> anyhow::Result<()> {
     616          582 :         ensure!(
     617          582 :             lsn >= **latest_gc_cutoff_lsn,
     618            9 :             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
     619            9 :             lsn,
     620            9 :             **latest_gc_cutoff_lsn,
     621              :         );
     622          573 :         Ok(())
     623          582 :     }
     624              : 
     625              :     /// Flush to disk all data that was written with the put_* functions
     626         7192 :     #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
     627              :     pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
     628              :         self.freeze_inmem_layer(false).await;
     629              :         self.flush_frozen_layers_and_wait().await
     630              :     }
     631              : 
     632              :     /// Outermost timeline compaction operation; downloads needed layers.
     633         1401 :     pub async fn compact(
     634         1401 :         self: &Arc<Self>,
     635         1401 :         cancel: &CancellationToken,
     636         1401 :         ctx: &RequestContext,
     637         1401 :     ) -> anyhow::Result<()> {
     638              :         const ROUNDS: usize = 2;
     639              : 
     640              :         static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
     641          238 :             once_cell::sync::Lazy::new(|| {
     642          238 :                 let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
     643          238 :                 let permits = usize::max(
     644          238 :                     1,
     645          238 :                     // while a lot of the work is done on spawn_blocking, we still do
     646          238 :                     // repartitioning in the async context. this should give leave us some workers
     647          238 :                     // unblocked to be blocked on other work, hopefully easing any outside visible
     648          238 :                     // effects of restarts.
     649          238 :                     //
     650          238 :                     // 6/8 is a guess; previously we ran with unlimited 8 and more from
     651          238 :                     // spawn_blocking.
     652          238 :                     (total_threads * 3).checked_div(4).unwrap_or(0),
     653          238 :                 );
     654          238 :                 assert_ne!(permits, 0, "we will not be adding in permits later");
     655          238 :                 assert!(
     656          238 :                     permits < total_threads,
     657            0 :                     "need threads avail for shorter work"
     658              :                 );
     659          238 :                 tokio::sync::Semaphore::new(permits)
     660          238 :             });
     661              : 
     662              :         // this wait probably never needs any "long time spent" logging, because we already nag if
     663              :         // compaction task goes over it's period (20s) which is quite often in production.
     664         1401 :         let _permit = tokio::select! {
     665         1401 :             permit = CONCURRENT_COMPACTIONS.acquire() => {
     666              :                 permit
     667              :             },
     668              :             _ = cancel.cancelled() => {
     669              :                 return Ok(());
     670              :             }
     671              :         };
     672              : 
     673         1401 :         let last_record_lsn = self.get_last_record_lsn();
     674         1401 : 
     675         1401 :         // Last record Lsn could be zero in case the timeline was just created
     676         1401 :         if !last_record_lsn.is_valid() {
     677            0 :             warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
     678            0 :             return Ok(());
     679         1401 :         }
     680              : 
     681              :         // retry two times to allow first round to find layers which need to be downloaded, then
     682              :         // download them, then retry compaction
     683         1402 :         for round in 0..ROUNDS {
     684              :             // should we error out with the most specific error?
     685         1402 :             let last_round = round == ROUNDS - 1;
     686              : 
     687      1078251 :             let res = self.compact_inner(ctx).await;
     688              : 
     689              :             // If `create_image_layers' or `compact_level0` scheduled any
     690              :             // uploads or deletions, but didn't update the index file yet,
     691              :             // do it now.
     692              :             //
     693              :             // This isn't necessary for correctness, the remote state is
     694              :             // consistent without the uploads and deletions, and we would
     695              :             // update the index file on next flush iteration too. But it
     696              :             // could take a while until that happens.
     697              :             //
     698              :             // Additionally, only do this once before we return from this function.
     699         1392 :             if last_round || res.is_ok() {
     700         1390 :                 if let Some(remote_client) = &self.remote_client {
     701          633 :                     remote_client.schedule_index_upload_for_file_changes()?;
     702          757 :                 }
     703            2 :             }
     704              : 
     705            1 :             let rls = match res {
     706         1390 :                 Ok(()) => return Ok(()),
     707            1 :                 Err(CompactionError::DownloadRequired(rls)) if !last_round => {
     708            1 :                     // this can be done at most one time before exiting, waiting
     709            1 :                     rls
     710              :                 }
     711            0 :                 Err(CompactionError::DownloadRequired(rls)) => {
     712            0 :                     anyhow::bail!("Compaction requires downloading multiple times (last was {} layers), possibly battling against eviction", rls.len())
     713              :                 }
     714              :                 Err(CompactionError::ShuttingDown) => {
     715            0 :                     return Ok(());
     716              :                 }
     717            1 :                 Err(CompactionError::Other(e)) => {
     718            1 :                     return Err(e);
     719              :                 }
     720              :             };
     721              : 
     722              :             // this path can be visited in the second round of retrying, if first one found that we
     723              :             // must first download some remote layers
     724            1 :             let total = rls.len();
     725            1 : 
     726            1 :             let mut downloads = rls
     727            1 :                 .into_iter()
     728            3 :                 .map(|rl| self.download_remote_layer(rl))
     729            1 :                 .collect::<futures::stream::FuturesUnordered<_>>();
     730            1 : 
     731            1 :             let mut failed = 0;
     732              : 
     733              :             loop {
     734            4 :                 tokio::select! {
     735              :                     _ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
     736            4 :                     res = downloads.next() => {
     737              :                         match res {
     738              :                             Some(Ok(())) => {},
     739              :                             Some(Err(e)) => {
     740            0 :                                 warn!("Downloading remote layer for compaction failed: {e:#}");
     741              :                                 failed += 1;
     742              :                             }
     743              :                             None => break,
     744              :                         }
     745              :                     }
     746              :                 }
     747              :             }
     748              : 
     749            1 :             if failed != 0 {
     750            0 :                 anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
     751            1 :             }
     752              : 
     753              :             // if everything downloaded fine, lets try again
     754              :         }
     755              : 
     756            0 :         unreachable!("retry loop exits")
     757         1391 :     }
     758              : 
     759              :     /// Compaction which might need to be retried after downloading remote layers.
     760         1402 :     async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
     761              :         //
     762              :         // High level strategy for compaction / image creation:
     763              :         //
     764              :         // 1. First, calculate the desired "partitioning" of the
     765              :         // currently in-use key space. The goal is to partition the
     766              :         // key space into roughly fixed-size chunks, but also take into
     767              :         // account any existing image layers, and try to align the
     768              :         // chunk boundaries with the existing image layers to avoid
     769              :         // too much churn. Also try to align chunk boundaries with
     770              :         // relation boundaries.  In principle, we don't know about
     771              :         // relation boundaries here, we just deal with key-value
     772              :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     773              :         // map relations into key-value pairs. But in practice we know
     774              :         // that 'field6' is the block number, and the fields 1-5
     775              :         // identify a relation. This is just an optimization,
     776              :         // though.
     777              :         //
     778              :         // 2. Once we know the partitioning, for each partition,
     779              :         // decide if it's time to create a new image layer. The
     780              :         // criteria is: there has been too much "churn" since the last
     781              :         // image layer? The "churn" is fuzzy concept, it's a
     782              :         // combination of too many delta files, or too much WAL in
     783              :         // total in the delta file. Or perhaps: if creating an image
     784              :         // file would allow to delete some older files.
     785              :         //
     786              :         // 3. After that, we compact all level0 delta files if there
     787              :         // are too many of them.  While compacting, we also garbage
     788              :         // collect any page versions that are no longer needed because
     789              :         // of the new image layers we created in step 2.
     790              :         //
     791              :         // TODO: This high level strategy hasn't been implemented yet.
     792              :         // Below are functions compact_level0() and create_image_layers()
     793              :         // but they are a bit ad hoc and don't quite work like it's explained
     794              :         // above. Rewrite it.
     795         1402 :         let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
     796              :         // Is the timeline being deleted?
     797         1401 :         if self.is_stopping() {
     798            0 :             trace!("Dropping out of compaction on timeline shutdown");
     799            0 :             return Err(CompactionError::ShuttingDown);
     800         1401 :         }
     801         1401 : 
     802         1401 :         let target_file_size = self.get_checkpoint_distance();
     803         1401 : 
     804         1401 :         // Define partitioning schema if needed
     805         1401 : 
     806         1401 :         match self
     807         1401 :             .repartition(
     808         1401 :                 self.get_last_record_lsn(),
     809         1401 :                 self.get_compaction_target_size(),
     810         1401 :                 ctx,
     811         1401 :             )
     812       383346 :             .await
     813              :         {
     814         1399 :             Ok((partitioning, lsn)) => {
     815         1399 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     816         1399 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     817         1399 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     818         1399 :                     .build();
     819              : 
     820              :                 // 2. Create new image layers for partitions that have been modified
     821              :                 // "enough".
     822         1399 :                 let layer_paths_to_upload = self
     823         1399 :                     .create_image_layers(&partitioning, lsn, false, &image_ctx)
     824       429215 :                     .await
     825         1398 :                     .map_err(anyhow::Error::from)?;
     826         1398 :                 if let Some(remote_client) = &self.remote_client {
     827         1655 :                     for (path, layer_metadata) in layer_paths_to_upload {
     828         1015 :                         remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
     829              :                     }
     830          758 :                 }
     831              : 
     832              :                 // 3. Compact
     833         1398 :                 let timer = self.metrics.compact_time_histo.start_timer();
     834         1398 :                 self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
     835       265683 :                     .await?;
     836         1390 :                 timer.stop_and_record();
     837              :             }
     838            0 :             Err(err) => {
     839              :                 // no partitioning? This is normal, if the timeline was just created
     840              :                 // as an empty timeline. Also in unit tests, when we use the timeline
     841              :                 // as a simple key-value store, ignoring the datadir layout. Log the
     842              :                 // error but continue.
     843            0 :                 error!("could not compact, repartitioning keyspace failed: {err:?}");
     844              :             }
     845              :         };
     846              : 
     847         1390 :         Ok(())
     848         1392 :     }
     849              : 
     850              :     /// Mutate the timeline with a [`TimelineWriter`].
     851     74268456 :     pub async fn writer(&self) -> TimelineWriter<'_> {
     852              :         TimelineWriter {
     853     74268522 :             tl: self,
     854     74268522 :             _write_guard: self.write_lock.lock().await,
     855              :         }
     856     74268522 :     }
     857              : 
     858              :     /// Retrieve current logical size of the timeline.
     859              :     ///
     860              :     /// The size could be lagging behind the actual number, in case
     861              :     /// the initial size calculation has not been run (gets triggered on the first size access).
     862              :     ///
     863              :     /// return size and boolean flag that shows if the size is exact
     864       736545 :     pub fn get_current_logical_size(
     865       736545 :         self: &Arc<Self>,
     866       736545 :         ctx: &RequestContext,
     867       736545 :     ) -> anyhow::Result<(u64, bool)> {
     868       736545 :         let current_size = self.current_logical_size.current_size()?;
     869       736545 :         debug!("Current size: {current_size:?}");
     870              : 
     871       736545 :         let mut is_exact = true;
     872       736545 :         let size = current_size.size();
     873          562 :         if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
     874       736545 :             (current_size, self.current_logical_size.initial_part_end)
     875          562 :         {
     876          562 :             is_exact = false;
     877          562 :             self.try_spawn_size_init_task(initial_part_end, ctx);
     878       735983 :         }
     879              : 
     880       736545 :         Ok((size, is_exact))
     881       736545 :     }
     882              : 
     883              :     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
     884              :     /// the in-memory layer, and initiate flushing it if so.
     885              :     ///
     886              :     /// Also flush after a period of time without new data -- it helps
     887              :     /// safekeepers to regard pageserver as caught up and suspend activity.
     888       734162 :     pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
     889       734162 :         let last_lsn = self.get_last_record_lsn();
     890       733761 :         let open_layer_size = {
     891       734162 :             let guard = self.layers.read().await;
     892       734162 :             let layers = guard.layer_map();
     893       734162 :             let Some(open_layer) = layers.open_layer.as_ref() else {
     894          401 :                 return Ok(());
     895              :             };
     896       733761 :             open_layer.size().await?
     897              :         };
     898       733761 :         let last_freeze_at = self.last_freeze_at.load();
     899       733761 :         let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
     900       733761 :         let distance = last_lsn.widening_sub(last_freeze_at);
     901       733761 :         // Checkpointing the open layer can be triggered by layer size or LSN range.
     902       733761 :         // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
     903       733761 :         // we want to stay below that with a big margin.  The LSN distance determines how
     904       733761 :         // much WAL the safekeepers need to store.
     905       733761 :         if distance >= self.get_checkpoint_distance().into()
     906       731746 :             || open_layer_size > self.get_checkpoint_distance()
     907       728806 :             || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
     908              :         {
     909         4955 :             info!(
     910         4955 :                 "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
     911         4955 :                 distance,
     912         4955 :                 open_layer_size,
     913         4955 :                 last_freeze_ts.elapsed()
     914         4955 :             );
     915              : 
     916         4955 :             self.freeze_inmem_layer(true).await;
     917         4955 :             self.last_freeze_at.store(last_lsn);
     918         4955 :             *(self.last_freeze_ts.write().unwrap()) = Instant::now();
     919         4955 : 
     920         4955 :             // Wake up the layer flusher
     921         4955 :             self.flush_frozen_layers();
     922       728806 :         }
     923       733761 :         Ok(())
     924       734162 :     }
     925              : 
     926         1190 :     pub fn activate(
     927         1190 :         self: &Arc<Self>,
     928         1190 :         broker_client: BrokerClientChannel,
     929         1190 :         background_jobs_can_start: Option<&completion::Barrier>,
     930         1190 :         ctx: &RequestContext,
     931         1190 :     ) {
     932         1190 :         self.launch_wal_receiver(ctx, broker_client);
     933         1190 :         self.set_state(TimelineState::Active);
     934         1190 :         self.launch_eviction_task(background_jobs_can_start);
     935         1190 :     }
     936              : 
     937         1052 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
     938              :     pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
     939              :         debug_assert_current_span_has_tenant_and_timeline_id();
     940              : 
     941              :         // prevent writes to the InMemoryLayer
     942              :         task_mgr::shutdown_tasks(
     943              :             Some(TaskKind::WalReceiverManager),
     944              :             Some(self.tenant_id),
     945              :             Some(self.timeline_id),
     946              :         )
     947              :         .await;
     948              : 
     949              :         // now all writers to InMemory layer are gone, do the final flush if requested
     950              :         if freeze_and_flush {
     951              :             match self.freeze_and_flush().await {
     952              :                 Ok(()) => {}
     953              :                 Err(e) => {
     954           78 :                     warn!("failed to freeze and flush: {e:#}");
     955              :                     return; // TODO: should probably drain remote timeline client anyways?
     956              :                 }
     957              :             }
     958              : 
     959              :             // drain the upload queue
     960              :             let res = if let Some(client) = self.remote_client.as_ref() {
     961              :                 // if we did not wait for completion here, it might be our shutdown process
     962              :                 // didn't wait for remote uploads to complete at all, as new tasks can forever
     963              :                 // be spawned.
     964              :                 //
     965              :                 // what is problematic is the shutting down of RemoteTimelineClient, because
     966              :                 // obviously it does not make sense to stop while we wait for it, but what
     967              :                 // about corner cases like s3 suddenly hanging up?
     968              :                 client.wait_completion().await
     969              :             } else {
     970              :                 Ok(())
     971              :             };
     972              : 
     973              :             if let Err(e) = res {
     974            0 :                 warn!("failed to await for frozen and flushed uploads: {e:#}");
     975              :             }
     976              :         }
     977              :     }
     978              : 
     979              :     pub fn set_state(&self, new_state: TimelineState) {
     980         2090 :         match (self.current_state(), new_state) {
     981         2090 :             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
     982          170 :                 info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
     983              :             }
     984            0 :             (st, TimelineState::Loading) => {
     985            0 :                 error!("ignoring transition from {st:?} into Loading state");
     986              :             }
     987           19 :             (TimelineState::Broken { .. }, new_state) => {
     988           19 :                 error!("Ignoring state update {new_state:?} for broken timeline");
     989              :             }
     990              :             (TimelineState::Stopping, TimelineState::Active) => {
     991            0 :                 error!("Not activating a Stopping timeline");
     992              :             }
     993         1901 :             (_, new_state) => {
     994         1331 :                 if matches!(
     995         1901 :                     new_state,
     996              :                     TimelineState::Stopping | TimelineState::Broken { .. }
     997          570 :                 ) {
     998          570 :                     // drop the completion guard, if any; it might be holding off the completion
     999          570 :                     // forever needlessly
    1000          570 :                     self.initial_logical_size_attempt
    1001          570 :                         .lock()
    1002          570 :                         .unwrap_or_else(|e| e.into_inner())
    1003          570 :                         .take();
    1004         1331 :                 }
    1005         1901 :                 self.state.send_replace(new_state);
    1006              :             }
    1007              :         }
    1008         2090 :     }
    1009              : 
    1010           44 :     pub fn set_broken(&self, reason: String) {
    1011           44 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
    1012           44 :         let broken_state = TimelineState::Broken {
    1013           44 :             reason,
    1014           44 :             backtrace: backtrace_str,
    1015           44 :         };
    1016           44 :         self.set_state(broken_state)
    1017           44 :     }
    1018              : 
    1019      2016136 :     pub fn current_state(&self) -> TimelineState {
    1020      2016136 :         self.state.borrow().clone()
    1021      2016136 :     }
    1022              : 
    1023          745 :     pub fn is_broken(&self) -> bool {
    1024          745 :         matches!(&*self.state.borrow(), TimelineState::Broken { .. })
    1025          745 :     }
    1026              : 
    1027      1302676 :     pub fn is_active(&self) -> bool {
    1028      1302676 :         self.current_state() == TimelineState::Active
    1029      1302676 :     }
    1030              : 
    1031         2547 :     pub fn is_stopping(&self) -> bool {
    1032         2547 :         self.current_state() == TimelineState::Stopping
    1033         2547 :     }
    1034              : 
    1035         1592 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
    1036         1592 :         self.state.subscribe()
    1037         1592 :     }
    1038              : 
    1039      1069673 :     pub async fn wait_to_become_active(
    1040      1069673 :         &self,
    1041      1069673 :         _ctx: &RequestContext, // Prepare for use by cancellation
    1042      1069674 :     ) -> Result<(), TimelineState> {
    1043      1069674 :         let mut receiver = self.state.subscribe();
    1044      1069690 :         loop {
    1045      1069690 :             let current_state = receiver.borrow().clone();
    1046      1069690 :             match current_state {
    1047              :                 TimelineState::Loading => {
    1048           16 :                     receiver
    1049           16 :                         .changed()
    1050           16 :                         .await
    1051           16 :                         .expect("holding a reference to self");
    1052              :                 }
    1053              :                 TimelineState::Active { .. } => {
    1054      1069672 :                     return Ok(());
    1055              :                 }
    1056              :                 TimelineState::Broken { .. } | TimelineState::Stopping => {
    1057              :                     // There's no chance the timeline can transition back into ::Active
    1058            2 :                     return Err(current_state);
    1059              :                 }
    1060              :             }
    1061              :         }
    1062      1069674 :     }
    1063              : 
    1064          101 :     pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
    1065          101 :         let guard = self.layers.read().await;
    1066          101 :         let layer_map = guard.layer_map();
    1067          101 :         let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
    1068          101 :         if let Some(open_layer) = &layer_map.open_layer {
    1069           23 :             in_memory_layers.push(open_layer.info());
    1070           78 :         }
    1071          101 :         for frozen_layer in &layer_map.frozen_layers {
    1072            0 :             in_memory_layers.push(frozen_layer.info());
    1073            0 :         }
    1074              : 
    1075          101 :         let mut historic_layers = Vec::new();
    1076         2273 :         for historic_layer in layer_map.iter_historic_layers() {
    1077         2273 :             let historic_layer = guard.get_from_desc(&historic_layer);
    1078         2273 :             historic_layers.push(historic_layer.info(reset));
    1079         2273 :         }
    1080              : 
    1081          101 :         LayerMapInfo {
    1082          101 :             in_memory_layers,
    1083          101 :             historic_layers,
    1084          101 :         }
    1085          101 :     }
    1086              : 
    1087           24 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
    1088              :     pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1089              :         let Some(layer) = self.find_layer(layer_file_name).await else {
    1090              :             return Ok(None);
    1091              :         };
    1092              :         let Some(remote_layer) = layer.downcast_remote_layer() else {
    1093              :             return Ok(Some(false));
    1094              :         };
    1095              :         if self.remote_client.is_none() {
    1096              :             return Ok(Some(false));
    1097              :         }
    1098              : 
    1099              :         self.download_remote_layer(remote_layer).await?;
    1100              :         Ok(Some(true))
    1101              :     }
    1102              : 
    1103              :     /// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer.
    1104              :     /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
    1105           25 :     pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1106           25 :         let Some(local_layer) = self.find_layer(layer_file_name).await else {
    1107            0 :             return Ok(None);
    1108              :         };
    1109           25 :         let remote_client = self
    1110           25 :             .remote_client
    1111           25 :             .as_ref()
    1112           25 :             .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
    1113              : 
    1114           25 :         let cancel = CancellationToken::new();
    1115           25 :         let results = self
    1116           25 :             .evict_layer_batch(remote_client, &[local_layer], cancel)
    1117            2 :             .await?;
    1118           25 :         assert_eq!(results.len(), 1);
    1119           25 :         let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
    1120           25 :         match result {
    1121            0 :             None => anyhow::bail!("task_mgr shutdown requested"),
    1122           25 :             Some(Ok(())) => Ok(Some(true)),
    1123            0 :             Some(Err(e)) => Err(anyhow::Error::new(e)),
    1124              :         }
    1125           25 :     }
    1126              : 
    1127              :     /// Evict a batch of layers.
    1128              :     ///
    1129              :     /// GenericRemoteStorage reference is required as a (witness)[witness_article] for "remote storage is configured."
    1130              :     ///
    1131              :     /// [witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
    1132           11 :     pub(crate) async fn evict_layers(
    1133           11 :         &self,
    1134           11 :         _: &GenericRemoteStorage,
    1135           11 :         layers_to_evict: &[Arc<dyn PersistentLayer>],
    1136           11 :         cancel: CancellationToken,
    1137           11 :     ) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
    1138           11 :         let remote_client = self.remote_client.clone().expect(
    1139           11 :             "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient",
    1140           11 :         );
    1141           11 : 
    1142           11 :         self.evict_layer_batch(&remote_client, layers_to_evict, cancel)
    1143            0 :             .await
    1144           11 :     }
    1145              : 
    1146              :     /// Evict multiple layers at once, continuing through errors.
    1147              :     ///
    1148              :     /// Try to evict the given `layers_to_evict` by
    1149              :     ///
    1150              :     /// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object.
    1151              :     /// 2. Deleting the now unreferenced layer file from disk.
    1152              :     ///
    1153              :     /// The `remote_client` should be this timeline's `self.remote_client`.
    1154              :     /// We make the caller provide it so that they are responsible for handling the case
    1155              :     /// where someone wants to evict the layer but no remote storage is configured.
    1156              :     ///
    1157              :     /// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`.
    1158              :     /// If `Err()` is returned, no eviction was attempted.
    1159              :     /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`.
    1160              :     /// Meaning of each `result[i]`:
    1161              :     /// - `Some(Err(...))` if layer replacement failed for an unexpected reason
    1162              :     /// - `Some(Ok(true))` if everything went well.
    1163              :     /// - `Some(Ok(false))` if there was an expected reason why the layer could not be replaced, e.g.:
    1164              :     ///    - evictee was not yet downloaded
    1165              :     ///    - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks)
    1166              :     /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`.
    1167           67 :     async fn evict_layer_batch(
    1168           67 :         &self,
    1169           67 :         remote_client: &Arc<RemoteTimelineClient>,
    1170           67 :         layers_to_evict: &[Arc<dyn PersistentLayer>],
    1171           67 :         cancel: CancellationToken,
    1172           67 :     ) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
    1173           67 :         // ensure that the layers have finished uploading
    1174           67 :         // (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
    1175           67 :         remote_client
    1176           67 :             .wait_completion()
    1177            2 :             .await
    1178           67 :             .context("wait for layer upload ops to complete")?;
    1179              : 
    1180              :         // now lock out layer removal (compaction, gc, timeline deletion)
    1181           67 :         let layer_removal_guard = self.layer_removal_cs.lock().await;
    1182              : 
    1183              :         {
    1184              :             // to avoid racing with detach and delete_timeline
    1185           67 :             let state = self.current_state();
    1186           67 :             anyhow::ensure!(
    1187           67 :                 state == TimelineState::Active,
    1188            0 :                 "timeline is not active but {state:?}"
    1189              :             );
    1190              :         }
    1191              : 
    1192              :         // start the batch update
    1193           67 :         let mut guard = self.layers.write().await;
    1194           67 :         let mut results = Vec::with_capacity(layers_to_evict.len());
    1195              : 
    1196          210 :         for l in layers_to_evict.iter() {
    1197          210 :             let res = if cancel.is_cancelled() {
    1198            0 :                 None
    1199              :             } else {
    1200          210 :                 Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut guard))
    1201              :             };
    1202          210 :             results.push(res);
    1203              :         }
    1204              : 
    1205              :         // commit the updates & release locks
    1206           67 :         drop_wlock(guard);
    1207           67 :         drop(layer_removal_guard);
    1208           67 : 
    1209           67 :         assert_eq!(results.len(), layers_to_evict.len());
    1210           67 :         Ok(results)
    1211           67 :     }
    1212              : 
    1213          210 :     fn evict_layer_batch_impl(
    1214          210 :         &self,
    1215          210 :         _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
    1216          210 :         local_layer: &Arc<dyn PersistentLayer>,
    1217          210 :         layer_mgr: &mut LayerManager,
    1218          210 :     ) -> Result<(), EvictionError> {
    1219          210 :         if local_layer.is_remote_layer() {
    1220            0 :             return Err(EvictionError::CannotEvictRemoteLayer);
    1221          210 :         }
    1222          210 : 
    1223          210 :         let layer_file_size = local_layer.layer_desc().file_size;
    1224              : 
    1225          210 :         let local_layer_mtime = local_layer
    1226          210 :             .local_path()
    1227          210 :             .expect("local layer should have a local path")
    1228          210 :             .metadata()
    1229          210 :             // when the eviction fails because we have already deleted the layer in compaction for
    1230          210 :             // example, a NotFound error bubbles up from here.
    1231          210 :             .map_err(|e| {
    1232            1 :                 if e.kind() == std::io::ErrorKind::NotFound {
    1233            1 :                     EvictionError::FileNotFound
    1234              :                 } else {
    1235            0 :                     EvictionError::StatFailed(e)
    1236              :                 }
    1237          210 :             })?
    1238          209 :             .modified()
    1239          209 :             .map_err(EvictionError::StatFailed)?;
    1240              : 
    1241          209 :         let local_layer_residence_duration =
    1242          209 :             match SystemTime::now().duration_since(local_layer_mtime) {
    1243            0 :                 Err(e) => {
    1244            0 :                     warn!(layer = %local_layer, "layer mtime is in the future: {}", e);
    1245            0 :                     None
    1246              :                 }
    1247          209 :                 Ok(delta) => Some(delta),
    1248              :             };
    1249              : 
    1250          209 :         let layer_metadata = LayerFileMetadata::new(layer_file_size, self.generation);
    1251              : 
    1252          209 :         let new_remote_layer = Arc::new(match local_layer.filename() {
    1253            6 :             LayerFileName::Image(image_name) => RemoteLayer::new_img(
    1254            6 :                 self.tenant_id,
    1255            6 :                 self.timeline_id,
    1256            6 :                 &image_name,
    1257            6 :                 &layer_metadata,
    1258            6 :                 local_layer
    1259            6 :                     .access_stats()
    1260            6 :                     .clone_for_residence_change(LayerResidenceStatus::Evicted),
    1261            6 :             ),
    1262          203 :             LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
    1263          203 :                 self.tenant_id,
    1264          203 :                 self.timeline_id,
    1265          203 :                 &delta_name,
    1266          203 :                 &layer_metadata,
    1267          203 :                 local_layer
    1268          203 :                     .access_stats()
    1269          203 :                     .clone_for_residence_change(LayerResidenceStatus::Evicted),
    1270          203 :             ),
    1271              :         });
    1272              : 
    1273          209 :         assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
    1274              : 
    1275          209 :         layer_mgr
    1276          209 :             .replace_and_verify(local_layer.clone(), new_remote_layer)
    1277          209 :             .map_err(EvictionError::LayerNotFound)?;
    1278              : 
    1279          208 :         if let Err(e) = local_layer.delete_resident_layer_file() {
    1280              :             // this should never happen, because of layer_removal_cs usage and above stat
    1281              :             // access for mtime
    1282            0 :             error!("failed to remove layer file on evict after replacement: {e:#?}");
    1283          208 :         }
    1284              :         // Always decrement the physical size gauge, even if we failed to delete the file.
    1285              :         // Rationale: we already replaced the layer with a remote layer in the layer map,
    1286              :         // and any subsequent download_remote_layer will
    1287              :         // 1. overwrite the file on disk and
    1288              :         // 2. add the downloaded size to the resident size gauge.
    1289              :         //
    1290              :         // If there is no re-download, and we restart the pageserver, then load_layer_map
    1291              :         // will treat the file as a local layer again, count it towards resident size,
    1292              :         // and it'll be like the layer removal never happened.
    1293              :         // The bump in resident size is perhaps unexpected but overall a robust behavior.
    1294          208 :         self.metrics
    1295          208 :             .resident_physical_size_gauge
    1296          208 :             .sub(layer_file_size);
    1297          208 : 
    1298          208 :         self.metrics.evictions.inc();
    1299              : 
    1300          208 :         if let Some(delta) = local_layer_residence_duration {
    1301          208 :             self.metrics
    1302          208 :                 .evictions_with_low_residence_duration
    1303          208 :                 .read()
    1304          208 :                 .unwrap()
    1305          208 :                 .observe(delta);
    1306          208 :             info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
    1307              :         } else {
    1308            0 :             info!(layer=%local_layer, "evicted layer after unknown residence period");
    1309              :         }
    1310              : 
    1311          208 :         Ok(())
    1312          210 :     }
    1313              : }
    1314              : 
    1315            0 : #[derive(Debug, thiserror::Error)]
    1316              : pub(crate) enum EvictionError {
    1317              :     #[error("cannot evict a remote layer")]
    1318              :     CannotEvictRemoteLayer,
    1319              :     /// Most likely the to-be evicted layer has been deleted by compaction or gc which use the same
    1320              :     /// locks, so they got to execute before the eviction.
    1321              :     #[error("file backing the layer has been removed already")]
    1322              :     FileNotFound,
    1323              :     #[error("stat failed")]
    1324              :     StatFailed(#[source] std::io::Error),
    1325              :     /// In practice, this can be a number of things, but lets assume it means only this.
    1326              :     ///
    1327              :     /// This case includes situations such as the Layer was evicted and redownloaded in between,
    1328              :     /// because the file existed before an replacement attempt was made but now the Layers are
    1329              :     /// different objects in memory.
    1330              :     #[error("layer was no longer part of LayerMap")]
    1331              :     LayerNotFound(#[source] anyhow::Error),
    1332              : }
    1333              : 
    1334              : /// Number of times we will compute partition within a checkpoint distance.
    1335              : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
    1336              : 
    1337              : // Private functions
    1338              : impl Timeline {
    1339      1468302 :     fn get_checkpoint_distance(&self) -> u64 {
    1340      1468302 :         let tenant_conf = self.tenant_conf.read().unwrap();
    1341      1468302 :         tenant_conf
    1342      1468302 :             .checkpoint_distance
    1343      1468302 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    1344      1468302 :     }
    1345              : 
    1346       728806 :     fn get_checkpoint_timeout(&self) -> Duration {
    1347       728806 :         let tenant_conf = self.tenant_conf.read().unwrap();
    1348       728806 :         tenant_conf
    1349       728806 :             .checkpoint_timeout
    1350       728806 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    1351       728806 :     }
    1352              : 
    1353         1438 :     fn get_compaction_target_size(&self) -> u64 {
    1354         1438 :         let tenant_conf = self.tenant_conf.read().unwrap();
    1355         1438 :         tenant_conf
    1356         1438 :             .compaction_target_size
    1357         1438 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    1358         1438 :     }
    1359              : 
    1360         1398 :     fn get_compaction_threshold(&self) -> usize {
    1361         1398 :         let tenant_conf = self.tenant_conf.read().unwrap();
    1362         1398 :         tenant_conf
    1363         1398 :             .compaction_threshold
    1364         1398 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    1365         1398 :     }
    1366              : 
    1367        16758 :     fn get_image_creation_threshold(&self) -> usize {
    1368        16758 :         let tenant_conf = self.tenant_conf.read().unwrap();
    1369        16758 :         tenant_conf
    1370        16758 :             .image_creation_threshold
    1371        16758 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    1372        16758 :     }
    1373              : 
    1374         2062 :     fn get_eviction_policy(&self) -> EvictionPolicy {
    1375         2062 :         let tenant_conf = self.tenant_conf.read().unwrap();
    1376         2062 :         tenant_conf
    1377         2062 :             .eviction_policy
    1378         2062 :             .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
    1379         2062 :     }
    1380              : 
    1381         1423 :     fn get_evictions_low_residence_duration_metric_threshold(
    1382         1423 :         tenant_conf: &TenantConfOpt,
    1383         1423 :         default_tenant_conf: &TenantConf,
    1384         1423 :     ) -> Duration {
    1385         1423 :         tenant_conf
    1386         1423 :             .evictions_low_residence_duration_metric_threshold
    1387         1423 :             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
    1388         1423 :     }
    1389              : 
    1390        12272 :     fn get_gc_feedback(&self) -> bool {
    1391        12272 :         let tenant_conf = self.tenant_conf.read().unwrap();
    1392        12272 :         tenant_conf
    1393        12272 :             .gc_feedback
    1394        12272 :             .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
    1395        12272 :     }
    1396              : 
    1397           29 :     pub(super) fn tenant_conf_updated(&self) {
    1398           29 :         // NB: Most tenant conf options are read by background loops, so,
    1399           29 :         // changes will automatically be picked up.
    1400           29 : 
    1401           29 :         // The threshold is embedded in the metric. So, we need to update it.
    1402           29 :         {
    1403           29 :             let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
    1404           29 :                 &self.tenant_conf.read().unwrap(),
    1405           29 :                 &self.conf.default_tenant_conf,
    1406           29 :             );
    1407           29 :             let tenant_id_str = self.tenant_id.to_string();
    1408           29 :             let timeline_id_str = self.timeline_id.to_string();
    1409           29 :             self.metrics
    1410           29 :                 .evictions_with_low_residence_duration
    1411           29 :                 .write()
    1412           29 :                 .unwrap()
    1413           29 :                 .change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
    1414           29 :         }
    1415           29 :     }
    1416              : 
    1417              :     /// Open a Timeline handle.
    1418              :     ///
    1419              :     /// Loads the metadata for the timeline into memory, but not the layer map.
    1420              :     #[allow(clippy::too_many_arguments)]
    1421         1394 :     pub(super) fn new(
    1422         1394 :         conf: &'static PageServerConf,
    1423         1394 :         tenant_conf: Arc<RwLock<TenantConfOpt>>,
    1424         1394 :         metadata: &TimelineMetadata,
    1425         1394 :         ancestor: Option<Arc<Timeline>>,
    1426         1394 :         timeline_id: TimelineId,
    1427         1394 :         tenant_id: TenantId,
    1428         1394 :         generation: Generation,
    1429         1394 :         walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
    1430         1394 :         resources: TimelineResources,
    1431         1394 :         pg_version: u32,
    1432         1394 :         initial_logical_size_can_start: Option<completion::Barrier>,
    1433         1394 :         initial_logical_size_attempt: Option<completion::Completion>,
    1434         1394 :         state: TimelineState,
    1435         1394 :     ) -> Arc<Self> {
    1436         1394 :         let disk_consistent_lsn = metadata.disk_consistent_lsn();
    1437         1394 :         let (state, _) = watch::channel(state);
    1438         1394 : 
    1439         1394 :         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
    1440         1394 :         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
    1441         1394 : 
    1442         1394 :         let tenant_conf_guard = tenant_conf.read().unwrap();
    1443         1394 : 
    1444         1394 :         let evictions_low_residence_duration_metric_threshold =
    1445         1394 :             Self::get_evictions_low_residence_duration_metric_threshold(
    1446         1394 :                 &tenant_conf_guard,
    1447         1394 :                 &conf.default_tenant_conf,
    1448         1394 :             );
    1449         1394 :         drop(tenant_conf_guard);
    1450         1394 : 
    1451         1394 :         Arc::new_cyclic(|myself| {
    1452         1394 :             let mut result = Timeline {
    1453         1394 :                 conf,
    1454         1394 :                 tenant_conf,
    1455         1394 :                 myself: myself.clone(),
    1456         1394 :                 timeline_id,
    1457         1394 :                 tenant_id,
    1458         1394 :                 generation,
    1459         1394 :                 pg_version,
    1460         1394 :                 layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
    1461         1394 :                 wanted_image_layers: Mutex::new(None),
    1462         1394 : 
    1463         1394 :                 walredo_mgr,
    1464         1394 :                 walreceiver: Mutex::new(None),
    1465         1394 : 
    1466         1394 :                 remote_client: resources.remote_client.map(Arc::new),
    1467         1394 : 
    1468         1394 :                 // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
    1469         1394 :                 last_record_lsn: SeqWait::new(RecordLsn {
    1470         1394 :                     last: disk_consistent_lsn,
    1471         1394 :                     prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
    1472         1394 :                 }),
    1473         1394 :                 disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
    1474         1394 : 
    1475         1394 :                 last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
    1476         1394 :                 last_freeze_ts: RwLock::new(Instant::now()),
    1477         1394 : 
    1478         1394 :                 loaded_at: (disk_consistent_lsn, SystemTime::now()),
    1479         1394 : 
    1480         1394 :                 ancestor_timeline: ancestor,
    1481         1394 :                 ancestor_lsn: metadata.ancestor_lsn(),
    1482         1394 : 
    1483         1394 :                 metrics: TimelineMetrics::new(
    1484         1394 :                     &tenant_id,
    1485         1394 :                     &timeline_id,
    1486         1394 :                     crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
    1487         1394 :                         "mtime",
    1488         1394 :                         evictions_low_residence_duration_metric_threshold,
    1489         1394 :                     ),
    1490         1394 :                 ),
    1491         1394 : 
    1492         1394 :                 flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
    1493         1394 : 
    1494         1394 :                 layer_flush_start_tx,
    1495         1394 :                 layer_flush_done_tx,
    1496         1394 : 
    1497         1394 :                 write_lock: tokio::sync::Mutex::new(()),
    1498         1394 :                 layer_removal_cs: Default::default(),
    1499         1394 : 
    1500         1394 :                 gc_info: std::sync::RwLock::new(GcInfo {
    1501         1394 :                     retain_lsns: Vec::new(),
    1502         1394 :                     horizon_cutoff: Lsn(0),
    1503         1394 :                     pitr_cutoff: Lsn(0),
    1504         1394 :                 }),
    1505         1394 : 
    1506         1394 :                 latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
    1507         1394 :                 initdb_lsn: metadata.initdb_lsn(),
    1508         1394 : 
    1509         1394 :                 current_logical_size: if disk_consistent_lsn.is_valid() {
    1510              :                     // we're creating timeline data with some layer files existing locally,
    1511              :                     // need to recalculate timeline's logical size based on data in the layers.
    1512          710 :                     LogicalSize::deferred_initial(disk_consistent_lsn)
    1513              :                 } else {
    1514              :                     // we're creating timeline data without any layers existing locally,
    1515              :                     // initial logical size is 0.
    1516          684 :                     LogicalSize::empty_initial()
    1517              :                 },
    1518         1394 :                 partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
    1519         1394 :                 repartition_threshold: 0,
    1520         1394 : 
    1521         1394 :                 last_received_wal: Mutex::new(None),
    1522         1394 :                 rel_size_cache: RwLock::new(HashMap::new()),
    1523         1394 : 
    1524         1394 :                 download_all_remote_layers_task_info: RwLock::new(None),
    1525         1394 : 
    1526         1394 :                 state,
    1527         1394 : 
    1528         1394 :                 eviction_task_timeline_state: tokio::sync::Mutex::new(
    1529         1394 :                     EvictionTaskTimelineState::default(),
    1530         1394 :                 ),
    1531         1394 :                 delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
    1532         1394 : 
    1533         1394 :                 initial_logical_size_can_start,
    1534         1394 :                 initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
    1535         1394 :             };
    1536         1394 :             result.repartition_threshold =
    1537         1394 :                 result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
    1538         1394 :             result
    1539         1394 :                 .metrics
    1540         1394 :                 .last_record_gauge
    1541         1394 :                 .set(disk_consistent_lsn.0 as i64);
    1542         1394 :             result
    1543         1394 :         })
    1544         1394 :     }
    1545              : 
    1546         2042 :     pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
    1547         2042 :         let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
    1548         2042 :         match *flush_loop_state {
    1549         1364 :             FlushLoopState::NotStarted => (),
    1550              :             FlushLoopState::Running { .. } => {
    1551          678 :                 info!(
    1552          678 :                     "skipping attempt to start flush_loop twice {}/{}",
    1553          678 :                     self.tenant_id, self.timeline_id
    1554          678 :                 );
    1555          678 :                 return;
    1556              :             }
    1557              :             FlushLoopState::Exited => {
    1558            0 :                 warn!(
    1559            0 :                     "ignoring attempt to restart exited flush_loop {}/{}",
    1560            0 :                     self.tenant_id, self.timeline_id
    1561            0 :                 );
    1562            0 :                 return;
    1563              :             }
    1564              :         }
    1565              : 
    1566         1364 :         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
    1567         1364 :         let self_clone = Arc::clone(self);
    1568         1364 : 
    1569         1364 :         debug!("spawning flush loop");
    1570         1364 :         *flush_loop_state = FlushLoopState::Running {
    1571         1364 :             #[cfg(test)]
    1572         1364 :             expect_initdb_optimization: false,
    1573         1364 :             #[cfg(test)]
    1574         1364 :             initdb_optimization_count: 0,
    1575         1364 :         };
    1576         1364 :         task_mgr::spawn(
    1577         1364 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1578         1364 :             task_mgr::TaskKind::LayerFlushTask,
    1579         1364 :             Some(self.tenant_id),
    1580         1364 :             Some(self.timeline_id),
    1581         1364 :             "layer flush task",
    1582              :             false,
    1583         1364 :             async move {
    1584         1364 :                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
    1585        23419 :                 self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
    1586          527 :                 let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
    1587          527 :                 assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
    1588          527 :                 *flush_loop_state  = FlushLoopState::Exited;
    1589          527 :                 Ok(())
    1590          527 :             }
    1591         1364 :             .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))
    1592              :         );
    1593         2042 :     }
    1594              : 
    1595              :     /// Creates and starts the wal receiver.
    1596              :     ///
    1597              :     /// This function is expected to be called at most once per Timeline's lifecycle
    1598              :     /// when the timeline is activated.
    1599         1190 :     fn launch_wal_receiver(
    1600         1190 :         self: &Arc<Self>,
    1601         1190 :         ctx: &RequestContext,
    1602         1190 :         broker_client: BrokerClientChannel,
    1603         1190 :     ) {
    1604         1190 :         info!(
    1605         1190 :             "launching WAL receiver for timeline {} of tenant {}",
    1606         1190 :             self.timeline_id, self.tenant_id
    1607         1190 :         );
    1608              : 
    1609         1190 :         let tenant_conf_guard = self.tenant_conf.read().unwrap();
    1610         1190 :         let wal_connect_timeout = tenant_conf_guard
    1611         1190 :             .walreceiver_connect_timeout
    1612         1190 :             .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
    1613         1190 :         let lagging_wal_timeout = tenant_conf_guard
    1614         1190 :             .lagging_wal_timeout
    1615         1190 :             .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
    1616         1190 :         let max_lsn_wal_lag = tenant_conf_guard
    1617         1190 :             .max_lsn_wal_lag
    1618         1190 :             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
    1619         1190 :         drop(tenant_conf_guard);
    1620         1190 : 
    1621         1190 :         let mut guard = self.walreceiver.lock().unwrap();
    1622         1190 :         assert!(
    1623         1190 :             guard.is_none(),
    1624            0 :             "multiple launches / re-launches of WAL receiver are not supported"
    1625              :         );
    1626         1190 :         *guard = Some(WalReceiver::start(
    1627         1190 :             Arc::clone(self),
    1628         1190 :             WalReceiverConf {
    1629         1190 :                 wal_connect_timeout,
    1630         1190 :                 lagging_wal_timeout,
    1631         1190 :                 max_lsn_wal_lag,
    1632         1190 :                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
    1633         1190 :                 availability_zone: self.conf.availability_zone.clone(),
    1634         1190 :             },
    1635         1190 :             broker_client,
    1636         1190 :             ctx,
    1637         1190 :         ));
    1638         1190 :     }
    1639              : 
    1640              :     /// Initialize with an empty layer map. Used when creating a new timeline.
    1641         1042 :     pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
    1642         1042 :         let mut layers = self.layers.try_write().expect(
    1643         1042 :             "in the context where we call this function, no other task has access to the object",
    1644         1042 :         );
    1645         1042 :         layers.initialize_empty(Lsn(start_lsn.0));
    1646         1042 :     }
    1647              : 
    1648              :     /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
    1649              :     /// files.
    1650          326 :     pub(super) async fn load_layer_map(
    1651          326 :         &self,
    1652          326 :         disk_consistent_lsn: Lsn,
    1653          326 :         index_part: Option<IndexPart>,
    1654          326 :     ) -> anyhow::Result<()> {
    1655              :         use init::{Decision::*, Discovered, FutureLayer};
    1656              :         use LayerFileName::*;
    1657              : 
    1658          326 :         let mut guard = self.layers.write().await;
    1659              : 
    1660          326 :         let timer = self.metrics.load_layer_map_histo.start_timer();
    1661          326 : 
    1662          326 :         // Scan timeline directory and create ImageFileName and DeltaFilename
    1663          326 :         // structs representing all files on disk
    1664          326 :         let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
    1665          326 :         let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
    1666          326 :         let span = tracing::Span::current();
    1667          326 : 
    1668          326 :         // Copy to move into the task we're about to spawn
    1669          326 :         let generation = self.generation;
    1670              : 
    1671          326 :         let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({
    1672          326 :             move || {
    1673          326 :                 let _g = span.entered();
    1674          326 :                 let discovered = init::scan_timeline_dir(&timeline_path)?;
    1675          326 :                 let mut discovered_layers = Vec::with_capacity(discovered.len());
    1676          326 :                 let mut unrecognized_files = Vec::new();
    1677          326 : 
    1678          326 :                 let mut path = timeline_path;
    1679              : 
    1680         4590 :                 for discovered in discovered {
    1681         4264 :                     let (name, kind) = match discovered {
    1682         3850 :                         Discovered::Layer(file_name, file_size) => {
    1683         3850 :                             discovered_layers.push((file_name, file_size));
    1684         3850 :                             continue;
    1685              :                         }
    1686              :                         Discovered::Metadata | Discovered::IgnoredBackup => {
    1687          326 :                             continue;
    1688              :                         }
    1689            0 :                         Discovered::Unknown(file_name) => {
    1690            0 :                             // we will later error if there are any
    1691            0 :                             unrecognized_files.push(file_name);
    1692            0 :                             continue;
    1693              :                         }
    1694           83 :                         Discovered::Ephemeral(name) => (name, "old ephemeral file"),
    1695            5 :                         Discovered::Temporary(name) => (name, "temporary timeline file"),
    1696            0 :                         Discovered::TemporaryDownload(name) => (name, "temporary download"),
    1697              :                     };
    1698           88 :                     path.push(name);
    1699           88 :                     init::cleanup(&path, kind)?;
    1700           88 :                     path.pop();
    1701              :                 }
    1702              : 
    1703          326 :                 if !unrecognized_files.is_empty() {
    1704              :                     // assume that if there are any there are many many.
    1705            0 :                     let n = unrecognized_files.len();
    1706            0 :                     let first = &unrecognized_files[..n.min(10)];
    1707            0 :                     anyhow::bail!(
    1708            0 :                         "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
    1709            0 :                     );
    1710          326 :                 }
    1711          326 : 
    1712          326 :                 let decided = init::reconcile(
    1713          326 :                     discovered_layers,
    1714          326 :                     index_part.as_ref(),
    1715          326 :                     disk_consistent_lsn,
    1716          326 :                     generation,
    1717          326 :                 );
    1718          326 : 
    1719          326 :                 let mut loaded_layers = Vec::new();
    1720          326 :                 let mut needs_upload = Vec::new();
    1721          326 :                 let mut needs_cleanup = Vec::new();
    1722          326 :                 let mut total_physical_size = 0;
    1723              : 
    1724         6667 :                 for (name, decision) in decided {
    1725         6340 :                     let decision = match decision {
    1726            1 :                         Ok(UseRemote { local, remote }) => {
    1727            1 :                             path.push(name.file_name());
    1728            1 :                             init::cleanup_local_file_for_remote(&path, &local, &remote)?;
    1729            1 :                             path.pop();
    1730            1 : 
    1731            1 :                             UseRemote { local, remote }
    1732              :                         }
    1733         6339 :                         Ok(decision) => decision,
    1734            1 :                         Err(FutureLayer { local }) => {
    1735            1 :                             if local.is_some() {
    1736            1 :                                 path.push(name.file_name());
    1737            1 :                                 init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
    1738            1 :                                 path.pop();
    1739            0 :                             }
    1740            1 :                             needs_cleanup.push(name);
    1741            1 :                             continue;
    1742              :                         }
    1743              :                     };
    1744              : 
    1745         6340 :                     match &name {
    1746         5374 :                         Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
    1747          966 :                         Image(i) => assert!(i.lsn <= disk_consistent_lsn),
    1748              :                     }
    1749              : 
    1750         6340 :                     let status = match &decision {
    1751         3848 :                         UseLocal(_) | NeedsUpload(_) => LayerResidenceStatus::Resident,
    1752         2492 :                         Evicted(_) | UseRemote { .. } => LayerResidenceStatus::Evicted,
    1753              :                     };
    1754              : 
    1755         6340 :                     let stats = LayerAccessStats::for_loading_layer(status);
    1756              : 
    1757         6340 :                     let layer: Arc<dyn PersistentLayer> = match (name, &decision) {
    1758          834 :                         (Delta(d), UseLocal(m) | NeedsUpload(m)) => {
    1759         3830 :                             total_physical_size += m.file_size();
    1760         3830 :                             Arc::new(DeltaLayer::new(
    1761         3830 :                                 conf,
    1762         3830 :                                 timeline_id,
    1763         3830 :                                 tenant_id,
    1764         3830 :                                 &d,
    1765         3830 :                                 m.file_size(),
    1766         3830 :                                 stats,
    1767         3830 :                             ))
    1768              :                         }
    1769           16 :                         (Image(i), UseLocal(m) | NeedsUpload(m)) => {
    1770           18 :                             total_physical_size += m.file_size();
    1771           18 :                             Arc::new(ImageLayer::new(
    1772           18 :                                 conf,
    1773           18 :                                 timeline_id,
    1774           18 :                                 tenant_id,
    1775           18 :                                 &i,
    1776           18 :                                 m.file_size(),
    1777           18 :                                 stats,
    1778           18 :                             ))
    1779              :                         }
    1780         1544 :                         (Delta(d), Evicted(remote) | UseRemote { remote, .. }) => Arc::new(
    1781         1544 :                             RemoteLayer::new_delta(tenant_id, timeline_id, &d, remote, stats),
    1782         1544 :                         ),
    1783          948 :                         (Image(i), Evicted(remote) | UseRemote { remote, .. }) => Arc::new(
    1784          948 :                             RemoteLayer::new_img(tenant_id, timeline_id, &i, remote, stats),
    1785          948 :                         ),
    1786              :                     };
    1787              : 
    1788         6340 :                     if let NeedsUpload(m) = decision {
    1789          850 :                         needs_upload.push((layer.clone(), m));
    1790         5490 :                     }
    1791              : 
    1792         6340 :                     loaded_layers.push(layer);
    1793              :                 }
    1794          326 :                 Ok((
    1795          326 :                     loaded_layers,
    1796          326 :                     (needs_upload, needs_cleanup),
    1797          326 :                     total_physical_size,
    1798          326 :                 ))
    1799          326 :             }
    1800          326 :         })
    1801          324 :         .await
    1802          326 :         .map_err(anyhow::Error::new)
    1803          326 :         .and_then(|x| x)?;
    1804              : 
    1805          326 :         let num_layers = loaded_layers.len();
    1806          326 : 
    1807          326 :         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
    1808              : 
    1809          326 :         if let Some(rtc) = self.remote_client.as_ref() {
    1810          179 :             let (needs_upload, needs_cleanup) = to_sync;
    1811          565 :             for (layer, m) in needs_upload {
    1812          386 :                 rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?;
    1813              :             }
    1814          179 :             rtc.schedule_layer_file_deletion(&needs_cleanup)?;
    1815          179 :             rtc.schedule_index_upload_for_file_changes()?;
    1816              :             // Tenant::create_timeline will wait for these uploads to happen before returning, or
    1817              :             // on retry.
    1818          147 :         }
    1819              : 
    1820          326 :         info!(
    1821          326 :             "loaded layer map with {} layers at {}, total physical size: {}",
    1822          326 :             num_layers, disk_consistent_lsn, total_physical_size
    1823          326 :         );
    1824          326 :         self.metrics
    1825          326 :             .resident_physical_size_gauge
    1826          326 :             .set(total_physical_size);
    1827          326 : 
    1828          326 :         timer.stop_and_record();
    1829          326 :         Ok(())
    1830          326 :     }
    1831              : 
    1832          562 :     fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
    1833          562 :         let state = self.current_state();
    1834          545 :         if matches!(
    1835          562 :             state,
    1836              :             TimelineState::Broken { .. } | TimelineState::Stopping
    1837              :         ) {
    1838              :             // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
    1839           17 :             return;
    1840          545 :         }
    1841              : 
    1842          545 :         let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
    1843          545 :             .try_acquire_owned()
    1844              :         {
    1845          370 :             Ok(permit) => permit,
    1846              :             Err(TryAcquireError::NoPermits) => {
    1847              :                 // computation already ongoing or finished with success
    1848          175 :                 return;
    1849              :             }
    1850            0 :             Err(TryAcquireError::Closed) => unreachable!("we never call close"),
    1851              :         };
    1852          370 :         debug_assert!(self
    1853          370 :             .current_logical_size
    1854          370 :             .initial_logical_size
    1855          370 :             .get()
    1856          370 :             .is_none());
    1857              : 
    1858          370 :         info!(
    1859          370 :             "spawning logical size computation from context of task kind {:?}",
    1860          370 :             ctx.task_kind()
    1861          370 :         );
    1862              :         // We need to start the computation task.
    1863              :         // It gets a separate context since it will outlive the request that called this function.
    1864          370 :         let self_clone = Arc::clone(self);
    1865          370 :         let background_ctx = ctx.detached_child(
    1866          370 :             TaskKind::InitialLogicalSizeCalculation,
    1867          370 :             DownloadBehavior::Download,
    1868          370 :         );
    1869          370 :         task_mgr::spawn(
    1870          370 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1871          370 :             task_mgr::TaskKind::InitialLogicalSizeCalculation,
    1872          370 :             Some(self.tenant_id),
    1873          370 :             Some(self.timeline_id),
    1874          370 :             "initial size calculation",
    1875          370 :             false,
    1876          370 :             // NB: don't log errors here, task_mgr will do that.
    1877          370 :             async move {
    1878          370 : 
    1879          370 :                 let cancel = task_mgr::shutdown_token();
    1880              : 
    1881              :                 // in case we were created during pageserver initialization, wait for
    1882              :                 // initialization to complete before proceeding. startup time init runs on the same
    1883              :                 // runtime.
    1884          370 :                 tokio::select! {
    1885          371 :                     _ = cancel.cancelled() => { return Ok(()); },
    1886          371 :                     _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
    1887          371 :                 };
    1888              : 
    1889              :                 // hold off background tasks from starting until all timelines get to try at least
    1890              :                 // once initial logical size calculation; though retry will rarely be useful.
    1891              :                 // holding off is done because heavier tasks execute blockingly on the same
    1892              :                 // runtime.
    1893              :                 //
    1894              :                 // dropping this at every outcome is probably better than trying to cling on to it,
    1895              :                 // delay will be terminated by a timeout regardless.
    1896          370 :                 let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
    1897          370 : 
    1898          370 :                 // no extra cancellation here, because nothing really waits for this to complete compared
    1899          370 :                 // to spawn_ondemand_logical_size_calculation.
    1900          370 :                 let cancel = CancellationToken::new();
    1901              : 
    1902          370 :                 let calculated_size = match self_clone
    1903          370 :                     .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
    1904       139210 :                     .await
    1905              :                 {
    1906          361 :                     Ok(s) => s,
    1907              :                     Err(CalculateLogicalSizeError::Cancelled) => {
    1908              :                         // Don't make noise, this is a common task.
    1909              :                         // In the unlikely case that there is another call to this function, we'll retry
    1910              :                         // because initial_logical_size is still None.
    1911            1 :                         info!("initial size calculation cancelled, likely timeline delete / tenant detach");
    1912            1 :                         return Ok(());
    1913              :                     }
    1914            4 :                     Err(CalculateLogicalSizeError::Other(err)) => {
    1915            1 :                         if let Some(e @ PageReconstructError::AncestorStopping(_)) =
    1916            4 :                             err.root_cause().downcast_ref()
    1917              :                         {
    1918              :                             // This can happen if the timeline parent timeline switches to
    1919              :                             // Stopping state while we're still calculating the initial
    1920              :                             // timeline size for the child, for example if the tenant is
    1921              :                             // being detached or the pageserver is shut down. Like with
    1922              :                             // CalculateLogicalSizeError::Cancelled, don't make noise.
    1923            1 :                             info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
    1924            1 :                             return Ok(());
    1925            3 :                         }
    1926            3 :                         return Err(err.context("Failed to calculate logical size"));
    1927              :                     }
    1928              :                 };
    1929              : 
    1930              :                 // we cannot query current_logical_size.current_size() to know the current
    1931              :                 // *negative* value, only truncated to u64.
    1932          361 :                 let added = self_clone
    1933          361 :                     .current_logical_size
    1934          361 :                     .size_added_after_initial
    1935          361 :                     .load(AtomicOrdering::Relaxed);
    1936          361 : 
    1937          361 :                 let sum = calculated_size.saturating_add_signed(added);
    1938          361 : 
    1939          361 :                 // set the gauge value before it can be set in `update_current_logical_size`.
    1940          361 :                 self_clone.metrics.current_logical_size_gauge.set(sum);
    1941          361 : 
    1942          361 :                 match self_clone
    1943          361 :                     .current_logical_size
    1944          361 :                     .initial_logical_size
    1945          361 :                     .set(calculated_size)
    1946              :                 {
    1947          361 :                     Ok(()) => (),
    1948            0 :                     Err(_what_we_just_attempted_to_set) => {
    1949            0 :                         let existing_size = self_clone
    1950            0 :                             .current_logical_size
    1951            0 :                             .initial_logical_size
    1952            0 :                             .get()
    1953            0 :                             .expect("once_cell set was lost, then get failed, impossible.");
    1954            0 :                         // This shouldn't happen because the semaphore is initialized with 1.
    1955            0 :                         // But if it happens, just complain & report success so there are no further retries.
    1956            0 :                         error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
    1957              :                     }
    1958              :                 }
    1959              :                 // now that `initial_logical_size.is_some()`, reduce permit count to 0
    1960              :                 // so that we prevent future callers from spawning this task
    1961          361 :                 permit.forget();
    1962          361 :                 Ok(())
    1963          370 :             }.in_current_span(),
    1964          370 :         );
    1965          562 :     }
    1966              : 
    1967           32 :     pub fn spawn_ondemand_logical_size_calculation(
    1968           32 :         self: &Arc<Self>,
    1969           32 :         lsn: Lsn,
    1970           32 :         cause: LogicalSizeCalculationCause,
    1971           32 :         ctx: RequestContext,
    1972           32 :         cancel: CancellationToken,
    1973           32 :     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
    1974           32 :         let (sender, receiver) = oneshot::channel();
    1975           32 :         let self_clone = Arc::clone(self);
    1976           32 :         // XXX if our caller loses interest, i.e., ctx is cancelled,
    1977           32 :         // we should stop the size calculation work and return an error.
    1978           32 :         // That would require restructuring this function's API to
    1979           32 :         // return the result directly, instead of a Receiver for the result.
    1980           32 :         let ctx = ctx.detached_child(
    1981           32 :             TaskKind::OndemandLogicalSizeCalculation,
    1982           32 :             DownloadBehavior::Download,
    1983           32 :         );
    1984           32 :         task_mgr::spawn(
    1985           32 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1986           32 :             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
    1987           32 :             Some(self.tenant_id),
    1988           32 :             Some(self.timeline_id),
    1989           32 :             "ondemand logical size calculation",
    1990           32 :             false,
    1991           32 :             async move {
    1992           32 :                 let res = self_clone
    1993           32 :                     .logical_size_calculation_task(lsn, cause, &ctx, cancel)
    1994         1349 :                     .await;
    1995           32 :                 let _ = sender.send(res).ok();
    1996           32 :                 Ok(()) // Receiver is responsible for handling errors
    1997           32 :             }
    1998           32 :             .in_current_span(),
    1999           32 :         );
    2000           32 :         receiver
    2001           32 :     }
    2002              : 
    2003         1608 :     #[instrument(skip_all)]
    2004              :     async fn logical_size_calculation_task(
    2005              :         self: &Arc<Self>,
    2006              :         lsn: Lsn,
    2007              :         cause: LogicalSizeCalculationCause,
    2008              :         ctx: &RequestContext,
    2009              :         cancel: CancellationToken,
    2010              :     ) -> Result<u64, CalculateLogicalSizeError> {
    2011              :         span::debug_assert_current_span_has_tenant_and_timeline_id();
    2012              : 
    2013              :         let mut timeline_state_updates = self.subscribe_for_state_updates();
    2014              :         let self_calculation = Arc::clone(self);
    2015              : 
    2016          402 :         let mut calculation = pin!(async {
    2017          402 :             let cancel = cancel.child_token();
    2018          402 :             let ctx = ctx.attached_child();
    2019          402 :             self_calculation
    2020          402 :                 .calculate_logical_size(lsn, cause, cancel, &ctx)
    2021       140559 :                 .await
    2022          398 :         });
    2023          400 :         let timeline_state_cancellation = async {
    2024              :             loop {
    2025       140297 :                 match timeline_state_updates.changed().await {
    2026              :                     Ok(()) => {
    2027            0 :                         let new_state = timeline_state_updates.borrow().clone();
    2028            0 :                         match new_state {
    2029              :                             // we're running this job for active timelines only
    2030            0 :                             TimelineState::Active => continue,
    2031              :                             TimelineState::Broken { .. }
    2032              :                             | TimelineState::Stopping
    2033              :                             | TimelineState::Loading => {
    2034            0 :                                 break format!("aborted because timeline became inactive (new state: {new_state:?})")
    2035              :                             }
    2036              :                         }
    2037              :                     }
    2038            0 :                     Err(_sender_dropped_error) => {
    2039            0 :                         // can't happen, the sender is not dropped as long as the Timeline exists
    2040            0 :                         break "aborted because state watch was dropped".to_string();
    2041              :                     }
    2042              :                 }
    2043              :             }
    2044            0 :         };
    2045              : 
    2046          401 :         let taskmgr_shutdown_cancellation = async {
    2047       140425 :             task_mgr::shutdown_watcher().await;
    2048            1 :             "aborted because task_mgr shutdown requested".to_string()
    2049            1 :         };
    2050              : 
    2051              :         loop {
    2052       140961 :             tokio::select! {
    2053          397 :                 res = &mut calculation => { return res }
    2054            0 :                 reason = timeline_state_cancellation => {
    2055            0 :                     debug!(reason = reason, "cancelling calculation");
    2056              :                     cancel.cancel();
    2057              :                     return calculation.await;
    2058              :                 }
    2059            1 :                 reason = taskmgr_shutdown_cancellation => {
    2060            0 :                     debug!(reason = reason, "cancelling calculation");
    2061              :                     cancel.cancel();
    2062              :                     return calculation.await;
    2063              :                 }
    2064              :             }
    2065              :         }
    2066              :     }
    2067              : 
    2068              :     /// Calculate the logical size of the database at the latest LSN.
    2069              :     ///
    2070              :     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    2071              :     /// especially if we need to download remote layers.
    2072          408 :     pub async fn calculate_logical_size(
    2073          408 :         &self,
    2074          408 :         up_to_lsn: Lsn,
    2075          408 :         cause: LogicalSizeCalculationCause,
    2076          408 :         cancel: CancellationToken,
    2077          408 :         ctx: &RequestContext,
    2078          408 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2079          408 :         info!(
    2080          408 :             "Calculating logical size for timeline {} at {}",
    2081          408 :             self.timeline_id, up_to_lsn
    2082          408 :         );
    2083              :         // These failpoints are used by python tests to ensure that we don't delete
    2084              :         // the timeline while the logical size computation is ongoing.
    2085              :         // The first failpoint is used to make this function pause.
    2086              :         // Then the python test initiates timeline delete operation in a thread.
    2087              :         // It waits for a few seconds, then arms the second failpoint and disables
    2088              :         // the first failpoint. The second failpoint prints an error if the timeline
    2089              :         // delete code has deleted the on-disk state while we're still running here.
    2090              :         // It shouldn't do that. If it does it anyway, the error will be caught
    2091              :         // by the test suite, highlighting the problem.
    2092          408 :         fail::fail_point!("timeline-calculate-logical-size-pause");
    2093          408 :         fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
    2094            2 :             if !self
    2095            2 :                 .conf
    2096            2 :                 .metadata_path(&self.tenant_id, &self.timeline_id)
    2097            2 :                 .exists()
    2098              :             {
    2099            0 :                 error!("timeline-calculate-logical-size-pre metadata file does not exist")
    2100            2 :             }
    2101              :             // need to return something
    2102            2 :             Ok(0)
    2103          408 :         });
    2104              :         // See if we've already done the work for initial size calculation.
    2105              :         // This is a short-cut for timelines that are mostly unused.
    2106          406 :         if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
    2107            4 :             return Ok(size);
    2108          402 :         }
    2109          402 :         let storage_time_metrics = match cause {
    2110              :             LogicalSizeCalculationCause::Initial
    2111              :             | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
    2112          390 :             | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
    2113              :             LogicalSizeCalculationCause::EvictionTaskImitation => {
    2114           12 :                 &self.metrics.imitate_logical_size_histo
    2115              :             }
    2116              :         };
    2117          402 :         let timer = storage_time_metrics.start_timer();
    2118          402 :         let logical_size = self
    2119          402 :             .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
    2120       140934 :             .await?;
    2121            0 :         debug!("calculated logical size: {logical_size}");
    2122          393 :         timer.stop_and_record();
    2123          393 :         Ok(logical_size)
    2124          404 :     }
    2125              : 
    2126              :     /// Update current logical size, adding `delta' to the old value.
    2127      1094354 :     fn update_current_logical_size(&self, delta: i64) {
    2128      1094354 :         let logical_size = &self.current_logical_size;
    2129      1094354 :         logical_size.increment_size(delta);
    2130      1094354 : 
    2131      1094354 :         // Also set the value in the prometheus gauge. Note that
    2132      1094354 :         // there is a race condition here: if this is is called by two
    2133      1094354 :         // threads concurrently, the prometheus gauge might be set to
    2134      1094354 :         // one value while current_logical_size is set to the
    2135      1094354 :         // other.
    2136      1094354 :         match logical_size.current_size() {
    2137      1093225 :             Ok(CurrentLogicalSize::Exact(new_current_size)) => self
    2138      1093225 :                 .metrics
    2139      1093225 :                 .current_logical_size_gauge
    2140      1093225 :                 .set(new_current_size),
    2141         1129 :             Ok(CurrentLogicalSize::Approximate(_)) => {
    2142         1129 :                 // don't update the gauge yet, this allows us not to update the gauge back and
    2143         1129 :                 // forth between the initial size calculation task.
    2144         1129 :             }
    2145              :             // this is overflow
    2146            0 :             Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
    2147              :         }
    2148      1094354 :     }
    2149              : 
    2150           31 :     async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
    2151           31 :         let guard = self.layers.read().await;
    2152           83 :         for historic_layer in guard.layer_map().iter_historic_layers() {
    2153           83 :             let historic_layer_name = historic_layer.filename().file_name();
    2154           83 :             if layer_file_name == historic_layer_name {
    2155           31 :                 return Some(guard.get_from_desc(&historic_layer));
    2156           52 :             }
    2157              :         }
    2158              : 
    2159            0 :         None
    2160           31 :     }
    2161              : }
    2162              : 
    2163              : type TraversalId = String;
    2164              : 
    2165              : trait TraversalLayerExt {
    2166              :     fn traversal_id(&self) -> TraversalId;
    2167              : }
    2168              : 
    2169              : impl TraversalLayerExt for Arc<dyn PersistentLayer> {
    2170          995 :     fn traversal_id(&self) -> TraversalId {
    2171          995 :         let timeline_id = self.layer_desc().timeline_id;
    2172          995 :         match self.local_path() {
    2173            2 :             Some(local_path) => {
    2174            2 :                 debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", timeline_id)),
    2175            0 :                     "need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary",
    2176              :                 );
    2177            2 :                 format!("{}", local_path.display())
    2178              :             }
    2179              :             None => {
    2180          993 :                 format!("remote {}/{self}", timeline_id)
    2181              :             }
    2182              :         }
    2183          995 :     }
    2184              : }
    2185              : 
    2186              : impl TraversalLayerExt for Arc<InMemoryLayer> {
    2187            2 :     fn traversal_id(&self) -> TraversalId {
    2188            2 :         format!("timeline {} in-memory {self}", self.get_timeline_id())
    2189            2 :     }
    2190              : }
    2191              : 
    2192              : impl Timeline {
    2193              :     ///
    2194              :     /// Get a handle to a Layer for reading.
    2195              :     ///
    2196              :     /// The returned Layer might be from an ancestor timeline, if the
    2197              :     /// segment hasn't been updated on this timeline yet.
    2198              :     ///
    2199              :     /// This function takes the current timeline's locked LayerMap as an argument,
    2200              :     /// so callers can avoid potential race conditions.
    2201      7262258 :     async fn get_reconstruct_data(
    2202      7262258 :         &self,
    2203      7262258 :         key: Key,
    2204      7262258 :         request_lsn: Lsn,
    2205      7262258 :         reconstruct_state: &mut ValueReconstructState,
    2206      7262258 :         ctx: &RequestContext,
    2207      7262260 :     ) -> Result<(), PageReconstructError> {
    2208      7262260 :         // Start from the current timeline.
    2209      7262260 :         let mut timeline_owned;
    2210      7262260 :         let mut timeline = self;
    2211      7262260 : 
    2212      7262260 :         let mut read_count = scopeguard::guard(0, |cnt| {
    2213      7262224 :             crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
    2214      7262260 :         });
    2215      7262260 : 
    2216      7262260 :         // For debugging purposes, collect the path of layers that we traversed
    2217      7262260 :         // through. It's included in the error message if we fail to find the key.
    2218      7262260 :         let mut traversal_path = Vec::<TraversalPathItem>::new();
    2219              : 
    2220      7262260 :         let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
    2221      1740405 :             *cached_lsn
    2222              :         } else {
    2223      5521855 :             Lsn(0)
    2224              :         };
    2225              : 
    2226              :         // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
    2227              :         // to check that each iteration make some progress, to break infinite
    2228              :         // looping if something goes wrong.
    2229      7262260 :         let mut prev_lsn = Lsn(u64::MAX);
    2230      7262260 : 
    2231      7262260 :         let mut result = ValueReconstructResult::Continue;
    2232      7262260 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2233              : 
    2234     60385848 :         'outer: loop {
    2235     60385848 :             // The function should have updated 'state'
    2236     60385848 :             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
    2237     60385848 :             match result {
    2238      5627647 :                 ValueReconstructResult::Complete => return Ok(()),
    2239              :                 ValueReconstructResult::Continue => {
    2240              :                     // If we reached an earlier cached page image, we're done.
    2241     54758199 :                     if cont_lsn == cached_lsn + 1 {
    2242      1634558 :                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
    2243      1634558 :                         return Ok(());
    2244     53123641 :                     }
    2245     53123641 :                     if prev_lsn <= cont_lsn {
    2246              :                         // Didn't make any progress in last iteration. Error out to avoid
    2247              :                         // getting stuck in the loop.
    2248            0 :                         return Err(layer_traversal_error(format!(
    2249            0 :                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
    2250            0 :                             key,
    2251            0 :                             Lsn(cont_lsn.0 - 1),
    2252            0 :                             request_lsn,
    2253            0 :                             timeline.ancestor_lsn
    2254            0 :                         ), traversal_path));
    2255     53123641 :                     }
    2256     53123641 :                     prev_lsn = cont_lsn;
    2257              :                 }
    2258              :                 ValueReconstructResult::Missing => {
    2259              :                     return Err(layer_traversal_error(
    2260            2 :                         if cfg!(test) {
    2261            2 :                             format!(
    2262            2 :                                 "could not find data for key {} at LSN {}, for request at LSN {}\n{}",
    2263            2 :                                 key, cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
    2264            2 :                             )
    2265              :                         } else {
    2266            0 :                             format!(
    2267            0 :                                 "could not find data for key {} at LSN {}, for request at LSN {}",
    2268            0 :                                 key, cont_lsn, request_lsn
    2269            0 :                             )
    2270              :                         },
    2271            2 :                         traversal_path,
    2272              :                     ));
    2273              :                 }
    2274              :             }
    2275              : 
    2276              :             // Recurse into ancestor if needed
    2277     53123641 :             if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
    2278            0 :                 trace!(
    2279            0 :                     "going into ancestor {}, cont_lsn is {}",
    2280            0 :                     timeline.ancestor_lsn,
    2281            0 :                     cont_lsn
    2282            0 :                 );
    2283      1068484 :                 let ancestor = match timeline.get_ancestor_timeline() {
    2284      1068484 :                     Ok(timeline) => timeline,
    2285            0 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2286              :                 };
    2287              : 
    2288              :                 // It's possible that the ancestor timeline isn't active yet, or
    2289              :                 // is active but hasn't yet caught up to the branch point. Wait
    2290              :                 // for it.
    2291              :                 //
    2292              :                 // This cannot happen while the pageserver is running normally,
    2293              :                 // because you cannot create a branch from a point that isn't
    2294              :                 // present in the pageserver yet. However, we don't wait for the
    2295              :                 // branch point to be uploaded to cloud storage before creating
    2296              :                 // a branch. I.e., the branch LSN need not be remote consistent
    2297              :                 // for the branching operation to succeed.
    2298              :                 //
    2299              :                 // Hence, if we try to load a tenant in such a state where
    2300              :                 // 1. the existence of the branch was persisted (in IndexPart and/or locally)
    2301              :                 // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
    2302              :                 // then we will need to wait for the ancestor timeline to
    2303              :                 // re-stream WAL up to branch_lsn before we access it.
    2304              :                 //
    2305              :                 // How can a tenant get in such a state?
    2306              :                 // - ungraceful pageserver process exit
    2307              :                 // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
    2308              :                 //
    2309              :                 // NB: this could be avoided by requiring
    2310              :                 //   branch_lsn >= remote_consistent_lsn
    2311              :                 // during branch creation.
    2312      1068484 :                 match ancestor.wait_to_become_active(ctx).await {
    2313      1068482 :                     Ok(()) => {}
    2314            2 :                     Err(state) if state == TimelineState::Stopping => {
    2315            1 :                         return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
    2316              :                     }
    2317            1 :                     Err(state) => {
    2318            1 :                         return Err(PageReconstructError::Other(anyhow::anyhow!(
    2319            1 :                             "Timeline {} will not become active. Current state: {:?}",
    2320            1 :                             ancestor.timeline_id,
    2321            1 :                             &state,
    2322            1 :                         )));
    2323              :                     }
    2324              :                 }
    2325      1068482 :                 ancestor
    2326      1068482 :                     .wait_lsn(timeline.ancestor_lsn, ctx)
    2327           10 :                     .await
    2328      1068482 :                     .with_context(|| {
    2329            0 :                         format!(
    2330            0 :                             "wait for lsn {} on ancestor timeline_id={}",
    2331            0 :                             timeline.ancestor_lsn, ancestor.timeline_id
    2332            0 :                         )
    2333      1068482 :                     })?;
    2334              : 
    2335      1068482 :                 timeline_owned = ancestor;
    2336      1068482 :                 timeline = &*timeline_owned;
    2337      1068482 :                 prev_lsn = Lsn(u64::MAX);
    2338      1068482 :                 continue 'outer;
    2339     52055157 :             }
    2340              : 
    2341              :             #[allow(clippy::never_loop)] // see comment at bottom of this loop
    2342              :             'layer_map_search: loop {
    2343          994 :                 let remote_layer = {
    2344     52056143 :                     let guard = timeline.layers.read().await;
    2345     52056115 :                     let layers = guard.layer_map();
    2346              : 
    2347              :                     // Check the open and frozen in-memory layers first, in order from newest
    2348              :                     // to oldest.
    2349     52056115 :                     if let Some(open_layer) = &layers.open_layer {
    2350     47011144 :                         let start_lsn = open_layer.get_lsn_range().start;
    2351     47011144 :                         if cont_lsn > start_lsn {
    2352              :                             //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
    2353              :                             // Get all the data needed to reconstruct the page version from this layer.
    2354              :                             // But if we have an older cached page image, no need to go past that.
    2355      5990712 :                             let lsn_floor = max(cached_lsn + 1, start_lsn);
    2356      5990712 :                             result = match open_layer
    2357      5990712 :                                 .get_value_reconstruct_data(
    2358      5990712 :                                     key,
    2359      5990712 :                                     lsn_floor..cont_lsn,
    2360      5990712 :                                     reconstruct_state,
    2361      5990712 :                                     ctx,
    2362      5990712 :                                 )
    2363       347548 :                                 .await
    2364              :                             {
    2365      5990711 :                                 Ok(result) => result,
    2366            0 :                                 Err(e) => return Err(PageReconstructError::from(e)),
    2367              :                             };
    2368      5990711 :                             cont_lsn = lsn_floor;
    2369      5990711 :                             // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2370      5990711 :                             traversal_path.push((
    2371      5990711 :                                 result,
    2372      5990711 :                                 cont_lsn,
    2373      5990711 :                                 Box::new({
    2374      5990711 :                                     let open_layer = Arc::clone(open_layer);
    2375      5990711 :                                     move || open_layer.traversal_id()
    2376      5990711 :                                 }),
    2377      5990711 :                             ));
    2378      5990711 :                             continue 'outer;
    2379     41020432 :                         }
    2380      5044971 :                     }
    2381     46065403 :                     for frozen_layer in layers.frozen_layers.iter().rev() {
    2382      2038486 :                         let start_lsn = frozen_layer.get_lsn_range().start;
    2383      2038486 :                         if cont_lsn > start_lsn {
    2384              :                             //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
    2385       257399 :                             let lsn_floor = max(cached_lsn + 1, start_lsn);
    2386       257399 :                             result = match frozen_layer
    2387       257399 :                                 .get_value_reconstruct_data(
    2388       257399 :                                     key,
    2389       257399 :                                     lsn_floor..cont_lsn,
    2390       257399 :                                     reconstruct_state,
    2391       257399 :                                     ctx,
    2392       257399 :                                 )
    2393         4230 :                                 .await
    2394              :                             {
    2395       257399 :                                 Ok(result) => result,
    2396            0 :                                 Err(e) => return Err(PageReconstructError::from(e)),
    2397              :                             };
    2398       257399 :                             cont_lsn = lsn_floor;
    2399       257399 :                             // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2400       257399 :                             traversal_path.push((
    2401       257399 :                                 result,
    2402       257399 :                                 cont_lsn,
    2403       257399 :                                 Box::new({
    2404       257399 :                                     let frozen_layer = Arc::clone(frozen_layer);
    2405       257399 :                                     move || frozen_layer.traversal_id()
    2406       257399 :                                 }),
    2407       257399 :                             ));
    2408       257399 :                             continue 'outer;
    2409      1781087 :                         }
    2410              :                     }
    2411              : 
    2412     45808004 :                     if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
    2413     45696390 :                         let layer = guard.get_from_desc(&layer);
    2414              :                         // If it's a remote layer, download it and retry.
    2415          994 :                         if let Some(remote_layer) =
    2416     45696390 :                             super::storage_layer::downcast_remote_layer(&layer)
    2417              :                         {
    2418              :                             // TODO: push a breadcrumb to 'traversal_path' to record the fact that
    2419              :                             // we downloaded / would need to download this layer.
    2420          994 :                             remote_layer // download happens outside the scope of `layers` guard object
    2421              :                         } else {
    2422              :                             // Get all the data needed to reconstruct the page version from this layer.
    2423              :                             // But if we have an older cached page image, no need to go past that.
    2424     45695396 :                             let lsn_floor = max(cached_lsn + 1, lsn_floor);
    2425     45695396 :                             result = match layer
    2426     45695396 :                                 .get_value_reconstruct_data(
    2427     45695396 :                                     key,
    2428     45695396 :                                     lsn_floor..cont_lsn,
    2429     45695396 :                                     reconstruct_state,
    2430     45695396 :                                     ctx,
    2431     45695396 :                                 )
    2432      1375502 :                                 .await
    2433              :                             {
    2434     45695382 :                                 Ok(result) => result,
    2435           11 :                                 Err(e) => return Err(PageReconstructError::from(e)),
    2436              :                             };
    2437     45695382 :                             cont_lsn = lsn_floor;
    2438     45695382 :                             *read_count += 1;
    2439     45695382 :                             traversal_path.push((
    2440     45695382 :                                 result,
    2441     45695382 :                                 cont_lsn,
    2442     45695382 :                                 Box::new({
    2443     45695382 :                                     let layer = Arc::clone(&layer);
    2444     45695382 :                                     move || layer.traversal_id()
    2445     45695382 :                                 }),
    2446     45695382 :                             ));
    2447     45695382 :                             continue 'outer;
    2448              :                         }
    2449       111614 :                     } else if timeline.ancestor_timeline.is_some() {
    2450              :                         // Nothing on this timeline. Traverse to parent
    2451       111614 :                         result = ValueReconstructResult::Continue;
    2452       111614 :                         cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2453       111614 :                         continue 'outer;
    2454              :                     } else {
    2455              :                         // Nothing found
    2456            0 :                         result = ValueReconstructResult::Missing;
    2457            0 :                         continue 'outer;
    2458              :                     }
    2459              :                 };
    2460              :                 // Download the remote_layer and replace it in the layer map.
    2461              :                 // For that, we need to release the mutex. Otherwise, we'd deadlock.
    2462              :                 //
    2463              :                 // The control flow is so weird here because `drop(layers)` inside
    2464              :                 // the if stmt above is not enough for current rustc: it requires
    2465              :                 // that the layers lock guard is not in scope across the download
    2466              :                 // await point.
    2467          994 :                 let remote_layer_as_persistent: Arc<dyn PersistentLayer> =
    2468          994 :                     Arc::clone(&remote_layer) as Arc<dyn PersistentLayer>;
    2469          994 :                 let id = remote_layer_as_persistent.traversal_id();
    2470          993 :                 info!(
    2471          993 :                     "need remote layer {} for task kind {:?}",
    2472          993 :                     id,
    2473          993 :                     ctx.task_kind()
    2474          993 :                 );
    2475              : 
    2476              :                 // The next layer doesn't exist locally. Need to download it.
    2477              :                 // (The control flow is a bit complicated here because we must drop the 'layers'
    2478              :                 // lock before awaiting on the Future.)
    2479          993 :                 match (
    2480          993 :                     ctx.download_behavior(),
    2481          993 :                     self.conf.ondemand_download_behavior_treat_error_as_warn,
    2482          993 :                 ) {
    2483              :                     (DownloadBehavior::Download, _) => {
    2484          993 :                         info!(
    2485          993 :                             "on-demand downloading remote layer {id} for task kind {:?}",
    2486          993 :                             ctx.task_kind()
    2487          993 :                         );
    2488         1388 :                         timeline.download_remote_layer(remote_layer).await?;
    2489          986 :                         continue 'layer_map_search;
    2490              :                     }
    2491              :                     (DownloadBehavior::Warn, _) | (DownloadBehavior::Error, true) => {
    2492            0 :                         warn!(
    2493            0 :                             "unexpectedly on-demand downloading remote layer {} for task kind {:?}",
    2494            0 :                             id,
    2495            0 :                             ctx.task_kind()
    2496            0 :                         );
    2497            0 :                         UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
    2498            0 :                         timeline.download_remote_layer(remote_layer).await?;
    2499            0 :                         continue 'layer_map_search;
    2500              :                     }
    2501              :                     (DownloadBehavior::Error, false) => {
    2502            0 :                         return Err(PageReconstructError::NeedsDownload(
    2503            0 :                             TenantTimelineId::new(self.tenant_id, self.timeline_id),
    2504            0 :                             remote_layer.filename(),
    2505            0 :                         ))
    2506              :                     }
    2507              :                 }
    2508              :             }
    2509              :         }
    2510      7262224 :     }
    2511              : 
    2512      7262300 :     async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
    2513      7262300 :         let cache = page_cache::get();
    2514              : 
    2515              :         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
    2516              :         // We should look at the key to determine if it's a cacheable object
    2517      7262300 :         let (lsn, read_guard) = cache
    2518      7262300 :             .lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
    2519      5521855 :             .await?;
    2520      1740445 :         let img = Bytes::from(read_guard.to_vec());
    2521      1740445 :         Some((lsn, img))
    2522      7262300 :     }
    2523              : 
    2524      1068483 :     fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
    2525      1068483 :         let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
    2526            0 :             format!(
    2527            0 :                 "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
    2528            0 :                 self.timeline_id,
    2529            0 :                 self.get_ancestor_timeline_id(),
    2530            0 :             )
    2531      1068483 :         })?;
    2532      1068483 :         Ok(Arc::clone(ancestor))
    2533      1068483 :     }
    2534              : 
    2535              :     ///
    2536              :     /// Get a handle to the latest layer for appending.
    2537              :     ///
    2538     82563785 :     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    2539     82563872 :         let mut guard = self.layers.write().await;
    2540     82563863 :         let layer = guard.get_layer_for_write(
    2541     82563863 :             lsn,
    2542     82563863 :             self.get_last_record_lsn(),
    2543     82563863 :             self.conf,
    2544     82563863 :             self.timeline_id,
    2545     82563863 :             self.tenant_id,
    2546     82563863 :         )?;
    2547     82563863 :         Ok(layer)
    2548     82563863 :     }
    2549              : 
    2550     82545991 :     async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
    2551              :         //info!("PUT: key {} at {}", key, lsn);
    2552     82546078 :         let layer = self.get_layer_for_write(lsn).await?;
    2553     82546069 :         layer.put_value(key, lsn, val).await?;
    2554     82546066 :         Ok(())
    2555     82546066 :     }
    2556              : 
    2557        17794 :     async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
    2558        17794 :         let layer = self.get_layer_for_write(lsn).await?;
    2559        17794 :         layer.put_tombstone(key_range, lsn).await?;
    2560        17794 :         Ok(())
    2561        17794 :     }
    2562              : 
    2563     74268468 :     fn finish_write(&self, new_lsn: Lsn) {
    2564     74268468 :         assert!(new_lsn.is_aligned());
    2565              : 
    2566     74268468 :         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
    2567     74268468 :         self.last_record_lsn.advance(new_lsn);
    2568     74268468 :     }
    2569              : 
    2570         6753 :     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
    2571              :         // Freeze the current open in-memory layer. It will be written to disk on next
    2572              :         // iteration.
    2573         6753 :         let _write_guard = if write_lock_held {
    2574         4955 :             None
    2575              :         } else {
    2576         1798 :             Some(self.write_lock.lock().await)
    2577              :         };
    2578         6753 :         let mut guard = self.layers.write().await;
    2579         6753 :         guard
    2580         6753 :             .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
    2581           28 :             .await;
    2582         6753 :     }
    2583              : 
    2584              :     /// Layer flusher task's main loop.
    2585         1364 :     async fn flush_loop(
    2586         1364 :         self: &Arc<Self>,
    2587         1364 :         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
    2588         1364 :         ctx: &RequestContext,
    2589         1364 :     ) {
    2590         1364 :         info!("started flush loop");
    2591              :         loop {
    2592         7304 :             tokio::select! {
    2593        13416 :                 _ = task_mgr::shutdown_watcher() => {
    2594        13416 :                     info!("shutting down layer flush task");
    2595        13416 :                     break;
    2596        13416 :                 },
    2597        13416 :                 _ = layer_flush_start_rx.changed() => {}
    2598        13416 :             }
    2599              : 
    2600            0 :             trace!("waking up");
    2601         5945 :             let timer = self.metrics.flush_time_histo.start_timer();
    2602         5945 :             let flush_counter = *layer_flush_start_rx.borrow();
    2603         5940 :             let result = loop {
    2604        12454 :                 let layer_to_flush = {
    2605        12454 :                     let guard = self.layers.read().await;
    2606        12454 :                     guard.layer_map().frozen_layers.front().cloned()
    2607              :                     // drop 'layers' lock to allow concurrent reads and writes
    2608              :                 };
    2609        12454 :                 let Some(layer_to_flush) = layer_to_flush else {
    2610         5940 :                     break Ok(());
    2611              :                 };
    2612        15251 :                 if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
    2613            0 :                     error!("could not flush frozen layer: {err:?}");
    2614            0 :                     break Err(err);
    2615         6509 :                 }
    2616              :             };
    2617              :             // Notify any listeners that we're done
    2618         5940 :             let _ = self
    2619         5940 :                 .layer_flush_done_tx
    2620         5940 :                 .send_replace((flush_counter, result));
    2621         5940 : 
    2622         5940 :             timer.stop_and_record();
    2623              :         }
    2624          527 :     }
    2625              : 
    2626         1798 :     async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
    2627         1798 :         let mut rx = self.layer_flush_done_tx.subscribe();
    2628         1798 : 
    2629         1798 :         // Increment the flush cycle counter and wake up the flush task.
    2630         1798 :         // Remember the new value, so that when we listen for the flush
    2631         1798 :         // to finish, we know when the flush that we initiated has
    2632         1798 :         // finished, instead of some other flush that was started earlier.
    2633         1798 :         let mut my_flush_request = 0;
    2634         1798 : 
    2635         1798 :         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
    2636         1798 :         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
    2637           78 :             anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
    2638         1720 :         }
    2639         1720 : 
    2640         1720 :         self.layer_flush_start_tx.send_modify(|counter| {
    2641         1720 :             my_flush_request = *counter + 1;
    2642         1720 :             *counter = my_flush_request;
    2643         1720 :         });
    2644              : 
    2645         3440 :         loop {
    2646         3440 :             {
    2647         3440 :                 let (last_result_counter, last_result) = &*rx.borrow();
    2648         3440 :                 if *last_result_counter >= my_flush_request {
    2649         1720 :                     if let Err(_err) = last_result {
    2650              :                         // We already logged the original error in
    2651              :                         // flush_loop. We cannot propagate it to the caller
    2652              :                         // here, because it might not be Cloneable
    2653            0 :                         anyhow::bail!(
    2654            0 :                             "Could not flush frozen layer. Request id: {}",
    2655            0 :                             my_flush_request
    2656            0 :                         );
    2657              :                     } else {
    2658         1720 :                         return Ok(());
    2659              :                     }
    2660         1720 :                 }
    2661              :             }
    2662            0 :             trace!("waiting for flush to complete");
    2663         1720 :             rx.changed().await?;
    2664            0 :             trace!("done")
    2665              :         }
    2666         1798 :     }
    2667              : 
    2668         4955 :     fn flush_frozen_layers(&self) {
    2669         4955 :         self.layer_flush_start_tx.send_modify(|val| *val += 1);
    2670         4955 :     }
    2671              : 
    2672              :     /// Flush one frozen in-memory layer to disk, as a new delta layer.
    2673         6514 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer))]
    2674              :     async fn flush_frozen_layer(
    2675              :         self: &Arc<Self>,
    2676              :         frozen_layer: Arc<InMemoryLayer>,
    2677              :         ctx: &RequestContext,
    2678              :     ) -> anyhow::Result<()> {
    2679              :         // As a special case, when we have just imported an image into the repository,
    2680              :         // instead of writing out a L0 delta layer, we directly write out image layer
    2681              :         // files instead. This is possible as long as *all* the data imported into the
    2682              :         // repository have the same LSN.
    2683              :         let lsn_range = frozen_layer.get_lsn_range();
    2684              :         let (layer_paths_to_upload, delta_layer_to_add) =
    2685              :             if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
    2686              :                 #[cfg(test)]
    2687              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2688              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2689              :                         panic!("flush loop not running")
    2690              :                     }
    2691              :                     FlushLoopState::Running {
    2692              :                         initdb_optimization_count,
    2693              :                         ..
    2694              :                     } => {
    2695              :                         *initdb_optimization_count += 1;
    2696              :                     }
    2697              :                 }
    2698              :                 // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
    2699              :                 // require downloading anything during initial import.
    2700              :                 let (partitioning, _lsn) = self
    2701              :                     .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
    2702              :                     .await?;
    2703              :                 // For image layers, we add them immediately into the layer map.
    2704              :                 (
    2705              :                     self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
    2706              :                         .await?,
    2707              :                     None,
    2708              :                 )
    2709              :             } else {
    2710              :                 #[cfg(test)]
    2711              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2712              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2713              :                         panic!("flush loop not running")
    2714              :                     }
    2715              :                     FlushLoopState::Running {
    2716              :                         expect_initdb_optimization,
    2717              :                         ..
    2718              :                     } => {
    2719              :                         assert!(!*expect_initdb_optimization, "expected initdb optimization");
    2720              :                     }
    2721              :                 }
    2722              :                 // Normal case, write out a L0 delta layer file.
    2723              :                 // `create_delta_layer` will not modify the layer map.
    2724              :                 // We will remove frozen layer and add delta layer in one atomic operation later.
    2725              :                 let layer = self.create_delta_layer(&frozen_layer).await?;
    2726              :                 (
    2727              :                     HashMap::from([(
    2728              :                         layer.filename(),
    2729              :                         LayerFileMetadata::new(layer.layer_desc().file_size, self.generation),
    2730              :                     )]),
    2731              :                     Some(layer),
    2732              :                 )
    2733              :             };
    2734              : 
    2735              :         // The new on-disk layers are now in the layer map. We can remove the
    2736              :         // in-memory layer from the map now. The flushed layer is stored in
    2737              :         // the mapping in `create_delta_layer`.
    2738              :         {
    2739              :             let mut guard = self.layers.write().await;
    2740              : 
    2741              :             if let Some(ref l) = delta_layer_to_add {
    2742              :                 // TODO: move access stats, metrics update, etc. into layer manager.
    2743              :                 l.access_stats().record_residence_event(
    2744              :                     LayerResidenceStatus::Resident,
    2745              :                     LayerResidenceEventReason::LayerCreate,
    2746              :                 );
    2747              : 
    2748              :                 // update metrics
    2749              :                 let sz = l.layer_desc().file_size;
    2750              :                 self.metrics.resident_physical_size_gauge.add(sz);
    2751              :                 self.metrics.num_persistent_files_created.inc_by(1);
    2752              :                 self.metrics.persistent_bytes_written.inc_by(sz);
    2753              :             }
    2754              : 
    2755              :             guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
    2756              :             // release lock on 'layers'
    2757              :         }
    2758              : 
    2759              :         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
    2760              :         // a compaction can delete the file and then it won't be available for uploads any more.
    2761              :         // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
    2762              :         // race situation.
    2763              :         // See https://github.com/neondatabase/neon/issues/4526
    2764         6513 :         pausable_failpoint!("flush-frozen-pausable");
    2765              : 
    2766              :         // This failpoint is used by another test case `test_pageserver_recovery`.
    2767            0 :         fail_point!("flush-frozen-exit");
    2768              : 
    2769              :         // Update the metadata file, with new 'disk_consistent_lsn'
    2770              :         //
    2771              :         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
    2772              :         // *all* the layers, to avoid fsyncing the file multiple times.
    2773              :         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
    2774              :         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
    2775              : 
    2776              :         // If we were able to advance 'disk_consistent_lsn', save it the metadata file.
    2777              :         // After crash, we will restart WAL streaming and processing from that point.
    2778              :         if disk_consistent_lsn != old_disk_consistent_lsn {
    2779              :             assert!(disk_consistent_lsn > old_disk_consistent_lsn);
    2780              :             self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
    2781              :                 .await
    2782              :                 .context("update_metadata_file")?;
    2783              :             // Also update the in-memory copy
    2784              :             self.disk_consistent_lsn.store(disk_consistent_lsn);
    2785              :         }
    2786              :         Ok(())
    2787              :     }
    2788              : 
    2789              :     /// Update metadata file
    2790         6521 :     async fn update_metadata_file(
    2791         6521 :         &self,
    2792         6521 :         disk_consistent_lsn: Lsn,
    2793         6521 :         layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
    2794         6521 :     ) -> anyhow::Result<()> {
    2795         6521 :         // We can only save a valid 'prev_record_lsn' value on disk if we
    2796         6521 :         // flushed *all* in-memory changes to disk. We only track
    2797         6521 :         // 'prev_record_lsn' in memory for the latest processed record, so we
    2798         6521 :         // don't remember what the correct value that corresponds to some old
    2799         6521 :         // LSN is. But if we flush everything, then the value corresponding
    2800         6521 :         // current 'last_record_lsn' is correct and we can store it on disk.
    2801         6521 :         let RecordLsn {
    2802         6521 :             last: last_record_lsn,
    2803         6521 :             prev: prev_record_lsn,
    2804         6521 :         } = self.last_record_lsn.load();
    2805         6521 :         let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
    2806         1327 :             Some(prev_record_lsn)
    2807              :         } else {
    2808         5194 :             None
    2809              :         };
    2810              : 
    2811         6521 :         let ancestor_timeline_id = self
    2812         6521 :             .ancestor_timeline
    2813         6521 :             .as_ref()
    2814         6521 :             .map(|ancestor| ancestor.timeline_id);
    2815         6521 : 
    2816         6521 :         let metadata = TimelineMetadata::new(
    2817         6521 :             disk_consistent_lsn,
    2818         6521 :             ondisk_prev_record_lsn,
    2819         6521 :             ancestor_timeline_id,
    2820         6521 :             self.ancestor_lsn,
    2821         6521 :             *self.latest_gc_cutoff_lsn.read(),
    2822         6521 :             self.initdb_lsn,
    2823         6521 :             self.pg_version,
    2824         6521 :         );
    2825         6521 : 
    2826         6521 :         fail_point!("checkpoint-before-saving-metadata", |x| bail!(
    2827            0 :             "{}",
    2828            0 :             x.unwrap()
    2829         6521 :         ));
    2830              : 
    2831         6521 :         save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
    2832            0 :             .await
    2833         6521 :             .context("save_metadata")?;
    2834              : 
    2835         6521 :         if let Some(remote_client) = &self.remote_client {
    2836         7883 :             for (path, layer_metadata) in layer_paths_to_upload {
    2837         3939 :                 remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
    2838              :             }
    2839         3944 :             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
    2840         2577 :         }
    2841              : 
    2842         6521 :         Ok(())
    2843         6521 :     }
    2844              : 
    2845              :     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
    2846              :     // in layer map immediately. The caller is responsible to put it into the layer map.
    2847         6477 :     async fn create_delta_layer(
    2848         6477 :         self: &Arc<Self>,
    2849         6477 :         frozen_layer: &Arc<InMemoryLayer>,
    2850         6477 :     ) -> anyhow::Result<DeltaLayer> {
    2851         6477 :         let span = tracing::info_span!("blocking");
    2852         6477 :         let new_delta: DeltaLayer = tokio::task::spawn_blocking({
    2853         6477 :             let _g = span.entered();
    2854         6477 :             let self_clone = Arc::clone(self);
    2855         6477 :             let frozen_layer = Arc::clone(frozen_layer);
    2856         6477 :             move || {
    2857              :                 // Write it out
    2858              :                 // Keep this inside `spawn_blocking` and `Handle::current`
    2859              :                 // as long as the write path is still sync and the read impl
    2860              :                 // is still not fully async. Otherwise executor threads would
    2861              :                 // be blocked.
    2862         6477 :                 let new_delta = Handle::current().block_on(frozen_layer.write_to_disk())?;
    2863         6477 :                 let new_delta_path = new_delta.path();
    2864         6477 : 
    2865         6477 :                 // Sync it to disk.
    2866         6477 :                 //
    2867         6477 :                 // We must also fsync the timeline dir to ensure the directory entries for
    2868         6477 :                 // new layer files are durable.
    2869         6477 :                 //
    2870         6477 :                 // NB: timeline dir must be synced _after_ the file contents are durable.
    2871         6477 :                 // So, two separate fsyncs are required, they mustn't be batched.
    2872         6477 :                 //
    2873         6477 :                 // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
    2874         6477 :                 // files to flush, the fsync overhead can be reduces as follows:
    2875         6477 :                 // 1. write them all to temporary file names
    2876         6477 :                 // 2. fsync them
    2877         6477 :                 // 3. rename to the final name
    2878         6477 :                 // 4. fsync the parent directory.
    2879         6477 :                 // Note that (1),(2),(3) today happen inside write_to_disk().
    2880         6477 :                 par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
    2881         6477 :                 par_fsync::par_fsync(&[self_clone
    2882         6477 :                     .conf
    2883         6477 :                     .timeline_path(&self_clone.tenant_id, &self_clone.timeline_id)])
    2884         6477 :                 .context("fsync of timeline dir")?;
    2885              : 
    2886         6476 :                 anyhow::Ok(new_delta)
    2887         6477 :             }
    2888         6477 :         })
    2889         6476 :         .await
    2890         6476 :         .context("spawn_blocking")??;
    2891              : 
    2892         6476 :         Ok(new_delta)
    2893         6476 :     }
    2894              : 
    2895         1438 :     async fn repartition(
    2896         1438 :         &self,
    2897         1438 :         lsn: Lsn,
    2898         1438 :         partition_size: u64,
    2899         1438 :         ctx: &RequestContext,
    2900         1438 :     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
    2901         1438 :         {
    2902         1438 :             let partitioning_guard = self.partitioning.lock().unwrap();
    2903         1438 :             let distance = lsn.0 - partitioning_guard.1 .0;
    2904         1438 :             if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
    2905            0 :                 debug!(
    2906            0 :                     distance,
    2907            0 :                     threshold = self.repartition_threshold,
    2908            0 :                     "no repartitioning needed"
    2909            0 :                 );
    2910          778 :                 return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
    2911          660 :             }
    2912              :         }
    2913       383412 :         let keyspace = self.collect_keyspace(lsn, ctx).await?;
    2914          658 :         let partitioning = keyspace.partition(partition_size);
    2915          658 : 
    2916          658 :         let mut partitioning_guard = self.partitioning.lock().unwrap();
    2917          658 :         if lsn > partitioning_guard.1 {
    2918          658 :             *partitioning_guard = (partitioning, lsn);
    2919          658 :         } else {
    2920            0 :             warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
    2921              :         }
    2922          658 :         Ok((partitioning_guard.0.clone(), partitioning_guard.1))
    2923         1436 :     }
    2924              : 
    2925              :     // Is it time to create a new image layer for the given partition?
    2926        16758 :     async fn time_for_new_image_layer(
    2927        16758 :         &self,
    2928        16758 :         partition: &KeySpace,
    2929        16758 :         lsn: Lsn,
    2930        16758 :     ) -> anyhow::Result<bool> {
    2931        16758 :         let threshold = self.get_image_creation_threshold();
    2932              : 
    2933        16758 :         let guard = self.layers.read().await;
    2934        16758 :         let layers = guard.layer_map();
    2935        16758 : 
    2936        16758 :         let mut max_deltas = 0;
    2937        16758 :         {
    2938        16758 :             let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
    2939        16758 :             if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
    2940         3398 :                 let img_range =
    2941         3398 :                     partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
    2942         3398 :                 if wanted.overlaps(&img_range) {
    2943              :                     //
    2944              :                     // gc_timeline only pays attention to image layers that are older than the GC cutoff,
    2945              :                     // but create_image_layers creates image layers at last-record-lsn.
    2946              :                     // So it's possible that gc_timeline wants a new image layer to be created for a key range,
    2947              :                     // but the range is already covered by image layers at more recent LSNs. Before we
    2948              :                     // create a new image layer, check if the range is already covered at more recent LSNs.
    2949            0 :                     if !layers
    2950            0 :                         .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
    2951              :                     {
    2952            0 :                         debug!(
    2953            0 :                             "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
    2954            0 :                             img_range.start, img_range.end, cutoff_lsn, lsn
    2955            0 :                         );
    2956            0 :                         return Ok(true);
    2957            0 :                     }
    2958         3398 :                 }
    2959        13360 :             }
    2960              :         }
    2961              : 
    2962      2063764 :         for part_range in &partition.ranges {
    2963      2048082 :             let image_coverage = layers.image_coverage(part_range, lsn)?;
    2964      3979335 :             for (img_range, last_img) in image_coverage {
    2965      1932329 :                 let img_lsn = if let Some(last_img) = last_img {
    2966       177346 :                     last_img.get_lsn_range().end
    2967              :                 } else {
    2968      1754983 :                     Lsn(0)
    2969              :                 };
    2970              :                 // Let's consider an example:
    2971              :                 //
    2972              :                 // delta layer with LSN range 71-81
    2973              :                 // delta layer with LSN range 81-91
    2974              :                 // delta layer with LSN range 91-101
    2975              :                 // image layer at LSN 100
    2976              :                 //
    2977              :                 // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
    2978              :                 // there's no need to create a new one. We check this case explicitly, to avoid passing
    2979              :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    2980              :                 // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
    2981              :                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
    2982      1932329 :                 if img_lsn < lsn {
    2983      1915119 :                     let num_deltas =
    2984      1915119 :                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
    2985              : 
    2986      1915119 :                     max_deltas = max_deltas.max(num_deltas);
    2987      1915119 :                     if num_deltas >= threshold {
    2988            0 :                         debug!(
    2989            0 :                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
    2990            0 :                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
    2991            0 :                         );
    2992         1076 :                         return Ok(true);
    2993      1914043 :                     }
    2994        17210 :                 }
    2995              :             }
    2996              :         }
    2997              : 
    2998            0 :         debug!(
    2999            0 :             max_deltas,
    3000            0 :             "none of the partitioned ranges had >= {threshold} deltas"
    3001            0 :         );
    3002        15682 :         Ok(false)
    3003        16758 :     }
    3004              : 
    3005         1436 :     async fn create_image_layers(
    3006         1436 :         &self,
    3007         1436 :         partitioning: &KeyPartitioning,
    3008         1436 :         lsn: Lsn,
    3009         1436 :         force: bool,
    3010         1436 :         ctx: &RequestContext,
    3011         1436 :     ) -> Result<HashMap<LayerFileName, LayerFileMetadata>, PageReconstructError> {
    3012         1436 :         let timer = self.metrics.create_images_time_histo.start_timer();
    3013         1436 :         let mut image_layers: Vec<ImageLayer> = Vec::new();
    3014         1436 : 
    3015         1436 :         // We need to avoid holes between generated image layers.
    3016         1436 :         // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
    3017         1436 :         // image layer with hole between them. In this case such layer can not be utilized by GC.
    3018         1436 :         //
    3019         1436 :         // How such hole between partitions can appear?
    3020         1436 :         // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
    3021         1436 :         // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
    3022         1436 :         // If there is delta layer <100000000..300000000> then it never be garbage collected because
    3023         1436 :         // image layers  <100000000..100000099> and <200000000..200000199> are not completely covering it.
    3024         1436 :         let mut start = Key::MIN;
    3025              : 
    3026        16795 :         for partition in partitioning.parts.iter() {
    3027        16795 :             let img_range = start..partition.ranges.last().unwrap().end;
    3028        16795 :             start = img_range.end;
    3029        16795 :             if force || self.time_for_new_image_layer(partition, lsn).await? {
    3030         1113 :                 let mut image_layer_writer = ImageLayerWriter::new(
    3031         1113 :                     self.conf,
    3032         1113 :                     self.timeline_id,
    3033         1113 :                     self.tenant_id,
    3034         1113 :                     &img_range,
    3035         1113 :                     lsn,
    3036         1113 :                 )
    3037            0 :                 .await?;
    3038              : 
    3039         1113 :                 fail_point!("image-layer-writer-fail-before-finish", |_| {
    3040            0 :                     Err(PageReconstructError::Other(anyhow::anyhow!(
    3041            0 :                         "failpoint image-layer-writer-fail-before-finish"
    3042            0 :                     )))
    3043         1113 :                 });
    3044        40031 :                 for range in &partition.ranges {
    3045        38919 :                     let mut key = range.start;
    3046       244071 :                     while key < range.end {
    3047       425559 :                         let img = match self.get(key, lsn, ctx).await {
    3048       205153 :                             Ok(img) => img,
    3049            0 :                             Err(err) => {
    3050            0 :                                 // If we fail to reconstruct a VM or FSM page, we can zero the
    3051            0 :                                 // page without losing any actual user data. That seems better
    3052            0 :                                 // than failing repeatedly and getting stuck.
    3053            0 :                                 //
    3054            0 :                                 // We had a bug at one point, where we truncated the FSM and VM
    3055            0 :                                 // in the pageserver, but the Postgres didn't know about that
    3056            0 :                                 // and continued to generate incremental WAL records for pages
    3057            0 :                                 // that didn't exist in the pageserver. Trying to replay those
    3058            0 :                                 // WAL records failed to find the previous image of the page.
    3059            0 :                                 // This special case allows us to recover from that situation.
    3060            0 :                                 // See https://github.com/neondatabase/neon/issues/2601.
    3061            0 :                                 //
    3062            0 :                                 // Unfortunately we cannot do this for the main fork, or for
    3063            0 :                                 // any metadata keys, keys, as that would lead to actual data
    3064            0 :                                 // loss.
    3065            0 :                                 if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
    3066            0 :                                     warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
    3067            0 :                                     ZERO_PAGE.clone()
    3068              :                                 } else {
    3069            0 :                                     return Err(err);
    3070              :                                 }
    3071              :                             }
    3072              :                         };
    3073       205153 :                         image_layer_writer.put_image(key, &img).await?;
    3074       205152 :                         key = key.next();
    3075              :                     }
    3076              :                 }
    3077         1112 :                 let image_layer = image_layer_writer.finish().await?;
    3078         1112 :                 image_layers.push(image_layer);
    3079        15682 :             }
    3080              :         }
    3081              :         // All layers that the GC wanted us to create have now been created.
    3082              :         //
    3083              :         // It's possible that another GC cycle happened while we were compacting, and added
    3084              :         // something new to wanted_image_layers, and we now clear that before processing it.
    3085              :         // That's OK, because the next GC iteration will put it back in.
    3086         1435 :         *self.wanted_image_layers.lock().unwrap() = None;
    3087         1435 : 
    3088         1435 :         // Sync the new layer to disk before adding it to the layer map, to make sure
    3089         1435 :         // we don't garbage collect something based on the new layer, before it has
    3090         1435 :         // reached the disk.
    3091         1435 :         //
    3092         1435 :         // We must also fsync the timeline dir to ensure the directory entries for
    3093         1435 :         // new layer files are durable
    3094         1435 :         //
    3095         1435 :         // Compaction creates multiple image layers. It would be better to create them all
    3096         1435 :         // and fsync them all in parallel.
    3097         1435 :         let all_paths = image_layers
    3098         1435 :             .iter()
    3099         1435 :             .map(|layer| layer.path())
    3100         1435 :             .collect::<Vec<_>>();
    3101         1435 : 
    3102         1435 :         par_fsync::par_fsync_async(&all_paths)
    3103          171 :             .await
    3104         1435 :             .context("fsync of newly created layer files")?;
    3105              : 
    3106         1435 :         par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
    3107         1435 :             .await
    3108         1435 :             .context("fsync of timeline dir")?;
    3109              : 
    3110         1435 :         let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
    3111              : 
    3112         1435 :         let mut guard = self.layers.write().await;
    3113         1435 :         let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
    3114              : 
    3115         2547 :         for l in &image_layers {
    3116         1112 :             let path = l.filename();
    3117         1112 :             let metadata = timeline_path
    3118         1112 :                 .join(path.file_name())
    3119         1112 :                 .metadata()
    3120         1112 :                 .with_context(|| format!("reading metadata of layer file {}", path.file_name()))?;
    3121              : 
    3122         1112 :             layer_paths_to_upload.insert(
    3123         1112 :                 path,
    3124         1112 :                 LayerFileMetadata::new(metadata.len(), self.generation),
    3125         1112 :             );
    3126         1112 : 
    3127         1112 :             self.metrics
    3128         1112 :                 .resident_physical_size_gauge
    3129         1112 :                 .add(metadata.len());
    3130         1112 :             let l = Arc::new(l);
    3131         1112 :             l.access_stats().record_residence_event(
    3132         1112 :                 LayerResidenceStatus::Resident,
    3133         1112 :                 LayerResidenceEventReason::LayerCreate,
    3134         1112 :             );
    3135              :         }
    3136         1435 :         guard.track_new_image_layers(image_layers);
    3137         1435 :         drop_wlock(guard);
    3138         1435 :         timer.stop_and_record();
    3139         1435 : 
    3140         1435 :         Ok(layer_paths_to_upload)
    3141         1435 :     }
    3142              : }
    3143              : 
    3144         1069 : #[derive(Default)]
    3145              : struct CompactLevel0Phase1Result {
    3146              :     new_layers: Vec<Arc<DeltaLayer>>,
    3147              :     deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
    3148              : }
    3149              : 
    3150              : /// Top-level failure to compact.
    3151            0 : #[derive(Debug)]
    3152              : enum CompactionError {
    3153              :     /// L0 compaction requires layers to be downloaded.
    3154              :     ///
    3155              :     /// This should not happen repeatedly, but will be retried once by top-level
    3156              :     /// `Timeline::compact`.
    3157              :     DownloadRequired(Vec<Arc<RemoteLayer>>),
    3158              :     /// The timeline or pageserver is shutting down
    3159              :     ShuttingDown,
    3160              :     /// Compaction cannot be done right now; page reconstruction and so on.
    3161              :     Other(anyhow::Error),
    3162              : }
    3163              : 
    3164              : impl From<anyhow::Error> for CompactionError {
    3165            1 :     fn from(value: anyhow::Error) -> Self {
    3166            1 :         CompactionError::Other(value)
    3167            1 :     }
    3168              : }
    3169              : 
    3170              : #[serde_as]
    3171         3990 : #[derive(serde::Serialize)]
    3172              : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
    3173              : 
    3174         9786 : #[derive(Default)]
    3175              : enum DurationRecorder {
    3176              :     #[default]
    3177              :     NotStarted,
    3178              :     Recorded(RecordedDuration, tokio::time::Instant),
    3179              : }
    3180              : 
    3181              : impl DurationRecorder {
    3182         2844 :     pub fn till_now(&self) -> DurationRecorder {
    3183         2844 :         match self {
    3184              :             DurationRecorder::NotStarted => {
    3185            0 :                 panic!("must only call on recorded measurements")
    3186              :             }
    3187         2844 :             DurationRecorder::Recorded(_, ended) => {
    3188         2844 :                 let now = tokio::time::Instant::now();
    3189         2844 :                 DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
    3190         2844 :             }
    3191         2844 :         }
    3192         2844 :     }
    3193         1995 :     pub fn into_recorded(self) -> Option<RecordedDuration> {
    3194         1995 :         match self {
    3195            0 :             DurationRecorder::NotStarted => None,
    3196         1995 :             DurationRecorder::Recorded(recorded, _) => Some(recorded),
    3197              :         }
    3198         1995 :     }
    3199              : }
    3200              : 
    3201         1398 : #[derive(Default)]
    3202              : struct CompactLevel0Phase1StatsBuilder {
    3203              :     version: Option<u64>,
    3204              :     tenant_id: Option<TenantId>,
    3205              :     timeline_id: Option<TimelineId>,
    3206              :     read_lock_acquisition_micros: DurationRecorder,
    3207              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    3208              :     read_lock_held_key_sort_micros: DurationRecorder,
    3209              :     read_lock_held_prerequisites_micros: DurationRecorder,
    3210              :     read_lock_held_compute_holes_micros: DurationRecorder,
    3211              :     read_lock_drop_micros: DurationRecorder,
    3212              :     write_layer_files_micros: DurationRecorder,
    3213              :     level0_deltas_count: Option<usize>,
    3214              :     new_deltas_count: Option<usize>,
    3215              :     new_deltas_size: Option<u64>,
    3216              : }
    3217              : 
    3218              : #[serde_as]
    3219          285 : #[derive(serde::Serialize)]
    3220              : struct CompactLevel0Phase1Stats {
    3221              :     version: u64,
    3222              :     #[serde_as(as = "serde_with::DisplayFromStr")]
    3223              :     tenant_id: TenantId,
    3224              :     #[serde_as(as = "serde_with::DisplayFromStr")]
    3225              :     timeline_id: TimelineId,
    3226              :     read_lock_acquisition_micros: RecordedDuration,
    3227              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    3228              :     read_lock_held_key_sort_micros: RecordedDuration,
    3229              :     read_lock_held_prerequisites_micros: RecordedDuration,
    3230              :     read_lock_held_compute_holes_micros: RecordedDuration,
    3231              :     read_lock_drop_micros: RecordedDuration,
    3232              :     write_layer_files_micros: RecordedDuration,
    3233              :     level0_deltas_count: usize,
    3234              :     new_deltas_count: usize,
    3235              :     new_deltas_size: u64,
    3236              : }
    3237              : 
    3238              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    3239              :     type Error = anyhow::Error;
    3240              : 
    3241          285 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    3242          285 :         Ok(Self {
    3243          285 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    3244          285 :             tenant_id: value
    3245          285 :                 .tenant_id
    3246          285 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    3247          285 :             timeline_id: value
    3248          285 :                 .timeline_id
    3249          285 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    3250          285 :             read_lock_acquisition_micros: value
    3251          285 :                 .read_lock_acquisition_micros
    3252          285 :                 .into_recorded()
    3253          285 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    3254          285 :             read_lock_held_spawn_blocking_startup_micros: value
    3255          285 :                 .read_lock_held_spawn_blocking_startup_micros
    3256          285 :                 .into_recorded()
    3257          285 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    3258          285 :             read_lock_held_key_sort_micros: value
    3259          285 :                 .read_lock_held_key_sort_micros
    3260          285 :                 .into_recorded()
    3261          285 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    3262          285 :             read_lock_held_prerequisites_micros: value
    3263          285 :                 .read_lock_held_prerequisites_micros
    3264          285 :                 .into_recorded()
    3265          285 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    3266          285 :             read_lock_held_compute_holes_micros: value
    3267          285 :                 .read_lock_held_compute_holes_micros
    3268          285 :                 .into_recorded()
    3269          285 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    3270          285 :             read_lock_drop_micros: value
    3271          285 :                 .read_lock_drop_micros
    3272          285 :                 .into_recorded()
    3273          285 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    3274          285 :             write_layer_files_micros: value
    3275          285 :                 .write_layer_files_micros
    3276          285 :                 .into_recorded()
    3277          285 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    3278          285 :             level0_deltas_count: value
    3279          285 :                 .level0_deltas_count
    3280          285 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    3281          285 :             new_deltas_count: value
    3282          285 :                 .new_deltas_count
    3283          285 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    3284          285 :             new_deltas_size: value
    3285          285 :                 .new_deltas_size
    3286          285 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    3287              :         })
    3288          285 :     }
    3289              : }
    3290              : 
    3291              : impl Timeline {
    3292              :     /// Level0 files first phase of compaction, explained in the [`compact_inner`] comment.
    3293              :     ///
    3294              :     /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
    3295              :     /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
    3296              :     /// start of level0 files compaction, the on-demand download should be revisited as well.
    3297              :     ///
    3298              :     /// [`compact_inner`]: Self::compact_inner
    3299         1398 :     async fn compact_level0_phase1(
    3300         1398 :         self: &Arc<Self>,
    3301         1398 :         _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
    3302         1398 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
    3303         1398 :         mut stats: CompactLevel0Phase1StatsBuilder,
    3304         1398 :         target_file_size: u64,
    3305         1398 :         ctx: &RequestContext,
    3306         1398 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    3307         1398 :         stats.read_lock_held_spawn_blocking_startup_micros =
    3308         1398 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    3309         1398 :         let layers = guard.layer_map();
    3310         1398 :         let level0_deltas = layers.get_level0_deltas()?;
    3311         1398 :         let mut level0_deltas = level0_deltas
    3312         1398 :             .into_iter()
    3313        10762 :             .map(|x| guard.get_from_desc(&x))
    3314         1398 :             .collect_vec();
    3315         1398 :         stats.level0_deltas_count = Some(level0_deltas.len());
    3316         1398 :         // Only compact if enough layers have accumulated.
    3317         1398 :         let threshold = self.get_compaction_threshold();
    3318         1398 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    3319            0 :             debug!(
    3320            0 :                 level0_deltas = level0_deltas.len(),
    3321            0 :                 threshold, "too few deltas to compact"
    3322            0 :             );
    3323         1069 :             return Ok(CompactLevel0Phase1Result::default());
    3324          329 :         }
    3325          329 : 
    3326          329 :         // This failpoint is used together with `test_duplicate_layers` integration test.
    3327          329 :         // It returns the compaction result exactly the same layers as input to compaction.
    3328          329 :         // We want to ensure that this will not cause any problem when updating the layer map
    3329          329 :         // after the compaction is finished.
    3330          329 :         //
    3331          329 :         // Currently, there are two rare edge cases that will cause duplicated layers being
    3332          329 :         // inserted.
    3333          329 :         // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
    3334          329 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
    3335          329 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
    3336          329 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
    3337          329 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
    3338          329 :         //    layer replace instead of the normal remove / upload process.
    3339          329 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
    3340          329 :         //    size length. Compaction will likely create the same set of n files afterwards.
    3341          329 :         //
    3342          329 :         // This failpoint is a superset of both of the cases.
    3343          329 :         fail_point!("compact-level0-phase1-return-same", |_| {
    3344           37 :             println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
    3345           37 :             Ok(CompactLevel0Phase1Result {
    3346           37 :                 new_layers: level0_deltas
    3347           37 :                     .iter()
    3348         2839 :                     .map(|x| x.clone().downcast_delta_layer().unwrap())
    3349           37 :                     .collect(),
    3350           37 :                 deltas_to_compact: level0_deltas
    3351           37 :                     .iter()
    3352         2839 :                     .map(|x| x.layer_desc().clone().into())
    3353           37 :                     .collect(),
    3354           37 :             })
    3355          329 :         });
    3356              : 
    3357              :         // Gather the files to compact in this iteration.
    3358              :         //
    3359              :         // Start with the oldest Level 0 delta file, and collect any other
    3360              :         // level 0 files that form a contiguous sequence, such that the end
    3361              :         // LSN of previous file matches the start LSN of the next file.
    3362              :         //
    3363              :         // Note that if the files don't form such a sequence, we might
    3364              :         // "compact" just a single file. That's a bit pointless, but it allows
    3365              :         // us to get rid of the level 0 file, and compact the other files on
    3366              :         // the next iteration. This could probably made smarter, but such
    3367              :         // "gaps" in the sequence of level 0 files should only happen in case
    3368              :         // of a crash, partial download from cloud storage, or something like
    3369              :         // that, so it's not a big deal in practice.
    3370        14666 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    3371          292 :         let mut level0_deltas_iter = level0_deltas.iter();
    3372          292 : 
    3373          292 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    3374          292 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    3375          292 :         let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
    3376         4956 :         for l in level0_deltas_iter {
    3377         4664 :             let lsn_range = &l.layer_desc().lsn_range;
    3378         4664 : 
    3379         4664 :             if lsn_range.start != prev_lsn_end {
    3380            0 :                 break;
    3381         4664 :             }
    3382         4664 :             deltas_to_compact.push(Arc::clone(l));
    3383         4664 :             prev_lsn_end = lsn_range.end;
    3384              :         }
    3385          292 :         let lsn_range = Range {
    3386          292 :             start: deltas_to_compact
    3387          292 :                 .first()
    3388          292 :                 .unwrap()
    3389          292 :                 .layer_desc()
    3390          292 :                 .lsn_range
    3391          292 :                 .start,
    3392          292 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    3393          292 :         };
    3394          292 : 
    3395          292 :         let remotes = deltas_to_compact
    3396          292 :             .iter()
    3397         4956 :             .filter(|l| l.is_remote_layer())
    3398          292 :             .inspect(|l| info!("compact requires download of {l}"))
    3399          292 :             .map(|l| {
    3400            3 :                 l.clone()
    3401            3 :                     .downcast_remote_layer()
    3402            3 :                     .expect("just checked it is remote layer")
    3403          292 :             })
    3404          292 :             .collect::<Vec<_>>();
    3405          292 : 
    3406          292 :         if !remotes.is_empty() {
    3407              :             // caller is holding the lock to layer_removal_cs, and we don't want to download while
    3408              :             // holding that; in future download_remote_layer might take it as well. this is
    3409              :             // regardless of earlier image creation downloading on-demand, while holding the lock.
    3410            1 :             return Err(CompactionError::DownloadRequired(remotes));
    3411          291 :         }
    3412              : 
    3413          291 :         info!(
    3414          291 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    3415          291 :             lsn_range.start,
    3416          291 :             lsn_range.end,
    3417          291 :             deltas_to_compact.len(),
    3418          291 :             level0_deltas.len()
    3419          291 :         );
    3420              : 
    3421         4953 :         for l in deltas_to_compact.iter() {
    3422         4953 :             info!("compact includes {l}");
    3423              :         }
    3424              : 
    3425              :         // We don't need the original list of layers anymore. Drop it so that
    3426              :         // we don't accidentally use it later in the function.
    3427          291 :         drop(level0_deltas);
    3428          291 : 
    3429          291 :         stats.read_lock_held_prerequisites_micros = stats
    3430          291 :             .read_lock_held_spawn_blocking_startup_micros
    3431          291 :             .till_now();
    3432          291 : 
    3433          291 :         // Determine N largest holes where N is number of compacted layers.
    3434          291 :         let max_holes = deltas_to_compact.len();
    3435          291 :         let last_record_lsn = self.get_last_record_lsn();
    3436          291 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    3437          291 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
    3438          291 : 
    3439          291 :         // min-heap (reserve space for one more element added before eviction)
    3440          291 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    3441          291 :         let mut prev: Option<Key> = None;
    3442          291 : 
    3443          291 :         let mut all_keys = Vec::new();
    3444          291 : 
    3445          291 :         let downcast_deltas: Vec<_> = deltas_to_compact
    3446          291 :             .iter()
    3447         4953 :             .map(|l| l.clone().downcast_delta_layer().expect("delta layer"))
    3448          291 :             .collect();
    3449         4953 :         for dl in downcast_deltas.iter() {
    3450              :             // TODO: replace this with an await once we fully go async
    3451         4953 :             all_keys.extend(DeltaLayer::load_keys(dl, ctx).await?);
    3452              :         }
    3453              : 
    3454              :         // The current stdlib sorting implementation is designed in a way where it is
    3455              :         // particularly fast where the slice is made up of sorted sub-ranges.
    3456    536602652 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3457          291 : 
    3458          291 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    3459              : 
    3460     40589793 :         for DeltaEntry { key: next_key, .. } in all_keys.iter() {
    3461     40589793 :             let next_key = *next_key;
    3462     40589793 :             if let Some(prev_key) = prev {
    3463              :                 // just first fast filter
    3464     40589503 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
    3465       196839 :                     let key_range = prev_key..next_key;
    3466              :                     // Measuring hole by just subtraction of i128 representation of key range boundaries
    3467              :                     // has not so much sense, because largest holes will corresponds field1/field2 changes.
    3468              :                     // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
    3469              :                     // That is why it is better to measure size of hole as number of covering image layers.
    3470       196839 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
    3471       196839 :                     if coverage_size >= min_hole_coverage_size {
    3472           15 :                         heap.push(Hole {
    3473           15 :                             key_range,
    3474           15 :                             coverage_size,
    3475           15 :                         });
    3476           15 :                         if heap.len() > max_holes {
    3477            0 :                             heap.pop(); // remove smallest hole
    3478           15 :                         }
    3479       196824 :                     }
    3480     40392664 :                 }
    3481          290 :             }
    3482     40589793 :             prev = Some(next_key.next());
    3483              :         }
    3484          290 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    3485          290 :         drop_rlock(guard);
    3486          290 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    3487          290 :         let mut holes = heap.into_vec();
    3488          290 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
    3489          290 :         let mut next_hole = 0; // index of next hole in holes vector
    3490          290 : 
    3491          290 :         // This iterator walks through all key-value pairs from all the layers
    3492          290 :         // we're compacting, in key, LSN order.
    3493          290 :         let all_values_iter = all_keys.iter();
    3494          290 : 
    3495          290 :         // This iterator walks through all keys and is needed to calculate size used by each key
    3496          290 :         let mut all_keys_iter = all_keys
    3497          290 :             .iter()
    3498     32498816 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    3499     32498526 :             .coalesce(|mut prev, cur| {
    3500     32498526 :                 // Coalesce keys that belong to the same key pair.
    3501     32498526 :                 // This ensures that compaction doesn't put them
    3502     32498526 :                 // into different layer files.
    3503     32498526 :                 // Still limit this by the target file size,
    3504     32498526 :                 // so that we keep the size of the files in
    3505     32498526 :                 // check.
    3506     32498526 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    3507     30746597 :                     prev.2 += cur.2;
    3508     30746597 :                     Ok(prev)
    3509              :                 } else {
    3510      1751929 :                     Err((prev, cur))
    3511              :                 }
    3512     32498526 :             });
    3513          290 : 
    3514          290 :         // Merge the contents of all the input delta layers into a new set
    3515          290 :         // of delta layers, based on the current partitioning.
    3516          290 :         //
    3517          290 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    3518          290 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    3519          290 :         // would be too large. In that case, we also split on the LSN dimension.
    3520          290 :         //
    3521          290 :         // LSN
    3522          290 :         //  ^
    3523          290 :         //  |
    3524          290 :         //  | +-----------+            +--+--+--+--+
    3525          290 :         //  | |           |            |  |  |  |  |
    3526          290 :         //  | +-----------+            |  |  |  |  |
    3527          290 :         //  | |           |            |  |  |  |  |
    3528          290 :         //  | +-----------+     ==>    |  |  |  |  |
    3529          290 :         //  | |           |            |  |  |  |  |
    3530          290 :         //  | +-----------+            |  |  |  |  |
    3531          290 :         //  | |           |            |  |  |  |  |
    3532          290 :         //  | +-----------+            +--+--+--+--+
    3533          290 :         //  |
    3534          290 :         //  +--------------> key
    3535          290 :         //
    3536          290 :         //
    3537          290 :         // If one key (X) has a lot of page versions:
    3538          290 :         //
    3539          290 :         // LSN
    3540          290 :         //  ^
    3541          290 :         //  |                                 (X)
    3542          290 :         //  | +-----------+            +--+--+--+--+
    3543          290 :         //  | |           |            |  |  |  |  |
    3544          290 :         //  | +-----------+            |  |  +--+  |
    3545          290 :         //  | |           |            |  |  |  |  |
    3546          290 :         //  | +-----------+     ==>    |  |  |  |  |
    3547          290 :         //  | |           |            |  |  +--+  |
    3548          290 :         //  | +-----------+            |  |  |  |  |
    3549          290 :         //  | |           |            |  |  |  |  |
    3550          290 :         //  | +-----------+            +--+--+--+--+
    3551          290 :         //  |
    3552          290 :         //  +--------------> key
    3553          290 :         // TODO: this actually divides the layers into fixed-size chunks, not
    3554          290 :         // based on the partitioning.
    3555          290 :         //
    3556          290 :         // TODO: we should also opportunistically materialize and
    3557          290 :         // garbage collect what we can.
    3558          290 :         let mut new_layers = Vec::new();
    3559          290 :         let mut prev_key: Option<Key> = None;
    3560          290 :         let mut writer: Option<DeltaLayerWriter> = None;
    3561          290 :         let mut key_values_total_size = 0u64;
    3562          290 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    3563          290 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    3564              : 
    3565              :         for &DeltaEntry {
    3566     32493263 :             key, lsn, ref val, ..
    3567     32493548 :         } in all_values_iter
    3568              :         {
    3569     32493263 :             let value = val.load().await?;
    3570     32493259 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    3571     32493259 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    3572     32493259 :             if !same_key || lsn == dup_end_lsn {
    3573      1752209 :                 let mut next_key_size = 0u64;
    3574      1752209 :                 let is_dup_layer = dup_end_lsn.is_valid();
    3575      1752209 :                 dup_start_lsn = Lsn::INVALID;
    3576      1752209 :                 if !same_key {
    3577      1752149 :                     dup_end_lsn = Lsn::INVALID;
    3578      1752149 :                 }
    3579              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    3580      1752214 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    3581      1752214 :                     next_key_size = next_size;
    3582      1752214 :                     if key != next_key {
    3583      1751864 :                         if dup_end_lsn.is_valid() {
    3584           23 :                             // We are writting segment with duplicates:
    3585           23 :                             // place all remaining values of this key in separate segment
    3586           23 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    3587           23 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    3588      1751841 :                         }
    3589      1751864 :                         break;
    3590          350 :                     }
    3591          350 :                     key_values_total_size += next_size;
    3592          350 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    3593          350 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    3594          350 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    3595              :                         // Split key between multiple layers: such layer can contain only single key
    3596           60 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    3597           37 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    3598              :                         } else {
    3599           23 :                             lsn // start with the first LSN for this key
    3600              :                         };
    3601           60 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    3602           60 :                         break;
    3603          290 :                     }
    3604              :                 }
    3605              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    3606      1752209 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    3607            0 :                     dup_start_lsn = dup_end_lsn;
    3608            0 :                     dup_end_lsn = lsn_range.end;
    3609      1752209 :                 }
    3610      1752209 :                 if writer.is_some() {
    3611      1751919 :                     let written_size = writer.as_mut().unwrap().size();
    3612      1751919 :                     let contains_hole =
    3613      1751919 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    3614              :                     // check if key cause layer overflow or contains hole...
    3615      1751919 :                     if is_dup_layer
    3616      1751836 :                         || dup_end_lsn.is_valid()
    3617      1751818 :                         || written_size + key_values_total_size > target_file_size
    3618      1743185 :                         || contains_hole
    3619              :                     {
    3620              :                         // ... if so, flush previous layer and prepare to write new one
    3621              :                         new_layers.push(Arc::new(
    3622         8747 :                             writer
    3623         8747 :                                 .take()
    3624         8747 :                                 .unwrap()
    3625         8747 :                                 .finish(prev_key.unwrap().next())
    3626            0 :                                 .await?,
    3627              :                         ));
    3628         8747 :                         writer = None;
    3629         8747 : 
    3630         8747 :                         if contains_hole {
    3631           15 :                             // skip hole
    3632           15 :                             next_hole += 1;
    3633         8732 :                         }
    3634      1743172 :                     }
    3635          290 :                 }
    3636              :                 // Remember size of key value because at next iteration we will access next item
    3637      1752209 :                 key_values_total_size = next_key_size;
    3638     30741049 :             }
    3639     32493258 :             if writer.is_none() {
    3640         9037 :                 // Create writer if not initiaized yet
    3641         9037 :                 writer = Some(
    3642              :                     DeltaLayerWriter::new(
    3643         9037 :                         self.conf,
    3644         9037 :                         self.timeline_id,
    3645         9037 :                         self.tenant_id,
    3646         9037 :                         key,
    3647         9037 :                         if dup_end_lsn.is_valid() {
    3648              :                             // this is a layer containing slice of values of the same key
    3649            0 :                             debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    3650           83 :                             dup_start_lsn..dup_end_lsn
    3651              :                         } else {
    3652            0 :                             debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3653         8954 :                             lsn_range.clone()
    3654              :                         },
    3655              :                     )
    3656            0 :                     .await?,
    3657              :                 );
    3658     32484221 :             }
    3659              : 
    3660     32493258 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3661            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    3662            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    3663            0 :                 )))
    3664     32493258 :             });
    3665              : 
    3666     32493258 :             writer.as_mut().unwrap().put_value(key, lsn, value).await?;
    3667     32493258 :             prev_key = Some(key);
    3668              :         }
    3669          285 :         if let Some(writer) = writer {
    3670          285 :             new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next()).await?));
    3671            0 :         }
    3672              : 
    3673              :         // Sync layers
    3674          285 :         if !new_layers.is_empty() {
    3675              :             // Print a warning if the created layer is larger than double the target size
    3676              :             // Add two pages for potential overhead. This should in theory be already
    3677              :             // accounted for in the target calculation, but for very small targets,
    3678              :             // we still might easily hit the limit otherwise.
    3679          285 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    3680         8738 :             for layer in new_layers.iter() {
    3681         8738 :                 if layer.layer_desc().file_size > warn_limit {
    3682            0 :                     warn!(
    3683            0 :                         %layer,
    3684            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    3685            0 :                     );
    3686         8738 :                 }
    3687              :             }
    3688         8738 :             let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
    3689          285 : 
    3690          285 :             // Fsync all the layer files and directory using multiple threads to
    3691          285 :             // minimize latency.
    3692          285 :             par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
    3693              : 
    3694          285 :             par_fsync::par_fsync(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
    3695          285 :                 .context("fsync of timeline dir")?;
    3696              : 
    3697          285 :             layer_paths.pop().unwrap();
    3698            0 :         }
    3699              : 
    3700          285 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    3701          285 :         stats.new_deltas_count = Some(new_layers.len());
    3702         8738 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    3703          285 : 
    3704          285 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    3705          285 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    3706              :         {
    3707          285 :             Ok(stats_json) => {
    3708          285 :                 info!(
    3709          285 :                     stats_json = stats_json.as_str(),
    3710          285 :                     "compact_level0_phase1 stats available"
    3711          285 :                 )
    3712              :             }
    3713            0 :             Err(e) => {
    3714            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    3715              :             }
    3716              :         }
    3717              : 
    3718          285 :         Ok(CompactLevel0Phase1Result {
    3719          285 :             new_layers,
    3720          285 :             deltas_to_compact: deltas_to_compact
    3721          285 :                 .into_iter()
    3722         4405 :                 .map(|x| Arc::new(x.layer_desc().clone()))
    3723          285 :                 .collect(),
    3724          285 :         })
    3725         1392 :     }
    3726              : 
    3727              :     ///
    3728              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
    3729              :     /// as Level 1 files.
    3730              :     ///
    3731         1398 :     async fn compact_level0(
    3732         1398 :         self: &Arc<Self>,
    3733         1398 :         layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
    3734         1398 :         target_file_size: u64,
    3735         1398 :         ctx: &RequestContext,
    3736         1398 :     ) -> Result<(), CompactionError> {
    3737              :         let CompactLevel0Phase1Result {
    3738         1391 :             new_layers,
    3739         1391 :             deltas_to_compact,
    3740              :         } = {
    3741         1398 :             let phase1_span = info_span!("compact_level0_phase1");
    3742         1398 :             let ctx = ctx.attached_child();
    3743         1398 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    3744         1398 :                 version: Some(2),
    3745         1398 :                 tenant_id: Some(self.tenant_id),
    3746         1398 :                 timeline_id: Some(self.timeline_id),
    3747         1398 :                 ..Default::default()
    3748         1398 :             };
    3749         1398 : 
    3750         1398 :             let begin = tokio::time::Instant::now();
    3751         1398 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
    3752         1398 :             let now = tokio::time::Instant::now();
    3753         1398 :             stats.read_lock_acquisition_micros =
    3754         1398 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    3755         1398 :             let layer_removal_cs = layer_removal_cs.clone();
    3756         1398 :             self.compact_level0_phase1(
    3757         1398 :                 layer_removal_cs,
    3758         1398 :                 phase1_layers_locked,
    3759         1398 :                 stats,
    3760         1398 :                 target_file_size,
    3761         1398 :                 &ctx,
    3762         1398 :             )
    3763         1398 :             .instrument(phase1_span)
    3764       265395 :             .await?
    3765              :         };
    3766              : 
    3767         1391 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    3768              :             // nothing to do
    3769         1069 :             return Ok(());
    3770          322 :         }
    3771              : 
    3772              :         // Before deleting any layers, we need to wait for their upload ops to finish.
    3773              :         // See remote_timeline_client module level comment on consistency.
    3774              :         // Do it here because we don't want to hold self.layers.write() while waiting.
    3775          322 :         if let Some(remote_client) = &self.remote_client {
    3776            0 :             debug!("waiting for upload ops to complete");
    3777          203 :             remote_client
    3778          203 :                 .wait_completion()
    3779           29 :                 .await
    3780          203 :                 .context("wait for layer upload ops to complete")?;
    3781          119 :         }
    3782              : 
    3783          321 :         let mut guard = self.layers.write().await;
    3784          321 :         let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
    3785          321 : 
    3786          321 :         // In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction.
    3787          321 :         // We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
    3788          321 :         // now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map.
    3789          321 :         let mut duplicated_layers = HashSet::new();
    3790          321 : 
    3791          321 :         let mut insert_layers = Vec::new();
    3792          321 :         let mut remove_layers = Vec::new();
    3793              : 
    3794        11362 :         for l in new_layers {
    3795        11041 :             let new_delta_path = l.path();
    3796              : 
    3797        11041 :             let metadata = new_delta_path.metadata().with_context(|| {
    3798            0 :                 format!(
    3799            0 :                     "read file metadata for new created layer {}",
    3800            0 :                     new_delta_path.display()
    3801            0 :                 )
    3802        11041 :             })?;
    3803              : 
    3804        11041 :             if let Some(remote_client) = &self.remote_client {
    3805         6879 :                 remote_client.schedule_layer_file_upload(
    3806         6879 :                     &l.filename(),
    3807         6879 :                     &LayerFileMetadata::new(metadata.len(), self.generation),
    3808         6879 :                 )?;
    3809         4162 :             }
    3810              : 
    3811              :             // update the timeline's physical size
    3812        11041 :             self.metrics
    3813        11041 :                 .resident_physical_size_gauge
    3814        11041 :                 .add(metadata.len());
    3815        11041 : 
    3816        11041 :             new_layer_paths.insert(
    3817        11041 :                 new_delta_path,
    3818        11041 :                 LayerFileMetadata::new(metadata.len(), self.generation),
    3819        11041 :             );
    3820        11041 :             l.access_stats().record_residence_event(
    3821        11041 :                 LayerResidenceStatus::Resident,
    3822        11041 :                 LayerResidenceEventReason::LayerCreate,
    3823        11041 :             );
    3824        11041 :             let l = l as Arc<dyn PersistentLayer>;
    3825        11041 :             if guard.contains(&l) {
    3826         2839 :                 duplicated_layers.insert(l.layer_desc().key());
    3827         2839 :             } else {
    3828         8202 :                 if LayerMap::is_l0(l.layer_desc()) {
    3829            0 :                     return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
    3830         8202 :                 }
    3831         8202 :                 insert_layers.push(l);
    3832              :             }
    3833              :         }
    3834              : 
    3835              :         // Now that we have reshuffled the data to set of new delta layers, we can
    3836              :         // delete the old ones
    3837          321 :         let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
    3838         7556 :         for ldesc in deltas_to_compact {
    3839         7235 :             if duplicated_layers.contains(&ldesc.key()) {
    3840              :                 // skip duplicated layers, they will not be removed; we have already overwritten them
    3841              :                 // with new layers in the compaction phase 1.
    3842         2839 :                 continue;
    3843         4396 :             }
    3844         4396 :             layer_names_to_delete.push(ldesc.filename());
    3845         4396 :             remove_layers.push(guard.get_from_desc(&ldesc));
    3846              :         }
    3847              : 
    3848          321 :         guard.finish_compact_l0(
    3849          321 :             layer_removal_cs,
    3850          321 :             remove_layers,
    3851          321 :             insert_layers,
    3852          321 :             &self.metrics,
    3853          321 :         )?;
    3854              : 
    3855          321 :         drop_wlock(guard);
    3856              : 
    3857              :         // Also schedule the deletions in remote storage
    3858          321 :         if let Some(remote_client) = &self.remote_client {
    3859          202 :             remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
    3860          119 :         }
    3861              : 
    3862          321 :         Ok(())
    3863         1392 :     }
    3864              : 
    3865              :     /// Update information about which layer files need to be retained on
    3866              :     /// garbage collection. This is separate from actually performing the GC,
    3867              :     /// and is updated more frequently, so that compaction can remove obsolete
    3868              :     /// page versions more aggressively.
    3869              :     ///
    3870              :     /// TODO: that's wishful thinking, compaction doesn't actually do that
    3871              :     /// currently.
    3872              :     ///
    3873              :     /// The caller specifies how much history is needed with the 3 arguments:
    3874              :     ///
    3875              :     /// retain_lsns: keep a version of each page at these LSNs
    3876              :     /// cutoff_horizon: also keep everything newer than this LSN
    3877              :     /// pitr: the time duration required to keep data for PITR
    3878              :     ///
    3879              :     /// The 'retain_lsns' list is currently used to prevent removing files that
    3880              :     /// are needed by child timelines. In the future, the user might be able to
    3881              :     /// name additional points in time to retain. The caller is responsible for
    3882              :     /// collecting that information.
    3883              :     ///
    3884              :     /// The 'cutoff_horizon' point is used to retain recent versions that might still be
    3885              :     /// needed by read-only nodes. (As of this writing, the caller just passes
    3886              :     /// the latest LSN subtracted by a constant, and doesn't do anything smart
    3887              :     /// to figure out what read-only nodes might actually need.)
    3888              :     ///
    3889              :     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
    3890              :     /// whether a record is needed for PITR.
    3891              :     ///
    3892              :     /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
    3893              :     /// field, so that the three values passed as argument are stored
    3894              :     /// atomically. But the caller is responsible for ensuring that no new
    3895              :     /// branches are created that would need to be included in 'retain_lsns',
    3896              :     /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
    3897              :     /// that.
    3898              :     ///
    3899         2679 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
    3900              :     pub(super) async fn update_gc_info(
    3901              :         &self,
    3902              :         retain_lsns: Vec<Lsn>,
    3903              :         cutoff_horizon: Lsn,
    3904              :         pitr: Duration,
    3905              :         ctx: &RequestContext,
    3906              :     ) -> anyhow::Result<()> {
    3907              :         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
    3908              :         //
    3909              :         // Some unit tests depend on garbage-collection working even when
    3910              :         // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
    3911              :         // work, so avoid calling it altogether if time-based retention is not
    3912              :         // configured. It would be pointless anyway.
    3913              :         let pitr_cutoff = if pitr != Duration::ZERO {
    3914              :             let now = SystemTime::now();
    3915              :             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
    3916              :                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
    3917              : 
    3918              :                 match self.find_lsn_for_timestamp(pitr_timestamp, ctx).await? {
    3919              :                     LsnForTimestamp::Present(lsn) => lsn,
    3920              :                     LsnForTimestamp::Future(lsn) => {
    3921              :                         // The timestamp is in the future. That sounds impossible,
    3922              :                         // but what it really means is that there hasn't been
    3923              :                         // any commits since the cutoff timestamp.
    3924            0 :                         debug!("future({})", lsn);
    3925              :                         cutoff_horizon
    3926              :                     }
    3927              :                     LsnForTimestamp::Past(lsn) => {
    3928            0 :                         debug!("past({})", lsn);
    3929              :                         // conservative, safe default is to remove nothing, when we
    3930              :                         // have no commit timestamp data available
    3931              :                         *self.get_latest_gc_cutoff_lsn()
    3932              :                     }
    3933              :                     LsnForTimestamp::NoData(lsn) => {
    3934            0 :                         debug!("nodata({})", lsn);
    3935              :                         // conservative, safe default is to remove nothing, when we
    3936              :                         // have no commit timestamp data available
    3937              :                         *self.get_latest_gc_cutoff_lsn()
    3938              :                     }
    3939              :                 }
    3940              :             } else {
    3941              :                 // If we don't have enough data to convert to LSN,
    3942              :                 // play safe and don't remove any layers.
    3943              :                 *self.get_latest_gc_cutoff_lsn()
    3944              :             }
    3945              :         } else {
    3946              :             // No time-based retention was configured. Set time-based cutoff to
    3947              :             // same as LSN based.
    3948              :             cutoff_horizon
    3949              :         };
    3950              : 
    3951              :         // Grab the lock and update the values
    3952              :         *self.gc_info.write().unwrap() = GcInfo {
    3953              :             retain_lsns,
    3954              :             horizon_cutoff: cutoff_horizon,
    3955              :             pitr_cutoff,
    3956              :         };
    3957              : 
    3958              :         Ok(())
    3959              :     }
    3960              : 
    3961              :     ///
    3962              :     /// Garbage collect layer files on a timeline that are no longer needed.
    3963              :     ///
    3964              :     /// Currently, we don't make any attempt at removing unneeded page versions
    3965              :     /// within a layer file. We can only remove the whole file if it's fully
    3966              :     /// obsolete.
    3967              :     ///
    3968          836 :     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
    3969          836 :         let timer = self.metrics.garbage_collect_histo.start_timer();
    3970          836 : 
    3971          836 :         fail_point!("before-timeline-gc");
    3972              : 
    3973          836 :         let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
    3974              :         // Is the timeline being deleted?
    3975          836 :         if self.is_stopping() {
    3976            0 :             anyhow::bail!("timeline is Stopping");
    3977          836 :         }
    3978          836 : 
    3979          836 :         let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
    3980          836 :             let gc_info = self.gc_info.read().unwrap();
    3981          836 : 
    3982          836 :             let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
    3983          836 :             let pitr_cutoff = gc_info.pitr_cutoff;
    3984          836 :             let retain_lsns = gc_info.retain_lsns.clone();
    3985          836 :             (horizon_cutoff, pitr_cutoff, retain_lsns)
    3986          836 :         };
    3987          836 : 
    3988          836 :         let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
    3989              : 
    3990          836 :         let res = self
    3991          836 :             .gc_timeline(
    3992          836 :                 layer_removal_cs.clone(),
    3993          836 :                 horizon_cutoff,
    3994          836 :                 pitr_cutoff,
    3995          836 :                 retain_lsns,
    3996          836 :                 new_gc_cutoff,
    3997          836 :             )
    3998          836 :             .instrument(
    3999          836 :                 info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
    4000              :             )
    4001          261 :             .await?;
    4002              : 
    4003              :         // only record successes
    4004          831 :         timer.stop_and_record();
    4005          831 : 
    4006          831 :         Ok(res)
    4007          831 :     }
    4008              : 
    4009          836 :     async fn gc_timeline(
    4010          836 :         &self,
    4011          836 :         layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
    4012          836 :         horizon_cutoff: Lsn,
    4013          836 :         pitr_cutoff: Lsn,
    4014          836 :         retain_lsns: Vec<Lsn>,
    4015          836 :         new_gc_cutoff: Lsn,
    4016          836 :     ) -> anyhow::Result<GcResult> {
    4017          836 :         let now = SystemTime::now();
    4018          836 :         let mut result: GcResult = GcResult::default();
    4019          836 : 
    4020          836 :         // Nothing to GC. Return early.
    4021          836 :         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    4022          836 :         if latest_gc_cutoff >= new_gc_cutoff {
    4023          105 :             info!(
    4024          105 :                 "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
    4025          105 :             );
    4026          105 :             return Ok(result);
    4027          731 :         }
    4028          731 : 
    4029          731 :         // We need to ensure that no one tries to read page versions or create
    4030          731 :         // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
    4031          731 :         // for details. This will block until the old value is no longer in use.
    4032          731 :         //
    4033          731 :         // The GC cutoff should only ever move forwards.
    4034          731 :         {
    4035          731 :             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
    4036          731 :             ensure!(
    4037          731 :                 *write_guard <= new_gc_cutoff,
    4038            0 :                 "Cannot move GC cutoff LSN backwards (was {}, new {})",
    4039            0 :                 *write_guard,
    4040              :                 new_gc_cutoff
    4041              :             );
    4042          731 :             write_guard.store_and_unlock(new_gc_cutoff).wait();
    4043              :         }
    4044              : 
    4045          731 :         info!("GC starting");
    4046              : 
    4047            0 :         debug!("retain_lsns: {:?}", retain_lsns);
    4048              : 
    4049              :         // Before deleting any layers, we need to wait for their upload ops to finish.
    4050              :         // See storage_sync module level comment on consistency.
    4051              :         // Do it here because we don't want to hold self.layers.write() while waiting.
    4052          731 :         if let Some(remote_client) = &self.remote_client {
    4053            0 :             debug!("waiting for upload ops to complete");
    4054          231 :             remote_client
    4055          231 :                 .wait_completion()
    4056          158 :                 .await
    4057          231 :                 .context("wait for layer upload ops to complete")?;
    4058          500 :         }
    4059              : 
    4060          731 :         let mut layers_to_remove = Vec::new();
    4061          731 :         let mut wanted_image_layers = KeySpaceRandomAccum::default();
    4062              : 
    4063              :         // Scan all layers in the timeline (remote or on-disk).
    4064              :         //
    4065              :         // Garbage collect the layer if all conditions are satisfied:
    4066              :         // 1. it is older than cutoff LSN;
    4067              :         // 2. it is older than PITR interval;
    4068              :         // 3. it doesn't need to be retained for 'retain_lsns';
    4069              :         // 4. newer on-disk image layers cover the layer's whole key range
    4070              :         //
    4071              :         // TODO holding a write lock is too agressive and avoidable
    4072          731 :         let mut guard = self.layers.write().await;
    4073          731 :         let layers = guard.layer_map();
    4074        15537 :         'outer: for l in layers.iter_historic_layers() {
    4075        15537 :             result.layers_total += 1;
    4076        15537 : 
    4077        15537 :             // 1. Is it newer than GC horizon cutoff point?
    4078        15537 :             if l.get_lsn_range().end > horizon_cutoff {
    4079            0 :                 debug!(
    4080            0 :                     "keeping {} because it's newer than horizon_cutoff {}",
    4081            0 :                     l.filename(),
    4082            0 :                     horizon_cutoff,
    4083            0 :                 );
    4084         1887 :                 result.layers_needed_by_cutoff += 1;
    4085         1887 :                 continue 'outer;
    4086        13650 :             }
    4087        13650 : 
    4088        13650 :             // 2. It is newer than PiTR cutoff point?
    4089        13650 :             if l.get_lsn_range().end > pitr_cutoff {
    4090            0 :                 debug!(
    4091            0 :                     "keeping {} because it's newer than pitr_cutoff {}",
    4092            0 :                     l.filename(),
    4093            0 :                     pitr_cutoff,
    4094            0 :                 );
    4095          152 :                 result.layers_needed_by_pitr += 1;
    4096          152 :                 continue 'outer;
    4097        13498 :             }
    4098              : 
    4099              :             // 3. Is it needed by a child branch?
    4100              :             // NOTE With that we would keep data that
    4101              :             // might be referenced by child branches forever.
    4102              :             // We can track this in child timeline GC and delete parent layers when
    4103              :             // they are no longer needed. This might be complicated with long inheritance chains.
    4104              :             //
    4105              :             // TODO Vec is not a great choice for `retain_lsns`
    4106        13542 :             for retain_lsn in &retain_lsns {
    4107              :                 // start_lsn is inclusive
    4108          537 :                 if &l.get_lsn_range().start <= retain_lsn {
    4109            0 :                     debug!(
    4110            0 :                         "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
    4111            0 :                         l.filename(),
    4112            0 :                         retain_lsn,
    4113            0 :                         l.is_incremental(),
    4114            0 :                     );
    4115          493 :                     result.layers_needed_by_branches += 1;
    4116          493 :                     continue 'outer;
    4117           44 :                 }
    4118              :             }
    4119              : 
    4120              :             // 4. Is there a later on-disk layer for this relation?
    4121              :             //
    4122              :             // The end-LSN is exclusive, while disk_consistent_lsn is
    4123              :             // inclusive. For example, if disk_consistent_lsn is 100, it is
    4124              :             // OK for a delta layer to have end LSN 101, but if the end LSN
    4125              :             // is 102, then it might not have been fully flushed to disk
    4126              :             // before crash.
    4127              :             //
    4128              :             // For example, imagine that the following layers exist:
    4129              :             //
    4130              :             // 1000      - image (A)
    4131              :             // 1000-2000 - delta (B)
    4132              :             // 2000      - image (C)
    4133              :             // 2000-3000 - delta (D)
    4134              :             // 3000      - image (E)
    4135              :             //
    4136              :             // If GC horizon is at 2500, we can remove layers A and B, but
    4137              :             // we cannot remove C, even though it's older than 2500, because
    4138              :             // the delta layer 2000-3000 depends on it.
    4139        13005 :             if !layers
    4140        13005 :                 .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
    4141              :             {
    4142            0 :                 debug!("keeping {} because it is the latest layer", l.filename());
    4143              :                 // Collect delta key ranges that need image layers to allow garbage
    4144              :                 // collecting the layers.
    4145              :                 // It is not so obvious whether we need to propagate information only about
    4146              :                 // delta layers. Image layers can form "stairs" preventing old image from been deleted.
    4147              :                 // But image layers are in any case less sparse than delta layers. Also we need some
    4148              :                 // protection from replacing recent image layers with new one after each GC iteration.
    4149        12272 :                 if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
    4150            0 :                     wanted_image_layers.add_range(l.get_key_range());
    4151        12272 :                 }
    4152        12272 :                 result.layers_not_updated += 1;
    4153        12272 :                 continue 'outer;
    4154          733 :             }
    4155              : 
    4156              :             // We didn't find any reason to keep this file, so remove it.
    4157            0 :             debug!(
    4158            0 :                 "garbage collecting {} is_dropped: xx is_incremental: {}",
    4159            0 :                 l.filename(),
    4160            0 :                 l.is_incremental(),
    4161            0 :             );
    4162          733 :             layers_to_remove.push(Arc::clone(&l));
    4163              :         }
    4164          731 :         self.wanted_image_layers
    4165          731 :             .lock()
    4166          731 :             .unwrap()
    4167          731 :             .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
    4168          731 : 
    4169          731 :         if !layers_to_remove.is_empty() {
    4170              :             // Persist the new GC cutoff value in the metadata file, before
    4171              :             // we actually remove anything.
    4172           12 :             self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
    4173            0 :                 .await?;
    4174              : 
    4175              :             // Actually delete the layers from disk and remove them from the map.
    4176              :             // (couldn't do this in the loop above, because you cannot modify a collection
    4177              :             // while iterating it. BTreeMap::retain() would be another option)
    4178           12 :             let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
    4179           12 :             let gc_layers = layers_to_remove
    4180           12 :                 .iter()
    4181          733 :                 .map(|x| guard.get_from_desc(x))
    4182           12 :                 .collect();
    4183          745 :             for doomed_layer in layers_to_remove {
    4184          733 :                 layer_names_to_delete.push(doomed_layer.filename());
    4185          733 :                 result.layers_removed += 1;
    4186          733 :             }
    4187           12 :             let apply = guard.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)?;
    4188              : 
    4189           12 :             if result.layers_removed != 0 {
    4190           12 :                 fail_point!("after-timeline-gc-removed-layers");
    4191           12 :             }
    4192              : 
    4193           12 :             if let Some(remote_client) = &self.remote_client {
    4194           10 :                 remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
    4195            2 :             }
    4196              : 
    4197            7 :             apply.flush();
    4198          719 :         }
    4199              : 
    4200          726 :         info!(
    4201          726 :             "GC completed removing {} layers, cutoff {}",
    4202          726 :             result.layers_removed, new_gc_cutoff
    4203          726 :         );
    4204              : 
    4205          726 :         result.elapsed = now.elapsed()?;
    4206          726 :         Ok(result)
    4207          831 :     }
    4208              : 
    4209              :     ///
    4210              :     /// Reconstruct a value, using the given base image and WAL records in 'data'.
    4211              :     ///
    4212      7262202 :     async fn reconstruct_value(
    4213      7262202 :         &self,
    4214      7262202 :         key: Key,
    4215      7262202 :         request_lsn: Lsn,
    4216      7262202 :         mut data: ValueReconstructState,
    4217      7262205 :     ) -> Result<Bytes, PageReconstructError> {
    4218      7262205 :         // Perform WAL redo if needed
    4219      7262205 :         data.records.reverse();
    4220      7262205 : 
    4221      7262205 :         // If we have a page image, and no WAL, we're all set
    4222      7262205 :         if data.records.is_empty() {
    4223      4500032 :             if let Some((img_lsn, img)) = &data.img {
    4224            0 :                 trace!(
    4225            0 :                     "found page image for key {} at {}, no WAL redo required, req LSN {}",
    4226            0 :                     key,
    4227            0 :                     img_lsn,
    4228            0 :                     request_lsn,
    4229            0 :                 );
    4230      4500032 :                 Ok(img.clone())
    4231              :             } else {
    4232            0 :                 Err(PageReconstructError::from(anyhow!(
    4233            0 :                     "base image for {key} at {request_lsn} not found"
    4234            0 :                 )))
    4235              :             }
    4236              :         } else {
    4237              :             // We need to do WAL redo.
    4238              :             //
    4239              :             // If we don't have a base image, then the oldest WAL record better initialize
    4240              :             // the page
    4241      2762173 :             if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
    4242            0 :                 Err(PageReconstructError::from(anyhow!(
    4243            0 :                     "Base image for {} at {} not found, but got {} WAL records",
    4244            0 :                     key,
    4245            0 :                     request_lsn,
    4246            0 :                     data.records.len()
    4247            0 :                 )))
    4248              :             } else {
    4249      2762173 :                 if data.img.is_some() {
    4250            0 :                     trace!(
    4251            0 :                         "found {} WAL records and a base image for {} at {}, performing WAL redo",
    4252            0 :                         data.records.len(),
    4253            0 :                         key,
    4254            0 :                         request_lsn
    4255            0 :                     );
    4256              :                 } else {
    4257            0 :                     trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
    4258              :                 };
    4259              : 
    4260      2762173 :                 let last_rec_lsn = data.records.last().unwrap().0;
    4261              : 
    4262      2762173 :                 let img = match self
    4263      2762173 :                     .walredo_mgr
    4264      2762173 :                     .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
    4265      2762173 :                     .context("Failed to reconstruct a page image:")
    4266              :                 {
    4267      2762173 :                     Ok(img) => img,
    4268            0 :                     Err(e) => return Err(PageReconstructError::from(e)),
    4269              :                 };
    4270              : 
    4271      2762173 :                 if img.len() == page_cache::PAGE_SZ {
    4272      2758635 :                     let cache = page_cache::get();
    4273      2758635 :                     if let Err(e) = cache
    4274      2758635 :                         .memorize_materialized_page(
    4275      2758635 :                             self.tenant_id,
    4276      2758635 :                             self.timeline_id,
    4277      2758635 :                             key,
    4278      2758635 :                             last_rec_lsn,
    4279      2758635 :                             &img,
    4280      2758635 :                         )
    4281          734 :                         .await
    4282      2758635 :                         .context("Materialized page memoization failed")
    4283              :                     {
    4284            0 :                         return Err(PageReconstructError::from(e));
    4285      2758635 :                     }
    4286         3538 :                 }
    4287              : 
    4288      2762173 :                 Ok(img)
    4289              :             }
    4290              :         }
    4291      7262205 :     }
    4292              : 
    4293              :     /// Download a layer file from remote storage and insert it into the layer map.
    4294              :     ///
    4295              :     /// It's safe to call this function for the same layer concurrently. In that case:
    4296              :     /// - If the layer has already been downloaded, `OK(...)` is returned.
    4297              :     /// - If the layer is currently being downloaded, we wait until that download succeeded / failed.
    4298              :     ///     - If it succeeded, we return `Ok(...)`.
    4299              :     ///     - If it failed, we or another concurrent caller will initiate a new download attempt.
    4300              :     ///
    4301              :     /// Download errors are classified and retried if appropriate by the underlying RemoteTimelineClient function.
    4302              :     /// It has an internal limit for the maximum number of retries and prints appropriate log messages.
    4303              :     /// If we exceed the limit, it returns an error, and this function passes it through.
    4304              :     /// The caller _could_ retry further by themselves by calling this function again, but _should not_ do it.
    4305              :     /// The reason is that they cannot distinguish permanent errors from temporary ones, whereas
    4306              :     /// the underlying RemoteTimelineClient can.
    4307              :     ///
    4308              :     /// There is no internal timeout or slowness detection.
    4309              :     /// If the caller has a deadline or needs a timeout, they can simply stop polling:
    4310              :     /// we're **cancellation-safe** because the download happens in a separate task_mgr task.
    4311              :     /// So, the current download attempt will run to completion even if we stop polling.
    4312         3189 :     #[instrument(skip_all, fields(layer=%remote_layer))]
    4313              :     pub async fn download_remote_layer(
    4314              :         &self,
    4315              :         remote_layer: Arc<RemoteLayer>,
    4316              :     ) -> anyhow::Result<()> {
    4317              :         span::debug_assert_current_span_has_tenant_and_timeline_id();
    4318              : 
    4319              :         use std::sync::atomic::Ordering::Relaxed;
    4320              : 
    4321              :         let permit = match Arc::clone(&remote_layer.ongoing_download)
    4322              :             .acquire_owned()
    4323              :             .await
    4324              :         {
    4325              :             Ok(permit) => permit,
    4326              :             Err(_closed) => {
    4327              :                 if remote_layer.download_replacement_failure.load(Relaxed) {
    4328              :                     // this path will be hit often, in case there are upper retries. however
    4329              :                     // hitting this error will prevent a busy loop between get_reconstruct_data and
    4330              :                     // download, so an error is prefered.
    4331              :                     //
    4332              :                     // TODO: we really should poison the timeline, but panicking is not yet
    4333              :                     // supported. Related: https://github.com/neondatabase/neon/issues/3621
    4334              :                     anyhow::bail!("an earlier download succeeded but LayerMap::replace failed")
    4335              :                 } else {
    4336           18 :                     info!("download of layer has already finished");
    4337              :                     return Ok(());
    4338              :                 }
    4339              :             }
    4340              :         };
    4341              : 
    4342              :         let (sender, receiver) = tokio::sync::oneshot::channel();
    4343              :         // Spawn a task so that download does not outlive timeline when we detach tenant / delete timeline.
    4344              :         let self_clone = self.myself.upgrade().expect("timeline is gone");
    4345              :         task_mgr::spawn(
    4346              :             &tokio::runtime::Handle::current(),
    4347              :             TaskKind::RemoteDownloadTask,
    4348              :             Some(self.tenant_id),
    4349              :             Some(self.timeline_id),
    4350              :             &format!("download layer {}", remote_layer),
    4351              :             false,
    4352         1043 :             async move {
    4353         1043 :                 let remote_client = self_clone.remote_client.as_ref().unwrap();
    4354              : 
    4355              :                 // Does retries + exponential back-off internally.
    4356              :                 // When this fails, don't layer further retry attempts here.
    4357         1043 :                 let result = remote_client
    4358         1043 :                     .download_layer_file(&remote_layer.filename(), &remote_layer.layer_metadata)
    4359       360525 :                     .await;
    4360              : 
    4361         1040 :                 if let Ok(size) = &result {
    4362         1008 :                     info!("layer file download finished");
    4363              : 
    4364              :                     // XXX the temp file is still around in Err() case
    4365              :                     // and consumes space until we clean up upon pageserver restart.
    4366         1008 :                     self_clone.metrics.resident_physical_size_gauge.add(*size);
    4367              : 
    4368              :                     // Download complete. Replace the RemoteLayer with the corresponding
    4369              :                     // Delta- or ImageLayer in the layer map.
    4370         1008 :                     let mut guard = self_clone.layers.write().await;
    4371         1008 :                     let new_layer =
    4372         1008 :                         remote_layer.create_downloaded_layer(&guard, self_clone.conf, *size);
    4373         1008 :                     {
    4374         1008 :                         let l: Arc<dyn PersistentLayer> = remote_layer.clone();
    4375         1008 :                         let failure = match guard.replace_and_verify(l, new_layer) {
    4376         1007 :                             Ok(()) => false,
    4377            1 :                             Err(e) => {
    4378            1 :                                 // this is a precondition failure, the layer filename derived
    4379            1 :                                 // attributes didn't match up, which doesn't seem likely.
    4380            1 :                                 error!("replacing downloaded layer into layermap failed: {e:#?}");
    4381            1 :                                 true
    4382              :                             }
    4383              :                         };
    4384              : 
    4385         1008 :                         if failure {
    4386            1 :                             // mark the remote layer permanently failed; the timeline is most
    4387            1 :                             // likely unusable after this. sadly we cannot just poison the layermap
    4388            1 :                             // lock with panic, because that would create an issue with shutdown.
    4389            1 :                             //
    4390            1 :                             // this does not change the retry semantics on failed downloads.
    4391            1 :                             //
    4392            1 :                             // use of Relaxed is valid because closing of the semaphore gives
    4393            1 :                             // happens-before and wakes up any waiters; we write this value before
    4394            1 :                             // and any waiters (or would be waiters) will load it after closing
    4395            1 :                             // semaphore.
    4396            1 :                             //
    4397            1 :                             // See: https://github.com/neondatabase/neon/issues/3533
    4398            1 :                             remote_layer
    4399            1 :                                 .download_replacement_failure
    4400            1 :                                 .store(true, Relaxed);
    4401         1007 :                         }
    4402              :                     }
    4403         1008 :                     drop_wlock(guard);
    4404         1008 : 
    4405         1008 :                     info!("on-demand download successful");
    4406              : 
    4407              :                     // Now that we've inserted the download into the layer map,
    4408              :                     // close the semaphore. This will make other waiters for
    4409              :                     // this download return Ok(()).
    4410         1008 :                     assert!(!remote_layer.ongoing_download.is_closed());
    4411         1008 :                     remote_layer.ongoing_download.close();
    4412              :                 } else {
    4413              :                     // Keep semaphore open. We'll drop the permit at the end of the function.
    4414           32 :                     error!(
    4415           32 :                         "layer file download failed: {:?}",
    4416           32 :                         result.as_ref().unwrap_err()
    4417           32 :                     );
    4418              :                 }
    4419              : 
    4420              :                 // Don't treat it as an error if the task that triggered the download
    4421              :                 // is no longer interested in the result.
    4422         1040 :                 sender.send(result.map(|_sz| ())).ok();
    4423         1040 : 
    4424         1040 :                 // In case we failed and there are other waiters, this will make one
    4425         1040 :                 // of them retry the download in a new task.
    4426         1040 :                 // XXX: This resets the exponential backoff because it's a new call to
    4427         1040 :                 // download_layer file.
    4428         1040 :                 drop(permit);
    4429         1040 : 
    4430         1040 :                 Ok(())
    4431         1040 :             }
    4432              :             .in_current_span(),
    4433              :         );
    4434              : 
    4435              :         receiver.await.context("download task cancelled")?
    4436              :     }
    4437              : 
    4438            3 :     pub async fn spawn_download_all_remote_layers(
    4439            3 :         self: Arc<Self>,
    4440            3 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4441            3 :     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
    4442            3 :         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
    4443            3 :         if let Some(st) = &*status_guard {
    4444            1 :             match &st.state {
    4445              :                 DownloadRemoteLayersTaskState::Running => {
    4446            0 :                     return Err(st.clone());
    4447              :                 }
    4448              :                 DownloadRemoteLayersTaskState::ShutDown
    4449            1 :                 | DownloadRemoteLayersTaskState::Completed => {
    4450            1 :                     *status_guard = None;
    4451            1 :                 }
    4452              :             }
    4453            2 :         }
    4454              : 
    4455            3 :         let self_clone = Arc::clone(&self);
    4456            3 :         let task_id = task_mgr::spawn(
    4457            3 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    4458            3 :             task_mgr::TaskKind::DownloadAllRemoteLayers,
    4459            3 :             Some(self.tenant_id),
    4460            3 :             Some(self.timeline_id),
    4461            3 :             "download all remote layers task",
    4462              :             false,
    4463            3 :             async move {
    4464           36 :                 self_clone.download_all_remote_layers(request).await;
    4465            3 :                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
    4466            3 :                  match &mut *status_guard {
    4467              :                     None => {
    4468            0 :                         warn!("tasks status is supposed to be Some(), since we are running");
    4469              :                     }
    4470            3 :                     Some(st) => {
    4471            3 :                         let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
    4472            3 :                         if st.task_id != exp_task_id {
    4473            0 :                             warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
    4474            3 :                         } else {
    4475            3 :                             st.state = DownloadRemoteLayersTaskState::Completed;
    4476            3 :                         }
    4477              :                     }
    4478              :                 };
    4479            3 :                 Ok(())
    4480            3 :             }
    4481            3 :             .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))
    4482              :         );
    4483              : 
    4484            3 :         let initial_info = DownloadRemoteLayersTaskInfo {
    4485            3 :             task_id: format!("{task_id}"),
    4486            3 :             state: DownloadRemoteLayersTaskState::Running,
    4487            3 :             total_layer_count: 0,
    4488            3 :             successful_download_count: 0,
    4489            3 :             failed_download_count: 0,
    4490            3 :         };
    4491            3 :         *status_guard = Some(initial_info.clone());
    4492            3 : 
    4493            3 :         Ok(initial_info)
    4494            3 :     }
    4495              : 
    4496            3 :     async fn download_all_remote_layers(
    4497            3 :         self: &Arc<Self>,
    4498            3 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4499            3 :     ) {
    4500            3 :         let mut downloads = Vec::new();
    4501              :         {
    4502            3 :             let guard = self.layers.read().await;
    4503            3 :             let layers = guard.layer_map();
    4504            3 :             layers
    4505            3 :                 .iter_historic_layers()
    4506           64 :                 .map(|l| guard.get_from_desc(&l))
    4507           64 :                 .filter_map(|l| l.downcast_remote_layer())
    4508           60 :                 .map(|l| self.download_remote_layer(l))
    4509           60 :                 .for_each(|dl| downloads.push(dl))
    4510            3 :         }
    4511            3 :         let total_layer_count = downloads.len();
    4512            3 :         // limit download concurrency as specified in request
    4513            3 :         let downloads = futures::stream::iter(downloads);
    4514            3 :         let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get());
    4515            3 : 
    4516            3 :         macro_rules! lock_status {
    4517            3 :             ($st:ident) => {
    4518            3 :                 let mut st = self.download_all_remote_layers_task_info.write().unwrap();
    4519            3 :                 let st = st
    4520            3 :                     .as_mut()
    4521            3 :                     .expect("this function is only called after the task has been spawned");
    4522            3 :                 assert_eq!(
    4523            3 :                     st.task_id,
    4524            3 :                     format!(
    4525            3 :                         "{}",
    4526            3 :                         task_mgr::current_task_id().expect("we run inside a task_mgr task")
    4527            3 :                     )
    4528            3 :                 );
    4529            3 :                 let $st = st;
    4530            3 :             };
    4531            3 :         }
    4532            3 : 
    4533            3 :         {
    4534            3 :             lock_status!(st);
    4535            3 :             st.total_layer_count = total_layer_count as u64;
    4536              :         }
    4537           63 :         loop {
    4538           99 :             tokio::select! {
    4539           63 :                 dl = downloads.next() => {
    4540              :                     lock_status!(st);
    4541              :                     match dl {
    4542              :                         None => break,
    4543              :                         Some(Ok(())) => {
    4544              :                             st.successful_download_count += 1;
    4545              :                         },
    4546              :                         Some(Err(e)) => {
    4547           30 :                             error!(error = %e, "layer download failed");
    4548              :                             st.failed_download_count += 1;
    4549              :                         }
    4550              :                     }
    4551              :                 }
    4552              :                 _ = task_mgr::shutdown_watcher() => {
    4553              :                     // Kind of pointless to watch for shutdowns here,
    4554              :                     // as download_remote_layer spawns other task_mgr tasks internally.
    4555              :                     lock_status!(st);
    4556              :                     st.state = DownloadRemoteLayersTaskState::ShutDown;
    4557              :                 }
    4558           63 :             }
    4559           63 :         }
    4560              :         {
    4561            3 :             lock_status!(st);
    4562            3 :             st.state = DownloadRemoteLayersTaskState::Completed;
    4563            3 :         }
    4564            3 :     }
    4565              : 
    4566           21 :     pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
    4567           21 :         self.download_all_remote_layers_task_info
    4568           21 :             .read()
    4569           21 :             .unwrap()
    4570           21 :             .clone()
    4571           21 :     }
    4572              : }
    4573              : 
    4574              : pub struct DiskUsageEvictionInfo {
    4575              :     /// Timeline's largest layer (remote or resident)
    4576              :     pub max_layer_size: Option<u64>,
    4577              :     /// Timeline's resident layers
    4578              :     pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
    4579              : }
    4580              : 
    4581              : pub struct LocalLayerInfoForDiskUsageEviction {
    4582              :     pub layer: Arc<dyn PersistentLayer>,
    4583              :     pub last_activity_ts: SystemTime,
    4584              : }
    4585              : 
    4586              : impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
    4587            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    4588            0 :         // format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
    4589            0 :         // having to allocate a string to this is bad, but it will rarely be formatted
    4590            0 :         let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
    4591            0 :         let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
    4592            0 :         f.debug_struct("LocalLayerInfoForDiskUsageEviction")
    4593            0 :             .field("layer", &self.layer)
    4594            0 :             .field("last_activity", &ts)
    4595            0 :             .finish()
    4596            0 :     }
    4597              : }
    4598              : 
    4599              : impl LocalLayerInfoForDiskUsageEviction {
    4600          250 :     pub fn file_size(&self) -> u64 {
    4601          250 :         self.layer.layer_desc().file_size
    4602          250 :     }
    4603              : }
    4604              : 
    4605              : impl Timeline {
    4606              :     /// Returns non-remote layers for eviction.
    4607           13 :     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
    4608           13 :         let guard = self.layers.read().await;
    4609           13 :         let layers = guard.layer_map();
    4610           13 : 
    4611           13 :         let mut max_layer_size: Option<u64> = None;
    4612           13 :         let mut resident_layers = Vec::new();
    4613              : 
    4614          250 :         for l in layers.iter_historic_layers() {
    4615          250 :             let file_size = l.file_size();
    4616          250 :             max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
    4617          250 : 
    4618          250 :             let l = guard.get_from_desc(&l);
    4619          250 : 
    4620          250 :             if l.is_remote_layer() {
    4621            0 :                 continue;
    4622          250 :             }
    4623          250 : 
    4624          250 :             let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
    4625            0 :                 // We only use this fallback if there's an implementation error.
    4626            0 :                 // `latest_activity` already does rate-limited warn!() log.
    4627            0 :                 debug!(layer=%l, "last_activity returns None, using SystemTime::now");
    4628            0 :                 SystemTime::now()
    4629          250 :             });
    4630          250 : 
    4631          250 :             resident_layers.push(LocalLayerInfoForDiskUsageEviction {
    4632          250 :                 layer: l,
    4633          250 :                 last_activity_ts,
    4634          250 :             });
    4635              :         }
    4636              : 
    4637           13 :         DiskUsageEvictionInfo {
    4638           13 :             max_layer_size,
    4639           13 :             resident_layers,
    4640           13 :         }
    4641           13 :     }
    4642              : }
    4643              : 
    4644              : type TraversalPathItem = (
    4645              :     ValueReconstructResult,
    4646              :     Lsn,
    4647              :     Box<dyn Send + FnOnce() -> TraversalId>,
    4648              : );
    4649              : 
    4650              : /// Helper function for get_reconstruct_data() to add the path of layers traversed
    4651              : /// to an error, as anyhow context information.
    4652            2 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
    4653            2 :     // We want the original 'msg' to be the outermost context. The outermost context
    4654            2 :     // is the most high-level information, which also gets propagated to the client.
    4655            2 :     let mut msg_iter = path
    4656            2 :         .into_iter()
    4657            4 :         .map(|(r, c, l)| {
    4658            4 :             format!(
    4659            4 :                 "layer traversal: result {:?}, cont_lsn {}, layer: {}",
    4660            4 :                 r,
    4661            4 :                 c,
    4662            4 :                 l(),
    4663            4 :             )
    4664            4 :         })
    4665            2 :         .chain(std::iter::once(msg));
    4666            2 :     // Construct initial message from the first traversed layer
    4667            2 :     let err = anyhow!(msg_iter.next().unwrap());
    4668            2 : 
    4669            2 :     // Append all subsequent traversals, and the error message 'msg', as contexts.
    4670            4 :     let msg = msg_iter.fold(err, |err, msg| err.context(msg));
    4671            2 :     PageReconstructError::from(msg)
    4672            2 : }
    4673              : 
    4674              : /// Various functions to mutate the timeline.
    4675              : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
    4676              : // This is probably considered a bad practice in Rust and should be fixed eventually,
    4677              : // but will cause large code changes.
    4678              : pub struct TimelineWriter<'a> {
    4679              :     tl: &'a Timeline,
    4680              :     _write_guard: tokio::sync::MutexGuard<'a, ()>,
    4681              : }
    4682              : 
    4683              : impl Deref for TimelineWriter<'_> {
    4684              :     type Target = Timeline;
    4685              : 
    4686            0 :     fn deref(&self) -> &Self::Target {
    4687            0 :         self.tl
    4688            0 :     }
    4689              : }
    4690              : 
    4691              : impl<'a> TimelineWriter<'a> {
    4692              :     /// Put a new page version that can be constructed from a WAL record
    4693              :     ///
    4694              :     /// This will implicitly extend the relation, if the page is beyond the
    4695              :     /// current end-of-file.
    4696     82545991 :     pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
    4697     82546079 :         self.tl.put_value(key, lsn, value).await
    4698     82546068 :     }
    4699              : 
    4700        17794 :     pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
    4701        17794 :         self.tl.put_tombstone(key_range, lsn).await
    4702        17794 :     }
    4703              : 
    4704              :     /// Track the end of the latest digested WAL record.
    4705              :     /// Remember the (end of) last valid WAL record remembered in the timeline.
    4706              :     ///
    4707              :     /// Call this after you have finished writing all the WAL up to 'lsn'.
    4708              :     ///
    4709              :     /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
    4710              :     /// the 'lsn' or anything older. The previous last record LSN is stored alongside
    4711              :     /// the latest and can be read.
    4712     74268468 :     pub fn finish_write(&self, new_lsn: Lsn) {
    4713     74268468 :         self.tl.finish_write(new_lsn);
    4714     74268468 :     }
    4715              : 
    4716      1094354 :     pub fn update_current_logical_size(&self, delta: i64) {
    4717      1094354 :         self.tl.update_current_logical_size(delta)
    4718      1094354 :     }
    4719              : }
    4720              : 
    4721              : // We need TimelineWriter to be send in upcoming conversion of
    4722              : // Timeline::layers to tokio::sync::RwLock.
    4723            1 : #[test]
    4724            1 : fn is_send() {
    4725            1 :     fn _assert_send<T: Send>() {}
    4726            1 :     _assert_send::<TimelineWriter<'_>>();
    4727            1 : }
    4728              : 
    4729              : /// Add a suffix to a layer file's name: .{num}.old
    4730              : /// Uses the first available num (starts at 0)
    4731            2 : fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
    4732            2 :     let filename = path
    4733            2 :         .file_name()
    4734            2 :         .ok_or_else(|| anyhow!("Path {} don't have a file name", path.display()))?
    4735            2 :         .to_string_lossy();
    4736            2 :     let mut new_path = path.to_owned();
    4737              : 
    4738            2 :     for i in 0u32.. {
    4739            2 :         new_path.set_file_name(format!("{filename}.{i}.old"));
    4740            2 :         if !new_path.exists() {
    4741            2 :             std::fs::rename(path, &new_path)
    4742            2 :                 .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
    4743            2 :             return Ok(());
    4744            0 :         }
    4745              :     }
    4746              : 
    4747            0 :     bail!("couldn't find an unused backup number for {:?}", path)
    4748            2 : }
    4749              : 
    4750              : /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
    4751              : ///
    4752              : /// Returns `true` if the two `Arc` point to the same layer, false otherwise.
    4753              : ///
    4754              : /// If comparing persistent layers, ALWAYS compare the layer descriptor key.
    4755              : #[inline(always)]
    4756         7732 : pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
    4757         7732 :     // "dyn Trait" objects are "fat pointers" in that they have two components:
    4758         7732 :     // - pointer to the object
    4759         7732 :     // - pointer to the vtable
    4760         7732 :     //
    4761         7732 :     // rust does not provide a guarantee that these vtables are unique, but however
    4762         7732 :     // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
    4763         7732 :     // pointer and the vtable need to be equal.
    4764         7732 :     //
    4765         7732 :     // See: https://github.com/rust-lang/rust/issues/103763
    4766         7732 :     //
    4767         7732 :     // A future version of rust will most likely use this form below, where we cast each
    4768         7732 :     // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
    4769         7732 :     // not affect the comparison.
    4770         7732 :     //
    4771         7732 :     // See: https://github.com/rust-lang/rust/pull/106450
    4772         7732 :     let left = Arc::as_ptr(left) as *const ();
    4773         7732 :     let right = Arc::as_ptr(right) as *const ();
    4774         7732 : 
    4775         7732 :     left == right
    4776         7732 : }
    4777              : 
    4778              : #[cfg(test)]
    4779              : mod tests {
    4780              :     use std::sync::Arc;
    4781              : 
    4782              :     use utils::{id::TimelineId, lsn::Lsn};
    4783              : 
    4784              :     use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
    4785              : 
    4786              :     use super::{EvictionError, Timeline};
    4787              : 
    4788            1 :     #[tokio::test]
    4789            1 :     async fn two_layer_eviction_attempts_at_the_same_time() {
    4790            1 :         let harness =
    4791            1 :             TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
    4792            1 : 
    4793            1 :         let ctx = any_context();
    4794            1 :         let tenant = harness.try_load(&ctx).await.unwrap();
    4795            1 :         let timeline = tenant
    4796            1 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    4797            2 :             .await
    4798            1 :             .unwrap();
    4799            1 : 
    4800            1 :         let rc = timeline
    4801            1 :             .remote_client
    4802            1 :             .clone()
    4803            1 :             .expect("just configured this");
    4804              : 
    4805            1 :         let layer = find_some_layer(&timeline).await;
    4806              : 
    4807            1 :         let cancel = tokio_util::sync::CancellationToken::new();
    4808            1 :         let batch = [layer];
    4809            1 : 
    4810            1 :         let first = {
    4811            1 :             let cancel = cancel.clone();
    4812            1 :             async {
    4813            1 :                 timeline
    4814            1 :                     .evict_layer_batch(&rc, &batch, cancel)
    4815            0 :                     .await
    4816            1 :                     .unwrap()
    4817            1 :             }
    4818              :         };
    4819            1 :         let second = async {
    4820            1 :             timeline
    4821            1 :                 .evict_layer_batch(&rc, &batch, cancel)
    4822            0 :                 .await
    4823            1 :                 .unwrap()
    4824            1 :         };
    4825              : 
    4826            1 :         let (first, second) = tokio::join!(first, second);
    4827              : 
    4828            1 :         let (first, second) = (only_one(first), only_one(second));
    4829            1 : 
    4830            1 :         match (first, second) {
    4831              :             (Ok(()), Err(EvictionError::FileNotFound))
    4832            1 :             | (Err(EvictionError::FileNotFound), Ok(())) => {
    4833            1 :                 // one of the evictions gets to do it,
    4834            1 :                 // other one gets FileNotFound. all is good.
    4835            1 :             }
    4836            0 :             other => unreachable!("unexpected {:?}", other),
    4837              :         }
    4838              :     }
    4839              : 
    4840            1 :     #[tokio::test]
    4841            1 :     async fn layer_eviction_aba_fails() {
    4842            1 :         let harness = TenantHarness::create("layer_eviction_aba_fails").unwrap();
    4843            1 : 
    4844            1 :         let ctx = any_context();
    4845            1 :         let tenant = harness.try_load(&ctx).await.unwrap();
    4846            1 :         let timeline = tenant
    4847            1 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    4848            2 :             .await
    4849            1 :             .unwrap();
    4850              : 
    4851            1 :         let _e = tracing::info_span!("foobar", tenant_id = %tenant.tenant_id, timeline_id = %timeline.timeline_id).entered();
    4852            1 : 
    4853            1 :         let rc = timeline.remote_client.clone().unwrap();
    4854              : 
    4855              :         // TenantHarness allows uploads to happen given GenericRemoteStorage is configured
    4856            1 :         let layer = find_some_layer(&timeline).await;
    4857              : 
    4858            1 :         let cancel = tokio_util::sync::CancellationToken::new();
    4859            1 :         let batch = [layer];
    4860            1 : 
    4861            1 :         let first = {
    4862            1 :             let cancel = cancel.clone();
    4863            1 :             async {
    4864            1 :                 timeline
    4865            1 :                     .evict_layer_batch(&rc, &batch, cancel)
    4866            0 :                     .await
    4867            1 :                     .unwrap()
    4868            1 :             }
    4869              :         };
    4870              : 
    4871              :         // lets imagine this is stuck somehow, still referencing the original `Arc<dyn PersistentLayer>`
    4872            1 :         let second = {
    4873            1 :             let cancel = cancel.clone();
    4874            1 :             async {
    4875            1 :                 timeline
    4876            1 :                     .evict_layer_batch(&rc, &batch, cancel)
    4877            0 :                     .await
    4878            1 :                     .unwrap()
    4879            1 :             }
    4880              :         };
    4881              : 
    4882              :         // while it's stuck, we evict and end up redownloading it
    4883            1 :         only_one(first.await).expect("eviction succeeded");
    4884              : 
    4885            1 :         let layer = find_some_layer(&timeline).await;
    4886            1 :         let layer = layer.downcast_remote_layer().unwrap();
    4887            1 :         timeline.download_remote_layer(layer).await.unwrap();
    4888              : 
    4889            1 :         let res = only_one(second.await);
    4890              : 
    4891            1 :         assert!(
    4892            1 :             matches!(res, Err(EvictionError::LayerNotFound(_))),
    4893            0 :             "{res:?}"
    4894              :         );
    4895              : 
    4896              :         // no more specific asserting, outside of preconds this is the only valid replacement
    4897              :         // failure
    4898              :     }
    4899              : 
    4900            2 :     fn any_context() -> crate::context::RequestContext {
    4901            2 :         use crate::context::*;
    4902            2 :         use crate::task_mgr::*;
    4903            2 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    4904            2 :     }
    4905              : 
    4906            4 :     fn only_one<T>(mut input: Vec<Option<T>>) -> T {
    4907            4 :         assert_eq!(1, input.len());
    4908            4 :         input
    4909            4 :             .pop()
    4910            4 :             .expect("length just checked")
    4911            4 :             .expect("no cancellation")
    4912            4 :     }
    4913              : 
    4914            3 :     async fn find_some_layer(timeline: &Timeline) -> Arc<dyn PersistentLayer> {
    4915            3 :         let layers = timeline.layers.read().await;
    4916            3 :         let desc = layers
    4917            3 :             .layer_map()
    4918            3 :             .iter_historic_layers()
    4919            3 :             .next()
    4920            3 :             .expect("must find one layer to evict");
    4921            3 : 
    4922            3 :         layers.get_from_desc(&desc)
    4923            3 :     }
    4924              : }
        

Generated by: LCOV version 2.1-beta