LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - timeline.rs (source / functions)
Current:  f6946e90941b557c917ac98cd5a7e9506d180f3e.info  (2023-10-19 02:04:12)
Baseline: c8637f37369098875162f194f92736355783b050.info  (2023-10-18 20:25:20)

            Coverage   Total   Hit    LBC   UBC   GBC   GIC   CBC    ECB
Lines:      88.0 %     2995    2635   8     352   2     -     2633   -
Functions:  70.5 %     376     265    1     110   1     -     264    1

           TLA  Line data    Source code
       1                 : pub mod delete;
       2                 : mod eviction_task;
       3                 : mod init;
       4                 : pub mod layer_manager;
       5                 : mod logical_size;
       6                 : pub mod span;
       7                 : pub mod uninit;
       8                 : mod walreceiver;
       9                 : 
      10                 : use anyhow::{anyhow, bail, ensure, Context, Result};
      11                 : use bytes::Bytes;
      12                 : use camino::{Utf8Path, Utf8PathBuf};
      13                 : use fail::fail_point;
      14                 : use futures::StreamExt;
      15                 : use itertools::Itertools;
      16                 : use pageserver_api::models::{
      17                 :     DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
      18                 :     DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
      19                 :     TimelineState,
      20                 : };
      21                 : use remote_storage::GenericRemoteStorage;
      22                 : use serde_with::serde_as;
      23                 : use storage_broker::BrokerClientChannel;
      24                 : use tokio::runtime::Handle;
      25                 : use tokio::sync::{oneshot, watch, TryAcquireError};
      26                 : use tokio_util::sync::CancellationToken;
      27                 : use tracing::*;
      28                 : use utils::id::TenantTimelineId;
      29                 : 
      30                 : use std::cmp::{max, min, Ordering};
      31                 : use std::collections::{BinaryHeap, HashMap, HashSet};
      32                 : use std::ops::{Deref, Range};
      33                 : use std::pin::pin;
      34                 : use std::sync::atomic::Ordering as AtomicOrdering;
      35                 : use std::sync::{Arc, Mutex, RwLock, Weak};
      36                 : use std::time::{Duration, Instant, SystemTime};
      37                 : 
      38                 : use crate::context::{
      39                 :     AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
      40                 : };
      41                 : use crate::deletion_queue::DeletionQueueClient;
      42                 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
      43                 : use crate::tenant::storage_layer::delta_layer::DeltaEntry;
      44                 : use crate::tenant::storage_layer::{
      45                 :     DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer,
      46                 : };
      47                 : use crate::tenant::tasks::{BackgroundLoopKind, RateLimitError};
      48                 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      49                 : use crate::tenant::{
      50                 :     layer_map::{LayerMap, SearchResult},
      51                 :     metadata::{save_metadata, TimelineMetadata},
      52                 :     par_fsync,
      53                 :     storage_layer::{PersistentLayer, ValueReconstructResult, ValueReconstructState},
      54                 : };
      55                 : 
      56                 : use crate::config::PageServerConf;
      57                 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
      58                 : use crate::metrics::{
      59                 :     TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
      60                 :     UNEXPECTED_ONDEMAND_DOWNLOADS,
      61                 : };
      62                 : use crate::pgdatadir_mapping::LsnForTimestamp;
      63                 : use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
      64                 : use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
      65                 : use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
      66                 : use pageserver_api::reltag::RelTag;
      67                 : 
      68                 : use postgres_connection::PgConnectionConfig;
      69                 : use postgres_ffi::to_pg_timestamp;
      70                 : use utils::{
      71                 :     completion,
      72                 :     generation::Generation,
      73                 :     id::{TenantId, TimelineId},
      74                 :     lsn::{AtomicLsn, Lsn, RecordLsn},
      75                 :     seqwait::SeqWait,
      76                 :     simple_rcu::{Rcu, RcuReadGuard},
      77                 : };
      78                 : 
      79                 : use crate::page_cache;
      80                 : use crate::repository::GcResult;
      81                 : use crate::repository::{Key, Value};
      82                 : use crate::task_mgr;
      83                 : use crate::task_mgr::TaskKind;
      84                 : use crate::ZERO_PAGE;
      85                 : 
      86                 : use self::delete::DeleteTimelineFlow;
      87                 : pub(super) use self::eviction_task::EvictionTaskTenantState;
      88                 : use self::eviction_task::EvictionTaskTimelineState;
      89                 : use self::layer_manager::LayerManager;
      90                 : use self::logical_size::LogicalSize;
      91                 : use self::walreceiver::{WalReceiver, WalReceiverConf};
      92                 : 
      93                 : use super::config::TenantConf;
      94                 : use super::remote_timeline_client::index::IndexPart;
      95                 : use super::remote_timeline_client::RemoteTimelineClient;
      96                 : use super::storage_layer::{
      97                 :     AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc,
      98                 : };
      99                 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
     100                 : 
     101 CBC          59 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     102                 : pub(super) enum FlushLoopState {
     103                 :     NotStarted,
     104                 :     Running {
     105                 :         #[cfg(test)]
     106                 :         expect_initdb_optimization: bool,
     107                 :         #[cfg(test)]
     108                 :         initdb_optimization_count: usize,
     109                 :     },
     110                 :     Exited,
     111                 : }
     112                 : 
      113                 : /// Wrapper for a key range to provide reverse ordering by coverage size for BinaryHeap
     114 UBC           0 : #[derive(Debug, Clone, PartialEq, Eq)]
     115                 : pub struct Hole {
     116                 :     key_range: Range<Key>,
     117                 :     coverage_size: usize,
     118                 : }
     119                 : 
     120                 : impl Ord for Hole {
     121 CBC          31 :     fn cmp(&self, other: &Self) -> Ordering {
     122              31 :         other.coverage_size.cmp(&self.coverage_size) // inverse order
     123              31 :     }
     124                 : }
     125                 : 
     126                 : impl PartialOrd for Hole {
     127              31 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     128              31 :         Some(self.cmp(other))
     129              31 :     }
     130                 : }
     131                 : 
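Note: the inverted comparison above turns Rust's max-oriented BinaryHeap into a min-heap over `coverage_size`: popping removes the smallest hole, so only the largest holes survive. A minimal standalone sketch of the pattern (the `MAX_HOLES` bound and the stripped-down `Hole` are illustrative, not the real types):

    use std::cmp::Ordering;
    use std::collections::BinaryHeap;

    #[derive(Debug, PartialEq, Eq)]
    struct Hole {
        coverage_size: usize,
    }

    impl Ord for Hole {
        // Inverted: the heap's "greatest" element is the hole with the
        // *smallest* coverage_size.
        fn cmp(&self, other: &Self) -> Ordering {
            other.coverage_size.cmp(&self.coverage_size)
        }
    }

    impl PartialOrd for Hole {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    fn main() {
        const MAX_HOLES: usize = 2;
        let mut heap = BinaryHeap::new();
        for size in [10, 3, 7, 42] {
            heap.push(Hole { coverage_size: size });
            if heap.len() > MAX_HOLES {
                heap.pop(); // discards the smallest hole seen so far
            }
        }
        let mut kept: Vec<_> = heap.into_iter().map(|h| h.coverage_size).collect();
        kept.sort();
        assert_eq!(kept, vec![10, 42]); // the two largest holes remain
    }
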
      132                 : /// Temporary function for the immutable storage state refactor; ensures we are dropping a mutex guard and not something else.
     133                 : /// Can be removed after all refactors are done.
     134             306 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
     135             306 :     drop(rlock)
     136             306 : }
     137                 : 
      138                 : /// Temporary function for the immutable storage state refactor; ensures we are dropping a mutex guard and not something else.
     139                 : /// Can be removed after all refactors are done.
     140            3418 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
     141            3418 :     drop(rlock)
     142            3418 : }
     143                 : 
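Note: the value of these typed helpers over a bare `drop(...)` is that the compiler rejects any argument that is not actually a guard, so a refactor cannot silently drop the wrong thing. A minimal sketch of the same idea with a std Mutex (names are illustrative):

    use std::sync::{Mutex, MutexGuard};

    // Only accepts a MutexGuard; passing anything else is a type error.
    fn drop_guard<T>(guard: MutexGuard<'_, T>) {
        drop(guard)
    }

    fn main() {
        let lock = Mutex::new(5);
        let guard = lock.lock().unwrap();
        drop_guard(guard); // compiles: releases the lock
        // drop_guard(lock); // would not compile: `lock` is not a guard
        assert_eq!(*lock.lock().unwrap(), 5); // lock is free again
    }
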
     144                 : /// The outward-facing resources required to build a Timeline
     145                 : pub struct TimelineResources {
     146                 :     pub remote_client: Option<RemoteTimelineClient>,
     147                 :     pub deletion_queue_client: DeletionQueueClient,
     148                 : }
     149                 : 
     150                 : pub struct Timeline {
     151                 :     conf: &'static PageServerConf,
     152                 :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     153                 : 
     154                 :     myself: Weak<Self>,
     155                 : 
     156                 :     pub tenant_id: TenantId,
     157                 :     pub timeline_id: TimelineId,
     158                 : 
     159                 :     /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
     160                 :     /// Never changes for the lifetime of this [`Timeline`] object.
     161                 :     ///
     162                 :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
      163                 :     /// this copy enforces the invariant that the generation doesn't change during a Tenant's lifetime.
     164                 :     generation: Generation,
     165                 : 
     166                 :     pub pg_version: u32,
     167                 : 
     168                 :     /// The tuple has two elements.
     169                 :     /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
     170                 :     /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
     171                 :     ///
     172                 :     /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
     173                 :     /// We describe these rectangles through the `PersistentLayerDesc` struct.
     174                 :     ///
     175                 :     /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
     176                 :     /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
     177                 :     /// `PersistentLayerDesc`'s.
     178                 :     ///
     179                 :     /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
     180                 :     /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
     181                 :     /// runtime, e.g., during page reconstruction.
     182                 :     ///
     183                 :     /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
     184                 :     /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
     185                 :     pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
     186                 : 
      187                 :     /// Set of key ranges which should be covered by image layers to
      188                 :     /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
      189                 :     /// It is used by the compaction task when it checks if a new image layer should be created.
      190                 :     /// A newly created image layer doesn't help to remove delta layers until it
      191                 :     /// falls off the PITR horizon. So on the next GC cycle,
      192                 :     /// gc_timeline may still want the new image layer to be created. To avoid redundant
      193                 :     /// image layer creation we should check whether an image layer already exists beyond the PITR horizon.
      194                 :     /// This is why we need to remember the GC cutoff LSN.
     195                 :     ///
     196                 :     wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
     197                 : 
     198                 :     last_freeze_at: AtomicLsn,
     199                 :     // Atomic would be more appropriate here.
     200                 :     last_freeze_ts: RwLock<Instant>,
     201                 : 
     202                 :     // WAL redo manager
     203                 :     walredo_mgr: Arc<super::WalRedoManager>,
     204                 : 
     205                 :     /// Remote storage client.
     206                 :     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
     207                 :     pub remote_client: Option<Arc<RemoteTimelineClient>>,
     208                 : 
     209                 :     // What page versions do we hold in the repository? If we get a
     210                 :     // request > last_record_lsn, we need to wait until we receive all
     211                 :     // the WAL up to the request. The SeqWait provides functions for
     212                 :     // that. TODO: If we get a request for an old LSN, such that the
     213                 :     // versions have already been garbage collected away, we should
     214                 :     // throw an error, but we don't track that currently.
     215                 :     //
     216                 :     // last_record_lsn.load().last points to the end of last processed WAL record.
     217                 :     //
     218                 :     // We also remember the starting point of the previous record in
     219                 :     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     220                 :     // first WAL record when the node is started up. But here, we just
     221                 :     // keep track of it.
     222                 :     last_record_lsn: SeqWait<RecordLsn, Lsn>,
     223                 : 
     224                 :     // All WAL records have been processed and stored durably on files on
     225                 :     // local disk, up to this LSN. On crash and restart, we need to re-process
     226                 :     // the WAL starting from this point.
     227                 :     //
     228                 :     // Some later WAL records might have been processed and also flushed to disk
     229                 :     // already, so don't be surprised to see some, but there's no guarantee on
     230                 :     // them yet.
     231                 :     disk_consistent_lsn: AtomicLsn,
     232                 : 
     233                 :     // Parent timeline that this timeline was branched from, and the LSN
     234                 :     // of the branch point.
     235                 :     ancestor_timeline: Option<Arc<Timeline>>,
     236                 :     ancestor_lsn: Lsn,
     237                 : 
     238                 :     pub(super) metrics: TimelineMetrics,
     239                 : 
     240                 :     /// Ensures layers aren't frozen by checkpointer between
     241                 :     /// [`Timeline::get_layer_for_write`] and layer reads.
     242                 :     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     243                 :     /// Must always be acquired before the layer map/individual layer lock
     244                 :     /// to avoid deadlock.
     245                 :     write_lock: tokio::sync::Mutex<()>,
     246                 : 
     247                 :     /// Used to avoid multiple `flush_loop` tasks running
     248                 :     pub(super) flush_loop_state: Mutex<FlushLoopState>,
     249                 : 
     250                 :     /// layer_flush_start_tx can be used to wake up the layer-flushing task.
     251                 :     /// The value is a counter, incremented every time a new flush cycle is requested.
     252                 :     /// The flush cycle counter is sent back on the layer_flush_done channel when
     253                 :     /// the flush finishes. You can use that to wait for the flush to finish.
     254                 :     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     255                 :     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
     256                 :     layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
     257                 : 
     258                 :     /// Layer removal lock.
     259                 :     /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
     260                 :     /// This lock is acquired in [`Timeline::gc`] and [`Timeline::compact`].
     261                 :     /// This is an `Arc<Mutex>` lock because we need an owned
     262                 :     /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
     263                 :     /// Note that [`DeleteTimelineFlow`] uses `delete_progress` field.
     264                 :     pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
     265                 : 
     266                 :     // Needed to ensure that we can't create a branch at a point that was already garbage collected
     267                 :     pub latest_gc_cutoff_lsn: Rcu<Lsn>,
     268                 : 
     269                 :     // List of child timelines and their branch points. This is needed to avoid
     270                 :     // garbage collecting data that is still needed by the child timelines.
     271                 :     pub gc_info: std::sync::RwLock<GcInfo>,
     272                 : 
      273                 :     // It may change across major versions, so for simplicity
      274                 :     // we keep it after running initdb for a timeline.
      275                 :     // It is needed in checks when we want to error on some operations
      276                 :     // when they are requested for a pre-initdb LSN.
     277                 :     // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
     278                 :     // though let's keep them both for better error visibility.
     279                 :     pub initdb_lsn: Lsn,
     280                 : 
     281                 :     /// When did we last calculate the partitioning?
     282                 :     partitioning: Mutex<(KeyPartitioning, Lsn)>,
     283                 : 
     284                 :     /// Configuration: how often should the partitioning be recalculated.
     285                 :     repartition_threshold: u64,
     286                 : 
     287                 :     /// Current logical size of the "datadir", at the last LSN.
     288                 :     current_logical_size: LogicalSize,
     289                 : 
     290                 :     /// Information about the last processed message by the WAL receiver,
     291                 :     /// or None if WAL receiver has not received anything for this timeline
     292                 :     /// yet.
     293                 :     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
     294                 :     pub walreceiver: Mutex<Option<WalReceiver>>,
     295                 : 
     296                 :     /// Relation size cache
     297                 :     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
     298                 : 
     299                 :     download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
     300                 : 
     301                 :     state: watch::Sender<TimelineState>,
     302                 : 
     303                 :     /// Prevent two tasks from deleting the timeline at the same time. If held, the
      304                 :     /// timeline is being deleted. The flow's state records whether the deletion has already completed.
     305                 :     pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
     306                 : 
     307                 :     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
     308                 : 
     309                 :     /// Barrier to wait before doing initial logical size calculation. Used only during startup.
     310                 :     initial_logical_size_can_start: Option<completion::Barrier>,
     311                 : 
     312                 :     /// Completion shared between all timelines loaded during startup; used to delay heavier
     313                 :     /// background tasks until some logical sizes have been calculated.
     314                 :     initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
     315                 : 
     316                 :     /// Load or creation time information about the disk_consistent_lsn and when the loading
     317                 :     /// happened. Used for consumption metrics.
     318                 :     pub(crate) loaded_at: (Lsn, SystemTime),
     319                 : }
     320                 : 
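Note: the coherence invariant documented on `layers` (every `PersistentLayerDesc` in the `LayerMap` must have a matching entry in the `LayerFileManager`) can be pictured with two plain collections. A simplified standalone sketch, not the real types:

    use std::collections::HashMap;

    // Simplified stand-ins for PersistentLayerDesc / PersistentLayer.
    #[derive(Clone, PartialEq, Eq, Hash, Debug)]
    struct LayerDesc(String);
    struct LayerFile; // handle to an inmem/local/remote representation

    struct Layers {
        // "LayerMap": which descs cover the (KEY, LSN) space.
        map: Vec<LayerDesc>,
        // "LayerFileManager": desc -> physical representation.
        files: HashMap<LayerDesc, LayerFile>,
    }

    impl Layers {
        // Mirrors get_from_desc: a desc present in the map but absent
        // from the manager is a programming error, hence the panic.
        fn get_from_desc(&self, desc: &LayerDesc) -> &LayerFile {
            self.files
                .get(desc)
                .expect("LayerMap and LayerFileManager out of sync")
        }

        // Updates must touch both halves together to stay coherent.
        fn insert(&mut self, desc: LayerDesc, file: LayerFile) {
            self.map.push(desc.clone());
            self.files.insert(desc, file);
        }
    }
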
     321                 : pub struct WalReceiverInfo {
     322                 :     pub wal_source_connconf: PgConnectionConfig,
     323                 :     pub last_received_msg_lsn: Lsn,
     324                 :     pub last_received_msg_ts: u128,
     325                 : }
     326                 : 
     327                 : ///
     328                 : /// Information about how much history needs to be retained, needed by
     329                 : /// Garbage Collection.
     330                 : ///
     331                 : pub struct GcInfo {
     332                 :     /// Specific LSNs that are needed.
     333                 :     ///
     334                 :     /// Currently, this includes all points where child branches have
     335                 :     /// been forked off from. In the future, could also include
     336                 :     /// explicit user-defined snapshot points.
     337                 :     pub retain_lsns: Vec<Lsn>,
     338                 : 
     339                 :     /// In addition to 'retain_lsns', keep everything newer than this
     340                 :     /// point.
     341                 :     ///
     342                 :     /// This is calculated by subtracting 'gc_horizon' setting from
     343                 :     /// last-record LSN
     344                 :     ///
     345                 :     /// FIXME: is this inclusive or exclusive?
     346                 :     pub horizon_cutoff: Lsn,
     347                 : 
     348                 :     /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
     349                 :     /// point.
     350                 :     ///
     351                 :     /// This is calculated by finding a number such that a record is needed for PITR
      352                 :     /// if and only if its LSN is larger than 'pitr_cutoff'.
     353                 :     pub pitr_cutoff: Lsn,
     354                 : }
     355                 : 
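Note: taken together, these fields say a page version must be kept if it is still visible above the GC cutoffs or is needed by a branch point. A standalone sketch of that predicate (one possible reading of the inclusivity FIXME above; `Lsn` simplified to an integer):

    type Lsn = u64;

    struct GcInfo {
        retain_lsns: Vec<Lsn>,
        horizon_cutoff: Lsn,
        pitr_cutoff: Lsn,
    }

    impl GcInfo {
        // A page version at `lsn`, superseded by a newer version at
        // `next_lsn` (if any), must be kept if some reader can need it.
        fn must_retain(&self, lsn: Lsn, next_lsn: Option<Lsn>) -> bool {
            let cutoff = self.horizon_cutoff.min(self.pitr_cutoff);
            // Visible within the horizon/PITR window?
            let needed_by_cutoff = match next_lsn {
                Some(next) => next > cutoff, // no newer version inside the window
                None => true,                // the latest version is always kept
            };
            // Needed by a child branch forked between this version and the next?
            let needed_by_branch = self.retain_lsns.iter().any(|&branch| {
                branch >= lsn && next_lsn.map_or(true, |next| branch < next)
            });
            needed_by_cutoff || needed_by_branch
        }
    }

    fn main() {
        let gc = GcInfo { retain_lsns: vec![150], horizon_cutoff: 200, pitr_cutoff: 180 };
        // Superseded at 160, but the branch at 150 still reads this version.
        assert!(gc.must_retain(100, Some(160)));
        // Superseded at 140, below both cutoffs, and no branch needs it.
        assert!(!gc.must_retain(100, Some(140)));
    }
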
     356                 : /// An error happened in a get() operation.
     357              26 : #[derive(thiserror::Error)]
     358                 : pub enum PageReconstructError {
     359                 :     #[error(transparent)]
     360                 :     Other(#[from] anyhow::Error),
     361                 : 
     362                 :     /// The operation would require downloading a layer that is missing locally.
     363                 :     NeedsDownload(TenantTimelineId, LayerFileName),
     364                 : 
     365                 :     /// The operation was cancelled
     366                 :     Cancelled,
     367                 : 
     368                 :     /// The ancestor of this is being stopped
     369                 :     AncestorStopping(TimelineId),
     370                 : 
     371                 :     /// An error happened replaying WAL records
     372                 :     #[error(transparent)]
     373                 :     WalRedo(anyhow::Error),
     374                 : }
     375                 : 
     376                 : impl std::fmt::Debug for PageReconstructError {
     377 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
     378               0 :         match self {
     379               0 :             Self::Other(err) => err.fmt(f),
     380               0 :             Self::NeedsDownload(tenant_timeline_id, layer_file_name) => {
     381               0 :                 write!(
     382               0 :                     f,
     383               0 :                     "layer {}/{} needs download",
     384               0 :                     tenant_timeline_id,
     385               0 :                     layer_file_name.file_name()
     386               0 :                 )
     387                 :             }
     388               0 :             Self::Cancelled => write!(f, "cancelled"),
     389               0 :             Self::AncestorStopping(timeline_id) => {
     390               0 :                 write!(f, "ancestor timeline {timeline_id} is being stopped")
     391                 :             }
     392               0 :             Self::WalRedo(err) => err.fmt(f),
     393                 :         }
     394               0 :     }
     395                 : }
     396                 : 
     397                 : impl std::fmt::Display for PageReconstructError {
     398 CBC          20 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
     399              20 :         match self {
     400              20 :             Self::Other(err) => err.fmt(f),
     401 UBC           0 :             Self::NeedsDownload(tenant_timeline_id, layer_file_name) => {
     402               0 :                 write!(
     403               0 :                     f,
     404               0 :                     "layer {}/{} needs download",
     405               0 :                     tenant_timeline_id,
     406               0 :                     layer_file_name.file_name()
     407               0 :                 )
     408                 :             }
     409               0 :             Self::Cancelled => write!(f, "cancelled"),
     410               0 :             Self::AncestorStopping(timeline_id) => {
     411               0 :                 write!(f, "ancestor timeline {timeline_id} is being stopped")
     412                 :             }
     413               0 :             Self::WalRedo(err) => err.fmt(f),
     414                 :         }
     415 CBC          20 :     }
     416                 : }
     417                 : 
     418 UBC           0 : #[derive(Clone, Copy)]
     419                 : pub enum LogicalSizeCalculationCause {
     420                 :     Initial,
     421                 :     ConsumptionMetricsSyntheticSize,
     422                 :     EvictionTaskImitation,
     423                 :     TenantSizeHandler,
     424                 : }
     425                 : 
     426                 : /// Public interface functions
     427                 : impl Timeline {
     428                 :     /// Get the LSN where this branch was created
     429 CBC        3151 :     pub fn get_ancestor_lsn(&self) -> Lsn {
     430            3151 :         self.ancestor_lsn
     431            3151 :     }
     432                 : 
     433                 :     /// Get the ancestor's timeline id
     434            4946 :     pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
     435            4946 :         self.ancestor_timeline
     436            4946 :             .as_ref()
     437            4946 :             .map(|ancestor| ancestor.timeline_id)
     438            4946 :     }
     439                 : 
      440                 :     /// Lock and get the timeline's GC cutoff
     441         3783797 :     pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
     442         3783797 :         self.latest_gc_cutoff_lsn.read()
     443         3783797 :     }
     444                 : 
     445                 :     /// Look up given page version.
     446                 :     ///
     447                 :     /// If a remote layer file is needed, it is downloaded as part of this
     448                 :     /// call.
     449                 :     ///
     450                 :     /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
     451                 :     /// abstraction above this needs to store suitable metadata to track what
     452                 :     /// data exists with what keys, in separate metadata entries. If a
     453                 :     /// non-existent key is requested, we may incorrectly return a value from
     454                 :     /// an ancestor branch, for example, or waste a lot of cycles chasing the
     455                 :     /// non-existing key.
     456                 :     ///
     457         6372322 :     pub async fn get(
     458         6372322 :         &self,
     459         6372322 :         key: Key,
     460         6372322 :         lsn: Lsn,
     461         6372322 :         ctx: &RequestContext,
     462         6372323 :     ) -> Result<Bytes, PageReconstructError> {
     463         6372323 :         if !lsn.is_valid() {
     464 GBC           1 :             return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
     465 CBC     6372322 :         }
     466                 : 
     467                 :         // XXX: structured stats collection for layer eviction here.
     468 UBC           0 :         trace!(
     469               0 :             "get page request for {}@{} from task kind {:?}",
     470               0 :             key,
     471               0 :             lsn,
     472               0 :             ctx.task_kind()
     473               0 :         );
     474                 : 
     475                 :         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
     476                 :         // The cached image can be returned directly if there is no WAL between the cached image
     477                 :         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
     478                 :         // for redo.
     479 CBC     6372322 :         let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
     480         1297801 :             Some((cached_lsn, cached_img)) => {
     481         1297801 :                 match cached_lsn.cmp(&lsn) {
     482         1297785 :                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
     483                 :                     Ordering::Equal => {
     484              16 :                         MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
     485              16 :                         return Ok(cached_img); // exact LSN match, return the image
     486                 :                     }
     487                 :                     Ordering::Greater => {
     488 UBC           0 :                         unreachable!("the returned lsn should never be after the requested lsn")
     489                 :                     }
     490                 :                 }
     491 CBC     1297785 :                 Some((cached_lsn, cached_img))
     492                 :             }
     493         5074521 :             None => None,
     494                 :         };
     495                 : 
     496         6372306 :         let mut reconstruct_state = ValueReconstructState {
     497         6372306 :             records: Vec::new(),
     498         6372306 :             img: cached_page_img,
     499         6372306 :         };
     500         6372306 : 
     501         6372306 :         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
     502         6372306 :         let path = self
     503         6372306 :             .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
     504         6964680 :             .await?;
     505         6372249 :         timer.stop_and_record();
     506         6372249 : 
     507         6372249 :         let start = Instant::now();
     508         6372249 :         let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
     509         6372247 :         let elapsed = start.elapsed();
     510         6372247 :         crate::metrics::RECONSTRUCT_TIME
     511         6372247 :             .for_result(&res)
     512         6372247 :             .observe(elapsed.as_secs_f64());
     513                 : 
     514         6372247 :         if cfg!(feature = "testing") && res.is_err() {
     515                 :             // it can only be walredo issue
     516                 :             use std::fmt::Write;
     517                 : 
     518 UBC           0 :             let mut msg = String::new();
     519               0 : 
     520               0 :             path.into_iter().for_each(|(res, cont_lsn, layer)| {
     521               0 :                 writeln!(
     522               0 :                     msg,
     523               0 :                     "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
     524               0 :                     layer(),
     525               0 :                 )
     526               0 :                 .expect("string grows")
     527               0 :             });
     528                 : 
     529                 :             // this is to rule out or provide evidence that we could in some cases read a duplicate
     530                 :             // walrecord
     531               0 :             tracing::info!("walredo failed, path:\n{msg}");
     532 CBC     6372247 :         }
     533                 : 
     534         6372247 :         res
     535         6372286 :     }
     536                 : 
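Note: the fast path at the top of `get` rests on the three-way `cached_lsn.cmp(&lsn)`: an exact match returns the cached image directly, an older image seeds WAL redo, and a newer image is unreachable because the cache lookup is bounded by the requested LSN. A standalone sketch of the same decision (types simplified):

    use std::cmp::Ordering;

    type Lsn = u64;
    type Image = Vec<u8>;

    enum CacheOutcome {
        // Exact LSN match: no WAL to apply, return the image as-is.
        Direct(Image),
        // Older image: use it as the redo base, replaying WAL in (cached, lsn].
        RedoBase(Lsn, Image),
        // Nothing cached: reconstruct from layers alone.
        Miss,
    }

    fn classify(cached: Option<(Lsn, Image)>, lsn: Lsn) -> CacheOutcome {
        match cached {
            Some((cached_lsn, img)) => match cached_lsn.cmp(&lsn) {
                Ordering::Equal => CacheOutcome::Direct(img),
                Ordering::Less => CacheOutcome::RedoBase(cached_lsn, img),
                // The lookup only returns images with lsn <= the request.
                Ordering::Greater => unreachable!("cache returned a future image"),
            },
            None => CacheOutcome::Miss,
        }
    }

    fn main() {
        match classify(Some((90, vec![0u8; 8192])), 100) {
            CacheOutcome::RedoBase(base, _img) => assert_eq!(base, 90),
            _ => unreachable!(),
        }
    }
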
     537                 :     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     538        82259045 :     pub fn get_last_record_lsn(&self) -> Lsn {
     539        82259045 :         self.last_record_lsn.load().last
     540        82259045 :     }
     541                 : 
     542            2671 :     pub fn get_prev_record_lsn(&self) -> Lsn {
     543            2671 :         self.last_record_lsn.load().prev
     544            2671 :     }
     545                 : 
     546                 :     /// Atomically get both last and prev.
     547             999 :     pub fn get_last_record_rlsn(&self) -> RecordLsn {
     548             999 :         self.last_record_lsn.load()
     549             999 :     }
     550                 : 
     551          781547 :     pub fn get_disk_consistent_lsn(&self) -> Lsn {
     552          781547 :         self.disk_consistent_lsn.load()
     553          781547 :     }
     554                 : 
     555                 :     /// remote_consistent_lsn from the perspective of the tenant's current generation,
     556                 :     /// not validated with control plane yet.
     557                 :     /// See [`Self::get_remote_consistent_lsn_visible`].
     558                 :     pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     559            2671 :         if let Some(remote_client) = &self.remote_client {
     560            2671 :             remote_client.remote_consistent_lsn_projected()
     561                 :         } else {
     562 UBC           0 :             None
     563                 :         }
     564 CBC        2671 :     }
     565                 : 
     566                 :     /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
     567                 :     /// i.e. a value of remote_consistent_lsn_projected which has undergone
     568                 :     /// generation validation in the deletion queue.
     569                 :     pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
     570          779573 :         if let Some(remote_client) = &self.remote_client {
     571          779573 :             remote_client.remote_consistent_lsn_visible()
     572                 :         } else {
     573 UBC           0 :             None
     574                 :         }
     575 CBC      779573 :     }
     576                 : 
     577                 :     /// The sum of the file size of all historic layers in the layer map.
     578                 :     /// This method makes no distinction between local and remote layers.
     579                 :     /// Hence, the result **does not represent local filesystem usage**.
     580            3137 :     pub async fn layer_size_sum(&self) -> u64 {
     581            3137 :         let guard = self.layers.read().await;
     582            3137 :         let layer_map = guard.layer_map();
     583            3137 :         let mut size = 0;
     584           94649 :         for l in layer_map.iter_historic_layers() {
     585           94649 :             size += l.file_size();
     586           94649 :         }
     587            3137 :         size
     588            3137 :     }
     589                 : 
     590              12 :     pub fn resident_physical_size(&self) -> u64 {
     591              12 :         self.metrics.resident_physical_size_get()
     592              12 :     }
     593                 : 
     594                 :     ///
     595                 :     /// Wait until WAL has been received and processed up to this LSN.
     596                 :     ///
     597                 :     /// You should call this before any of the other get_* or list_* functions. Calling
      598                 :     /// those functions with an LSN that has not been processed yet is an error.
     599                 :     ///
     600         1278620 :     pub async fn wait_lsn(
     601         1278620 :         &self,
     602         1278620 :         lsn: Lsn,
     603         1278620 :         _ctx: &RequestContext, /* Prepare for use by cancellation */
     604         1278620 :     ) -> anyhow::Result<()> {
     605         1278620 :         anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
     606                 : 
     607                 :         // This should never be called from the WAL receiver, because that could lead
     608                 :         // to a deadlock.
     609         1278620 :         anyhow::ensure!(
     610         1278620 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
     611 UBC           0 :             "wait_lsn cannot be called in WAL receiver"
     612                 :         );
     613 CBC     1278620 :         anyhow::ensure!(
     614         1278620 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
     615 UBC           0 :             "wait_lsn cannot be called in WAL receiver"
     616                 :         );
     617 CBC     1278620 :         anyhow::ensure!(
     618         1278620 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
     619 UBC           0 :             "wait_lsn cannot be called in WAL receiver"
     620                 :         );
     621                 : 
     622 CBC     1278620 :         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
     623         1278620 : 
     624         1278620 :         match self
     625         1278620 :             .last_record_lsn
     626         1278620 :             .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
     627          145151 :             .await
     628                 :         {
     629         1278615 :             Ok(()) => Ok(()),
     630               4 :             Err(e) => {
     631               4 :                 // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
     632               4 :                 drop(_timer);
     633               4 :                 let walreceiver_status = self.walreceiver_status();
     634               4 :                 Err(anyhow::Error::new(e).context({
     635               4 :                     format!(
     636               4 :                         "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
     637               4 :                         lsn,
     638               4 :                         self.get_last_record_lsn(),
     639               4 :                         self.get_disk_consistent_lsn(),
     640               4 :                         walreceiver_status,
     641               4 :                     )
     642               4 :                 }))
     643                 :             }
     644                 :         }
     645         1278619 :     }
     646                 : 
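Note: `wait_lsn` delegates to `SeqWait::wait_for_timeout`, blocking until `last_record_lsn` reaches the target or the timeout fires. A minimal sketch of the same wait-until-value pattern built on a tokio watch channel (an illustration, not the real SeqWait):

    use std::time::Duration;
    use tokio::sync::watch;

    type Lsn = u64;

    // Wait until the published LSN is >= `target`, or time out.
    async fn wait_for_lsn(
        mut rx: watch::Receiver<Lsn>,
        target: Lsn,
        timeout: Duration,
    ) -> Result<(), &'static str> {
        let wait = async {
            // wait_for checks the current value first, then sleeps until
            // the predicate holds.
            rx.wait_for(|&lsn| lsn >= target).await.map(|_| ())
        };
        match tokio::time::timeout(timeout, wait).await {
            Ok(Ok(())) => Ok(()),
            Ok(Err(_)) => Err("sender dropped"),
            Err(_) => Err("timed out waiting for WAL"),
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = watch::channel(0u64);
        tokio::spawn(async move {
            tx.send(100).unwrap(); // the WAL receiver advancing last_record_lsn
        });
        wait_for_lsn(rx, 100, Duration::from_secs(1)).await.unwrap();
    }
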
     647            2675 :     pub(crate) fn walreceiver_status(&self) -> String {
     648            2675 :         match &*self.walreceiver.lock().unwrap() {
     649             162 :             None => "stopping or stopped".to_string(),
     650            2513 :             Some(walreceiver) => match walreceiver.status() {
     651            1316 :                 Some(status) => status.to_human_readable_string(),
     652            1197 :                 None => "Not active".to_string(),
     653                 :             },
     654                 :         }
     655            2675 :     }
     656                 : 
     657                 :     /// Check that it is valid to request operations with that lsn.
     658             591 :     pub fn check_lsn_is_in_scope(
     659             591 :         &self,
     660             591 :         lsn: Lsn,
     661             591 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     662             591 :     ) -> anyhow::Result<()> {
     663             591 :         ensure!(
     664             591 :             lsn >= **latest_gc_cutoff_lsn,
     665               9 :             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
     666               9 :             lsn,
     667               9 :             **latest_gc_cutoff_lsn,
     668                 :         );
     669             582 :         Ok(())
     670             591 :     }
     671                 : 
     672                 :     /// Flush to disk all data that was written with the put_* functions
     673            6368 :     #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
     674                 :     pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
     675                 :         self.freeze_inmem_layer(false).await;
     676                 :         self.flush_frozen_layers_and_wait().await
     677                 :     }
     678                 : 
     679                 :     /// Outermost timeline compaction operation; downloads needed layers.
     680            1262 :     pub async fn compact(
     681            1262 :         self: &Arc<Self>,
     682            1262 :         cancel: &CancellationToken,
     683            1262 :         ctx: &RequestContext,
     684            1262 :     ) -> anyhow::Result<()> {
     685                 :         const ROUNDS: usize = 2;
     686                 : 
     687                 :         // this wait probably never needs any "long time spent" logging, because we already nag if
     688                 :         // compaction task goes over it's period (20s) which is quite often in production.
      689                 :         // the compaction task goes over its period (20s), which is quite often in production.
     690            1262 :             BackgroundLoopKind::Compaction,
     691            1262 :             ctx,
     692            1262 :             cancel,
     693            1262 :         )
     694 GBC           1 :         .await
     695                 :         {
     696 CBC        1262 :             Ok(permit) => permit,
     697 UBC           0 :             Err(RateLimitError::Cancelled) => return Ok(()),
     698                 :         };
     699                 : 
     700 CBC        1262 :         let last_record_lsn = self.get_last_record_lsn();
     701            1262 : 
      702                 :         // retry two times to allow the first round to find layers which need to be downloaded, then
     703            1262 :         if !last_record_lsn.is_valid() {
     704 UBC           0 :             warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
     705               0 :             return Ok(());
     706 CBC        1262 :         }
     707                 : 
     708                 :         // retry two times to allow first round to find layers which need to be downloaded, then
     709                 :         // download them, then retry compaction
     710            1263 :         for round in 0..ROUNDS {
     711                 :             // should we error out with the most specific error?
     712            1263 :             let last_round = round == ROUNDS - 1;
     713                 : 
     714         1498331 :             let res = self.compact_inner(ctx).await;
     715                 : 
     716                 :             // If `create_image_layers' or `compact_level0` scheduled any
     717                 :             // uploads or deletions, but didn't update the index file yet,
     718                 :             // do it now.
     719                 :             //
     720                 :             // This isn't necessary for correctness, the remote state is
     721                 :             // consistent without the uploads and deletions, and we would
     722                 :             // update the index file on next flush iteration too. But it
     723                 :             // could take a while until that happens.
     724                 :             //
     725                 :             // Additionally, only do this once before we return from this function.
     726            1254 :             if last_round || res.is_ok() {
      727               1 :                     // this can happen at most once before exiting; download the layers, then retry
     728            1252 :                     remote_client.schedule_index_upload_for_file_changes()?;
     729 UBC           0 :                 }
     730 CBC           2 :             }
     731                 : 
     732               1 :             let rls = match res {
     733            1252 :                 Ok(()) => return Ok(()),
     734               1 :                 Err(CompactionError::DownloadRequired(rls)) if !last_round => {
     735               1 :                     // this can be done at most one time before exiting, waiting
     736               1 :                     rls
     737                 :                 }
     738 UBC           0 :                 Err(CompactionError::DownloadRequired(rls)) => {
     739               0 :                     anyhow::bail!("Compaction requires downloading multiple times (last was {} layers), possibly battling against eviction", rls.len())
     740                 :                 }
     741                 :                 Err(CompactionError::ShuttingDown) => {
     742               0 :                     return Ok(());
     743                 :                 }
     744 CBC           1 :                 Err(CompactionError::Other(e)) => {
     745               1 :                     return Err(e);
     746                 :                 }
     747                 :             };
     748                 : 
      749                 :             // this path can be visited in the second round of retrying, if the first round found that we
     750                 :             // must first download some remote layers
     751               1 :             let total = rls.len();
     752               1 : 
     753               1 :             let mut downloads = rls
     754               1 :                 .into_iter()
     755               3 :                 .map(|rl| self.download_remote_layer(rl))
     756               1 :                 .collect::<futures::stream::FuturesUnordered<_>>();
     757               1 : 
     758               1 :             let mut failed = 0;
     759                 : 
     760                 :             loop {
     761               4 :                 tokio::select! {
     762                 :                     _ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
     763               4 :                     res = downloads.next() => {
     764                 :                         match res {
     765                 :                             Some(Ok(())) => {},
     766                 :                             Some(Err(e)) => {
     767 UBC           0 :                                 warn!("Downloading remote layer for compaction failed: {e:#}");
     768                 :                                 failed += 1;
     769                 :                             }
     770                 :                             None => break,
     771                 :                         }
     772                 :                     }
     773                 :                 }
     774                 :             }
     775                 : 
     776 CBC           1 :             if failed != 0 {
     777 UBC           0 :                 anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
     778 CBC           1 :             }
     779                 : 
     780                 :             // if everything downloaded fine, lets try again
      781                 :             // if everything downloaded fine, let's try again
     782                 : 
     783 UBC           0 :         unreachable!("retry loop exits")
     784 CBC        1253 :     }
     785                 : 
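Note: the download phase above fans all needed layers out through a `FuturesUnordered` and drains it under `tokio::select!` so that cancellation stays responsive and one failure doesn't abort the rest. A standalone sketch of that pattern with the download mocked out (`download` / `download_all` are illustrative names):

    use futures::stream::{FuturesUnordered, StreamExt};
    use tokio_util::sync::CancellationToken;

    async fn download(_layer: u32) -> Result<(), String> {
        Ok(()) // stand-in for Timeline::download_remote_layer
    }

    async fn download_all(layers: Vec<u32>, cancel: &CancellationToken) -> anyhow::Result<()> {
        let total = layers.len();
        let mut downloads: FuturesUnordered<_> = layers.into_iter().map(download).collect();
        let mut failed = 0;
        loop {
            tokio::select! {
                _ = cancel.cancelled() => anyhow::bail!("cancelled while downloading remote layers"),
                res = downloads.next() => match res {
                    Some(Ok(())) => {}
                    Some(Err(e)) => {
                        // Keep draining; tally failures instead of aborting.
                        eprintln!("download failed: {e}");
                        failed += 1;
                    }
                    None => break, // all downloads completed
                },
            }
        }
        if failed != 0 {
            anyhow::bail!("{failed} out of {total} layers failed to download");
        }
        Ok(())
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        download_all(vec![1, 2, 3], &CancellationToken::new()).await
    }
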
     786                 :     /// Compaction which might need to be retried after downloading remote layers.
     787            1263 :     async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
     788                 :         //
     789                 :         // High level strategy for compaction / image creation:
     790                 :         //
     791                 :         // 1. First, calculate the desired "partitioning" of the
     792                 :         // currently in-use key space. The goal is to partition the
     793                 :         // key space into roughly fixed-size chunks, but also take into
     794                 :         // account any existing image layers, and try to align the
     795                 :         // chunk boundaries with the existing image layers to avoid
     796                 :         // too much churn. Also try to align chunk boundaries with
     797                 :         // relation boundaries.  In principle, we don't know about
     798                 :         // relation boundaries here, we just deal with key-value
     799                 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     800                 :         // map relations into key-value pairs. But in practice we know
     801                 :         // that 'field6' is the block number, and the fields 1-5
     802                 :         // identify a relation. This is just an optimization,
     803                 :         // though.
     804                 :         //
     805                 :         // 2. Once we know the partitioning, for each partition,
     806                 :         // decide if it's time to create a new image layer. The
      807                 :         // criterion is: has there been too much "churn" since the last
      808                 :         // image layer? "Churn" is a fuzzy concept; it's a
      809                 :         // combination of too many delta files, or too much WAL in
      810                 :         // total in the delta files. Or perhaps: if creating an image
      811                 :         // file would allow deleting some older files.
     812                 :         //
     813                 :         // 3. After that, we compact all level0 delta files if there
     814                 :         // are too many of them.  While compacting, we also garbage
     815                 :         // collect any page versions that are no longer needed because
     816                 :         // of the new image layers we created in step 2.
     817                 :         //
     818                 :         // TODO: This high level strategy hasn't been implemented yet.
     819                 :         // Below are functions compact_level0() and create_image_layers()
     820                 :         // but they are a bit ad hoc and don't quite work like it's explained
     821                 :         // above. Rewrite it.
     822            1263 :         let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
     823                 :         // Is the timeline being deleted?
     824            1263 :         if self.is_stopping() {
     825 UBC           0 :             trace!("Dropping out of compaction on timeline shutdown");
     826               0 :             return Err(CompactionError::ShuttingDown);
     827 CBC        1263 :         }
     828            1263 : 
     829            1263 :         let target_file_size = self.get_checkpoint_distance();
     830            1263 : 
     831            1263 :         // Define partitioning schema if needed
     832            1263 : 
     833            1263 :         match self
     834            1263 :             .repartition(
     835            1263 :                 self.get_last_record_lsn(),
     836            1263 :                 self.get_compaction_target_size(),
     837            1263 :                 ctx,
     838            1263 :             )
     839          279450 :             .await
     840                 :         {
     841            1262 :             Ok((partitioning, lsn)) => {
     842            1262 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     843            1262 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     844            1262 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     845            1262 :                     .build();
     846                 : 
     847                 :                 // 2. Create new image layers for partitions that have been modified
     848                 :                 // "enough".
     849            1262 :                 let layer_paths_to_upload = self
     850            1262 :                     .create_image_layers(&partitioning, lsn, false, &image_ctx)
     851          741358 :                     .await
     852            1257 :                     .map_err(anyhow::Error::from)?;
     853            1257 :                 if let Some(remote_client) = &self.remote_client {
     854            4620 :                     for (path, layer_metadata) in layer_paths_to_upload {
     855            3363 :                         remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
     856                 :                     }
     857 UBC           0 :                 }
     858                 : 
     859                 :                 // 3. Compact
     860 CBC        1257 :                 let timer = self.metrics.compact_time_histo.start_timer();
     861            1257 :                 self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
     862          477519 :                     .await?;
     863            1252 :                 timer.stop_and_record();
     864                 :             }
     865 UBC           0 :             Err(err) => {
      866                 :                 // No partitioning? This is normal if the timeline was just created
      867                 :                 // as an empty timeline, or in unit tests that use the timeline as a
      868                 :                 // simple key-value store, ignoring the datadir layout. Log the
      869                 :                 // error but continue.
     870               0 :                 error!("could not compact, repartitioning keyspace failed: {err:?}");
     871                 :             }
     872                 :         };
     873                 : 
     874 CBC        1252 :         Ok(())
     875            1254 :     }
     876                 : 
     877                 :     /// Mutate the timeline with a [`TimelineWriter`].
     878        69330484 :     pub async fn writer(&self) -> TimelineWriter<'_> {
     879                 :         TimelineWriter {
     880        69330519 :             tl: self,
     881        69330519 :             _write_guard: self.write_lock.lock().await,
     882                 :         }
     883        69330519 :     }
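                          : 
                          :     // Editorial sketch (not part of the source): a typical write path,
                          :     // assuming TimelineWriter exposes put() and finish_write() (their
                          :     // exact signatures are not shown in this excerpt):
                          :     //
                          :     //     let writer = timeline.writer().await; // serialized via write_lock
                          :     //     writer.put(key, lsn, &value, &ctx)?;   // buffer into the open layer
                          :     //     writer.finish_write(lsn);              // advance last_record_lsn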
     884                 : 
     885                 :     /// Retrieve current logical size of the timeline.
     886                 :     ///
      887                 :     /// The size may lag behind the actual number if the initial size
      888                 :     /// calculation has not yet run (it is triggered on the first size access).
      889                 :     ///
      890                 :     /// Returns the size and a boolean flag indicating whether the size is exact.
     891          779585 :     pub fn get_current_logical_size(
     892          779585 :         self: &Arc<Self>,
     893          779585 :         ctx: &RequestContext,
     894          779585 :     ) -> anyhow::Result<(u64, bool)> {
     895          779585 :         let current_size = self.current_logical_size.current_size()?;
     896          779585 :         debug!("Current size: {current_size:?}");
     897                 : 
     898          779585 :         let mut is_exact = true;
     899          779585 :         let size = current_size.size();
     900             717 :         if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
     901          779585 :             (current_size, self.current_logical_size.initial_part_end)
     902             717 :         {
     903             717 :             is_exact = false;
     904             717 :             self.try_spawn_size_init_task(initial_part_end, ctx);
     905          778868 :         }
     906                 : 
     907          779585 :         Ok((size, is_exact))
     908          779585 :     }
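                          : 
                          :     // Editorial sketch (not part of the source): how a caller might act on
                          :     // the exactness flag returned above:
                          :     //
                          :     //     let (size, is_exact) = timeline.get_current_logical_size(&ctx)?;
                          :     //     if !is_exact {
                          :     //         // the initial size calculation is still pending, so `size`
                          :     //         // is only an approximation for now
                          :     //     }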
     909                 : 
     910                 :     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
     911                 :     /// the in-memory layer, and initiate flushing it if so.
     912                 :     ///
      913                 :     /// Also flush after a period of time without new data -- this lets
      914                 :     /// safekeepers regard the pageserver as caught up and suspend activity.
     915          776902 :     pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
     916          776902 :         let last_lsn = self.get_last_record_lsn();
     917          776346 :         let open_layer_size = {
     918          776902 :             let guard = self.layers.read().await;
     919          776902 :             let layers = guard.layer_map();
     920          776902 :             let Some(open_layer) = layers.open_layer.as_ref() else {
     921             556 :                 return Ok(());
     922                 :             };
     923          776346 :             open_layer.size().await?
     924                 :         };
     925          776346 :         let last_freeze_at = self.last_freeze_at.load();
     926          776346 :         let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
     927          776346 :         let distance = last_lsn.widening_sub(last_freeze_at);
     928          776346 :         // Checkpointing the open layer can be triggered by layer size or LSN range.
     929          776346 :         // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
     930          776346 :         // we want to stay below that with a big margin.  The LSN distance determines how
     931          776346 :         // much WAL the safekeepers need to store.
     932          776346 :         if distance >= self.get_checkpoint_distance().into()
     933          774724 :             || open_layer_size > self.get_checkpoint_distance()
     934          772363 :             || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
     935                 :         {
     936            3983 :             info!(
     937            3983 :                 "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
     938            3983 :                 distance,
     939            3983 :                 open_layer_size,
     940            3983 :                 last_freeze_ts.elapsed()
     941            3983 :             );
     942                 : 
     943            3983 :             self.freeze_inmem_layer(true).await;
     944            3983 :             self.last_freeze_at.store(last_lsn);
     945            3983 :             *(self.last_freeze_ts.write().unwrap()) = Instant::now();
     946            3983 : 
     947            3983 :             // Wake up the layer flusher
     948            3983 :             self.flush_frozen_layers();
     949          772363 :         }
     950          776346 :         Ok(())
     951          776902 :     }
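                          : 
                          :     // Editorial note (not part of the source): the flush above fires on any
                          :     // of three conditions. With an assumed checkpoint_distance of 256 MiB
                          :     // and checkpoint_timeout of 10 minutes (illustrative values, not
                          :     // necessarily the configured defaults), it triggers when:
                          :     //
                          :     //     distance        >= 256 MiB           // LSN range since last freeze
                          :     //     open_layer_size >  256 MiB            // in-memory layer size
                          :     //     distance > 0 && elapsed >= 10 min     // idle timeout, data pending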
     952                 : 
     953            1103 :     pub fn activate(
     954            1103 :         self: &Arc<Self>,
     955            1103 :         broker_client: BrokerClientChannel,
     956            1103 :         background_jobs_can_start: Option<&completion::Barrier>,
     957            1103 :         ctx: &RequestContext,
     958            1103 :     ) {
     959            1103 :         self.launch_wal_receiver(ctx, broker_client);
     960            1103 :         self.set_state(TimelineState::Active);
     961            1103 :         self.launch_eviction_task(background_jobs_can_start);
     962            1103 :     }
     963                 : 
     964             864 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
     965                 :     pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
     966                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     967                 : 
     968                 :         // prevent writes to the InMemoryLayer
     969                 :         task_mgr::shutdown_tasks(
     970                 :             Some(TaskKind::WalReceiverManager),
     971                 :             Some(self.tenant_id),
     972                 :             Some(self.timeline_id),
     973                 :         )
     974                 :         .await;
     975                 : 
     976                 :         // now all writers to InMemory layer are gone, do the final flush if requested
     977                 :         if freeze_and_flush {
     978                 :             match self.freeze_and_flush().await {
     979                 :                 Ok(()) => {}
     980                 :                 Err(e) => {
     981              59 :                     warn!("failed to freeze and flush: {e:#}");
      982                 :                     return; // TODO: should probably drain remote timeline client anyway?
     983                 :                 }
     984                 :             }
     985                 : 
     986                 :             // drain the upload queue
     987                 :             let res = if let Some(client) = self.remote_client.as_ref() {
      988                 :                 // If we did not wait for completion here, our shutdown process might
      989                 :                 // not wait for remote uploads to complete at all, since new tasks can
      990                 :                 // be spawned forever.
      991                 :                 //
      992                 :                 // What remains problematic is shutting down the RemoteTimelineClient:
      993                 :                 // it does not make sense to stop while we wait for it, but what about
      994                 :                 // corner cases like S3 suddenly hanging up?
     995                 :                 client.wait_completion().await
     996                 :             } else {
     997                 :                 Ok(())
     998                 :             };
     999                 : 
    1000                 :             if let Err(e) = res {
    1001 UBC           0 :                 warn!("failed to await for frozen and flushed uploads: {e:#}");
    1002                 :             }
    1003                 :         }
    1004                 :     }
    1005                 : 
    1006                 :     pub fn set_state(&self, new_state: TimelineState) {
    1007 CBC        1853 :         match (self.current_state(), new_state) {
    1008            1853 :             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
    1009             117 :                 info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
    1010                 :             }
    1011 UBC           0 :             (st, TimelineState::Loading) => {
    1012               0 :                 error!("ignoring transition from {st:?} into Loading state");
    1013                 :             }
    1014 CBC          14 :             (TimelineState::Broken { .. }, new_state) => {
    1015              14 :                 error!("Ignoring state update {new_state:?} for broken timeline");
    1016                 :             }
    1017                 :             (TimelineState::Stopping, TimelineState::Active) => {
    1018 UBC           0 :                 error!("Not activating a Stopping timeline");
    1019                 :             }
    1020 CBC        1722 :             (_, new_state) => {
    1021            1246 :                 if matches!(
    1022            1722 :                     new_state,
    1023                 :                     TimelineState::Stopping | TimelineState::Broken { .. }
    1024             476 :                 ) {
    1025             476 :                     // drop the completion guard, if any; it might be holding off the completion
    1026             476 :                     // forever needlessly
    1027             476 :                     self.initial_logical_size_attempt
    1028             476 :                         .lock()
    1029             476 :                         .unwrap_or_else(|e| e.into_inner())
    1030             476 :                         .take();
    1031            1246 :                 }
    1032            1722 :                 self.state.send_replace(new_state);
    1033                 :             }
    1034                 :         }
    1035            1853 :     }
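                          : 
                          :     // Editorial summary (not part of the source) of the transition rules
                          :     // encoded in the match above:
                          :     //
                          :     //     same state            -> ignored (info)
                          :     //     any      -> Loading   -> rejected (error)
                          :     //     Broken   -> any       -> rejected (error)
                          :     //     Stopping -> Active    -> rejected (error)
                          :     //     otherwise             -> applied; entering Stopping or Broken also
                          :     //                              drops the initial_logical_size_attempt guard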
    1036                 : 
    1037              34 :     pub fn set_broken(&self, reason: String) {
    1038              34 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
    1039              34 :         let broken_state = TimelineState::Broken {
    1040              34 :             reason,
    1041              34 :             backtrace: backtrace_str,
    1042              34 :         };
    1043              34 :         self.set_state(broken_state)
    1044              34 :     }
    1045                 : 
    1046         2042347 :     pub fn current_state(&self) -> TimelineState {
    1047         2042347 :         self.state.borrow().clone()
    1048         2042347 :     }
    1049                 : 
    1050             666 :     pub fn is_broken(&self) -> bool {
    1051             666 :         matches!(&*self.state.borrow(), TimelineState::Broken { .. })
    1052             666 :     }
    1053                 : 
    1054         1285867 :     pub fn is_active(&self) -> bool {
    1055         1285867 :         self.current_state() == TimelineState::Active
    1056         1285867 :     }
    1057                 : 
    1058            2259 :     pub fn is_stopping(&self) -> bool {
    1059            2259 :         self.current_state() == TimelineState::Stopping
    1060            2259 :     }
    1061                 : 
    1062            1514 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
    1063            1514 :         self.state.subscribe()
    1064            1514 :     }
    1065                 : 
    1066         1063161 :     pub async fn wait_to_become_active(
    1067         1063161 :         &self,
    1068         1063161 :         _ctx: &RequestContext, // Prepare for use by cancellation
    1069         1063161 :     ) -> Result<(), TimelineState> {
    1070         1063161 :         let mut receiver = self.state.subscribe();
    1071         1063167 :         loop {
    1072         1063167 :             let current_state = receiver.borrow().clone();
    1073         1063167 :             match current_state {
    1074                 :                 TimelineState::Loading => {
    1075               6 :                     receiver
    1076               6 :                         .changed()
    1077               6 :                         .await
    1078               6 :                         .expect("holding a reference to self");
    1079                 :                 }
    1080                 :                 TimelineState::Active { .. } => {
    1081         1063159 :                     return Ok(());
    1082                 :                 }
    1083                 :                 TimelineState::Broken { .. } | TimelineState::Stopping => {
    1084                 :                     // There's no chance the timeline can transition back into ::Active
    1085               2 :                     return Err(current_state);
    1086                 :                 }
    1087                 :             }
    1088                 :         }
    1089         1063161 :     }
    1090                 : 
    1091              85 :     pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
    1092              85 :         let guard = self.layers.read().await;
    1093              85 :         let layer_map = guard.layer_map();
    1094              85 :         let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
    1095              85 :         if let Some(open_layer) = &layer_map.open_layer {
    1096              21 :             in_memory_layers.push(open_layer.info());
    1097              64 :         }
    1098              85 :         for frozen_layer in &layer_map.frozen_layers {
    1099 UBC           0 :             in_memory_layers.push(frozen_layer.info());
    1100               0 :         }
    1101                 : 
    1102 CBC          85 :         let mut historic_layers = Vec::new();
    1103            1775 :         for historic_layer in layer_map.iter_historic_layers() {
    1104            1775 :             let historic_layer = guard.get_from_desc(&historic_layer);
    1105            1775 :             historic_layers.push(historic_layer.info(reset));
    1106            1775 :         }
    1107                 : 
    1108              85 :         LayerMapInfo {
    1109              85 :             in_memory_layers,
    1110              85 :             historic_layers,
    1111              85 :         }
    1112              85 :     }
    1113                 : 
    1114              24 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
    1115                 :     pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1116                 :         let Some(layer) = self.find_layer(layer_file_name).await else {
    1117                 :             return Ok(None);
    1118                 :         };
    1119                 :         let Some(remote_layer) = layer.downcast_remote_layer() else {
    1120                 :             return Ok(Some(false));
    1121                 :         };
    1122                 :         if self.remote_client.is_none() {
    1123                 :             return Ok(Some(false));
    1124                 :         }
    1125                 : 
    1126                 :         self.download_remote_layer(remote_layer).await?;
    1127                 :         Ok(Some(true))
    1128                 :     }
    1129                 : 
    1130                 :     /// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer.
     1131                 :     /// The additional `Ok(None)` case covers a layer that could not be found by its `layer_file_name`.
    1132             407 :     pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1133             407 :         let Some(local_layer) = self.find_layer(layer_file_name).await else {
    1134 UBC           0 :             return Ok(None);
    1135                 :         };
    1136 CBC         407 :         let remote_client = self
    1137             407 :             .remote_client
    1138             407 :             .as_ref()
    1139             407 :             .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
    1140                 : 
    1141             407 :         let cancel = CancellationToken::new();
    1142             407 :         let results = self
    1143             407 :             .evict_layer_batch(remote_client, &[local_layer], cancel)
    1144               6 :             .await?;
    1145             407 :         assert_eq!(results.len(), 1);
    1146             407 :         let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
    1147             407 :         match result {
    1148 UBC           0 :             None => anyhow::bail!("task_mgr shutdown requested"),
    1149 CBC         407 :             Some(Ok(())) => Ok(Some(true)),
    1150 UBC           0 :             Some(Err(e)) => Err(anyhow::Error::new(e)),
    1151                 :         }
    1152 CBC         407 :     }
    1153                 : 
    1154                 :     /// Evict a batch of layers.
    1155                 :     ///
     1156                 :     /// A GenericRemoteStorage reference is required as a [witness][witness_article] that remote storage is configured.
    1157                 :     ///
    1158                 :     /// [witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
    1159              11 :     pub(crate) async fn evict_layers(
    1160              11 :         &self,
    1161              11 :         _: &GenericRemoteStorage,
    1162              11 :         layers_to_evict: &[Arc<dyn PersistentLayer>],
    1163              11 :         cancel: CancellationToken,
    1164              11 :     ) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
    1165              11 :         let remote_client = self.remote_client.clone().expect(
    1166              11 :             "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient",
    1167              11 :         );
    1168              11 : 
    1169              11 :         self.evict_layer_batch(&remote_client, layers_to_evict, cancel)
    1170 UBC           0 :             .await
    1171 CBC          11 :     }
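                          : 
                          :     // Editorial sketch (not part of the source): the "witness" pattern used
                          :     // above, in miniature. Requiring a reference that the caller can only
                          :     // obtain when remote storage is configured turns a runtime invariant
                          :     // into a compile-time obligation:
                          :     //
                          :     //     fn needs_remote_storage(_witness: &GenericRemoteStorage) {
                          :     //         // here it is safe to assume remote storage is configured
                          :     //     }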
    1172                 : 
    1173                 :     /// Evict multiple layers at once, continuing through errors.
    1174                 :     ///
    1175                 :     /// Try to evict the given `layers_to_evict` by
    1176                 :     ///
    1177                 :     /// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object.
    1178                 :     /// 2. Deleting the now unreferenced layer file from disk.
    1179                 :     ///
    1180                 :     /// The `remote_client` should be this timeline's `self.remote_client`.
    1181                 :     /// We make the caller provide it so that they are responsible for handling the case
    1182                 :     /// where someone wants to evict the layer but no remote storage is configured.
    1183                 :     ///
    1184                 :     /// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`.
    1185                 :     /// If `Err()` is returned, no eviction was attempted.
    1186                 :     /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`.
    1187                 :     /// Meaning of each `result[i]`:
     1188                 :     /// - `Some(Ok(()))` if the layer was evicted.
     1189                 :     /// - `Some(Err(...))` if the layer could not be evicted, e.g.:
     1190                 :     ///    - the evictee was not yet downloaded, so there is nothing to evict
     1191                 :     ///    - replacement failed for an expectable reason (e.g., layer removed
     1192                 :     ///      by GC before we grabbed all locks)
    1193                 :     /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`.
    1194             448 :     async fn evict_layer_batch(
    1195             448 :         &self,
    1196             448 :         remote_client: &Arc<RemoteTimelineClient>,
    1197             448 :         layers_to_evict: &[Arc<dyn PersistentLayer>],
    1198             448 :         cancel: CancellationToken,
    1199             448 :     ) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
    1200             448 :         // ensure that the layers have finished uploading
    1201             448 :         // (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
    1202             448 :         remote_client
    1203             448 :             .wait_completion()
    1204               4 :             .await
    1205             448 :             .context("wait for layer upload ops to complete")?;
    1206                 : 
    1207                 :         // now lock out layer removal (compaction, gc, timeline deletion)
    1208             448 :         let layer_removal_guard = self.layer_removal_cs.lock().await;
    1209                 : 
    1210                 :         {
    1211                 :             // to avoid racing with detach and delete_timeline
    1212             448 :             let state = self.current_state();
    1213             448 :             anyhow::ensure!(
    1214             448 :                 state == TimelineState::Active,
    1215 UBC           0 :                 "timeline is not active but {state:?}"
    1216                 :             );
    1217                 :         }
    1218                 : 
    1219                 :         // start the batch update
    1220 CBC         448 :         let mut guard = self.layers.write().await;
    1221             448 :         let mut results = Vec::with_capacity(layers_to_evict.len());
    1222                 : 
    1223             592 :         for l in layers_to_evict.iter() {
    1224             592 :             let res = if cancel.is_cancelled() {
    1225 UBC           0 :                 None
    1226                 :             } else {
    1227 CBC         592 :                 Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut guard))
    1228                 :             };
    1229             592 :             results.push(res);
    1230                 :         }
    1231                 : 
    1232                 :         // commit the updates & release locks
    1233             448 :         drop_wlock(guard);
    1234             448 :         drop(layer_removal_guard);
    1235             448 : 
    1236             448 :         assert_eq!(results.len(), layers_to_evict.len());
    1237             448 :         Ok(results)
    1238             448 :     }
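                          : 
                          :     // Editorial sketch (not part of the source): consuming the per-layer
                          :     // results returned above, mirroring the doc comment:
                          :     //
                          :     //     for (layer, res) in layers_to_evict.iter().zip(results) {
                          :     //         match res {
                          :     //             None => warn!("eviction cancelled for {layer}"),
                          :     //             Some(Ok(())) => {}
                          :     //             Some(Err(e)) => warn!("eviction failed for {layer}: {e}"),
                          :     //         }
                          :     //     }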
    1239                 : 
    1240             592 :     fn evict_layer_batch_impl(
    1241             592 :         &self,
    1242             592 :         _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
    1243             592 :         local_layer: &Arc<dyn PersistentLayer>,
    1244             592 :         layer_mgr: &mut LayerManager,
    1245             592 :     ) -> Result<(), EvictionError> {
    1246             592 :         if local_layer.is_remote_layer() {
    1247 UBC           0 :             return Err(EvictionError::CannotEvictRemoteLayer);
    1248 CBC         592 :         }
    1249             592 : 
    1250             592 :         let layer_file_size = local_layer.layer_desc().file_size;
    1251                 : 
    1252             592 :         let local_layer_mtime = local_layer
    1253             592 :             .local_path()
    1254             592 :             .expect("local layer should have a local path")
    1255             592 :             .metadata()
    1256             592 :             // when the eviction fails because we have already deleted the layer in compaction for
    1257             592 :             // example, a NotFound error bubbles up from here.
    1258             592 :             .map_err(|e| {
    1259               1 :                 if e.kind() == std::io::ErrorKind::NotFound {
    1260               1 :                     EvictionError::FileNotFound
    1261                 :                 } else {
    1262 UBC           0 :                     EvictionError::StatFailed(e)
    1263                 :                 }
    1264 CBC         592 :             })?
    1265             591 :             .modified()
    1266             591 :             .map_err(EvictionError::StatFailed)?;
    1267                 : 
    1268             591 :         let local_layer_residence_duration =
    1269             591 :             match SystemTime::now().duration_since(local_layer_mtime) {
    1270 UBC           0 :                 Err(e) => {
    1271               0 :                     warn!(layer = %local_layer, "layer mtime is in the future: {}", e);
    1272               0 :                     None
    1273                 :                 }
    1274 CBC         591 :                 Ok(delta) => Some(delta),
    1275                 :             };
    1276                 : 
    1277                 :         // RemoteTimelineClient holds the metadata on layers' remote generations, so
    1278                 :         // query it to construct a RemoteLayer.
    1279             591 :         let layer_metadata = self
    1280             591 :             .remote_client
    1281             591 :             .as_ref()
    1282             591 :             .expect("Eviction is not called without remote storage")
    1283             591 :             .get_layer_metadata(&local_layer.filename())
    1284             591 :             .map_err(EvictionError::LayerNotFound)?
    1285             591 :             .ok_or_else(|| {
    1286 UBC           0 :                 EvictionError::LayerNotFound(anyhow::anyhow!("Layer not in remote metadata"))
    1287 CBC         591 :             })?;
    1288             591 :         if layer_metadata.file_size() != layer_file_size {
    1289 UBC           0 :             return Err(EvictionError::MetadataInconsistency(format!(
    1290               0 :                 "Layer size {layer_file_size} doesn't match remote metadata file size {}",
    1291               0 :                 layer_metadata.file_size()
    1292               0 :             )));
    1293 CBC         591 :         }
    1294                 : 
    1295             591 :         let new_remote_layer = Arc::new(match local_layer.filename() {
    1296             381 :             LayerFileName::Image(image_name) => RemoteLayer::new_img(
    1297             381 :                 self.tenant_id,
    1298             381 :                 self.timeline_id,
    1299             381 :                 &image_name,
    1300             381 :                 &layer_metadata,
    1301             381 :                 local_layer
    1302             381 :                     .access_stats()
    1303             381 :                     .clone_for_residence_change(LayerResidenceStatus::Evicted),
    1304             381 :             ),
    1305             210 :             LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
    1306             210 :                 self.tenant_id,
    1307             210 :                 self.timeline_id,
    1308             210 :                 &delta_name,
    1309             210 :                 &layer_metadata,
    1310             210 :                 local_layer
    1311             210 :                     .access_stats()
    1312             210 :                     .clone_for_residence_change(LayerResidenceStatus::Evicted),
    1313             210 :             ),
    1314                 :         });
    1315                 : 
    1316             591 :         assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
    1317                 : 
    1318             591 :         layer_mgr
    1319             591 :             .replace_and_verify(local_layer.clone(), new_remote_layer)
    1320             591 :             .map_err(EvictionError::LayerNotFound)?;
    1321                 : 
    1322             590 :         if let Err(e) = local_layer.delete_resident_layer_file() {
     1323                 :             // this should never happen, because of the layer_removal_cs usage and
     1324                 :             // the stat access for mtime above
    1325 UBC           0 :             error!("failed to remove layer file on evict after replacement: {e:#?}");
    1326 CBC         590 :         }
    1327                 :         // Always decrement the physical size gauge, even if we failed to delete the file.
    1328                 :         // Rationale: we already replaced the layer with a remote layer in the layer map,
    1329                 :         // and any subsequent download_remote_layer will
    1330                 :         // 1. overwrite the file on disk and
    1331                 :         // 2. add the downloaded size to the resident size gauge.
    1332                 :         //
    1333                 :         // If there is no re-download, and we restart the pageserver, then load_layer_map
    1334                 :         // will treat the file as a local layer again, count it towards resident size,
    1335                 :         // and it'll be like the layer removal never happened.
    1336                 :         // The bump in resident size is perhaps unexpected but overall a robust behavior.
    1337             590 :         self.metrics.resident_physical_size_sub(layer_file_size);
    1338             590 :         self.metrics.evictions.inc();
    1339                 : 
    1340             590 :         if let Some(delta) = local_layer_residence_duration {
    1341             590 :             self.metrics
    1342             590 :                 .evictions_with_low_residence_duration
    1343             590 :                 .read()
    1344             590 :                 .unwrap()
    1345             590 :                 .observe(delta);
    1346             590 :             info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
    1347                 :         } else {
    1348 UBC           0 :             info!(layer=%local_layer, "evicted layer after unknown residence period");
    1349                 :         }
    1350                 : 
    1351 CBC         590 :         Ok(())
    1352             592 :     }
    1353                 : }
    1354                 : 
    1355 UBC           0 : #[derive(Debug, thiserror::Error)]
    1356                 : pub(crate) enum EvictionError {
    1357                 :     #[error("cannot evict a remote layer")]
    1358                 :     CannotEvictRemoteLayer,
     1359                 :     /// Most likely the to-be-evicted layer has been deleted by compaction or gc, which
     1360                 :     /// use the same locks, so they got to execute before the eviction.
    1361                 :     #[error("file backing the layer has been removed already")]
    1362                 :     FileNotFound,
    1363                 :     #[error("stat failed")]
    1364                 :     StatFailed(#[source] std::io::Error),
     1365                 :     /// In practice, this can be a number of things, but let's assume it means only this.
     1366                 :     ///
     1367                 :     /// This case includes situations where the Layer was evicted and redownloaded in between:
     1368                 :     /// the file existed before a replacement attempt was made, but now the Layers are
     1369                 :     /// different objects in memory.
    1370                 :     #[error("layer was no longer part of LayerMap")]
    1371                 :     LayerNotFound(#[source] anyhow::Error),
    1372                 : 
    1373                 :     /// This should never happen
    1374                 :     #[error("Metadata inconsistency")]
    1375                 :     MetadataInconsistency(String),
    1376                 : }
    1377                 : 
     1378                 : /// Number of times we will compute the partitioning within a checkpoint distance.
    1379                 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
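                          : 
                          : // Editorial note (not part of the source): repartition_threshold is derived
                          : // in Timeline::new() as get_checkpoint_distance() / 10, so with an assumed
                          : // checkpoint_distance of 256 MiB the keyspace would be repartitioned roughly
                          : // every 25.6 MiB of WAL.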
    1380                 : 
    1381                 : // Private functions
    1382                 : impl Timeline {
    1383 CBC     1553635 :     fn get_checkpoint_distance(&self) -> u64 {
    1384         1553635 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1385         1553635 :         tenant_conf
    1386         1553635 :             .checkpoint_distance
    1387         1553635 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    1388         1553635 :     }
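                          : 
                          :     // Editorial note (not part of the source): this getter and the ones
                          :     // below all follow the same pattern: use the per-tenant override from
                          :     // tenant_conf if set, otherwise fall back to the global default in
                          :     // conf.default_tenant_conf.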
    1389                 : 
    1390          772363 :     fn get_checkpoint_timeout(&self) -> Duration {
    1391          772363 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1392          772363 :         tenant_conf
    1393          772363 :             .checkpoint_timeout
    1394          772363 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    1395          772363 :     }
    1396                 : 
    1397            1302 :     fn get_compaction_target_size(&self) -> u64 {
    1398            1302 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1399            1302 :         tenant_conf
    1400            1302 :             .compaction_target_size
    1401            1302 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    1402            1302 :     }
    1403                 : 
    1404            1257 :     fn get_compaction_threshold(&self) -> usize {
    1405            1257 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1406            1257 :         tenant_conf
    1407            1257 :             .compaction_threshold
    1408            1257 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    1409            1257 :     }
    1410                 : 
    1411           23824 :     fn get_image_creation_threshold(&self) -> usize {
    1412           23824 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1413           23824 :         tenant_conf
    1414           23824 :             .image_creation_threshold
    1415           23824 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    1416           23824 :     }
    1417                 : 
    1418            1958 :     fn get_eviction_policy(&self) -> EvictionPolicy {
    1419            1958 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1420            1958 :         tenant_conf
    1421            1958 :             .eviction_policy
    1422            1958 :             .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
    1423            1958 :     }
    1424                 : 
    1425            1331 :     fn get_evictions_low_residence_duration_metric_threshold(
    1426            1331 :         tenant_conf: &TenantConfOpt,
    1427            1331 :         default_tenant_conf: &TenantConf,
    1428            1331 :     ) -> Duration {
    1429            1331 :         tenant_conf
    1430            1331 :             .evictions_low_residence_duration_metric_threshold
    1431            1331 :             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
    1432            1331 :     }
    1433                 : 
    1434           11289 :     fn get_gc_feedback(&self) -> bool {
    1435           11289 :         let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
    1436           11289 :         tenant_conf
    1437           11289 :             .gc_feedback
    1438           11289 :             .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
    1439           11289 :     }
    1440                 : 
    1441              29 :     pub(super) fn tenant_conf_updated(&self) {
    1442              29 :         // NB: Most tenant conf options are read by background loops, so,
    1443              29 :         // changes will automatically be picked up.
    1444              29 : 
    1445              29 :         // The threshold is embedded in the metric. So, we need to update it.
    1446              29 :         {
    1447              29 :             let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
    1448              29 :                 &self.tenant_conf.read().unwrap().tenant_conf,
    1449              29 :                 &self.conf.default_tenant_conf,
    1450              29 :             );
    1451              29 :             let tenant_id_str = self.tenant_id.to_string();
    1452              29 :             let timeline_id_str = self.timeline_id.to_string();
    1453              29 :             self.metrics
    1454              29 :                 .evictions_with_low_residence_duration
    1455              29 :                 .write()
    1456              29 :                 .unwrap()
    1457              29 :                 .change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
    1458              29 :         }
    1459              29 :     }
    1460                 : 
    1461                 :     /// Open a Timeline handle.
    1462                 :     ///
    1463                 :     /// Loads the metadata for the timeline into memory, but not the layer map.
    1464                 :     #[allow(clippy::too_many_arguments)]
    1465            1302 :     pub(super) fn new(
    1466            1302 :         conf: &'static PageServerConf,
    1467            1302 :         tenant_conf: Arc<RwLock<AttachedTenantConf>>,
    1468            1302 :         metadata: &TimelineMetadata,
    1469            1302 :         ancestor: Option<Arc<Timeline>>,
    1470            1302 :         timeline_id: TimelineId,
    1471            1302 :         tenant_id: TenantId,
    1472            1302 :         generation: Generation,
    1473            1302 :         walredo_mgr: Arc<super::WalRedoManager>,
    1474            1302 :         resources: TimelineResources,
    1475            1302 :         pg_version: u32,
    1476            1302 :         initial_logical_size_can_start: Option<completion::Barrier>,
    1477            1302 :         initial_logical_size_attempt: Option<completion::Completion>,
    1478            1302 :         state: TimelineState,
    1479            1302 :     ) -> Arc<Self> {
    1480            1302 :         let disk_consistent_lsn = metadata.disk_consistent_lsn();
    1481            1302 :         let (state, _) = watch::channel(state);
    1482            1302 : 
    1483            1302 :         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
    1484            1302 :         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
    1485            1302 : 
    1486            1302 :         let tenant_conf_guard = tenant_conf.read().unwrap();
    1487            1302 : 
    1488            1302 :         let evictions_low_residence_duration_metric_threshold =
    1489            1302 :             Self::get_evictions_low_residence_duration_metric_threshold(
    1490            1302 :                 &tenant_conf_guard.tenant_conf,
    1491            1302 :                 &conf.default_tenant_conf,
    1492            1302 :             );
    1493            1302 :         drop(tenant_conf_guard);
    1494            1302 : 
    1495            1302 :         Arc::new_cyclic(|myself| {
    1496            1302 :             let mut result = Timeline {
    1497            1302 :                 conf,
    1498            1302 :                 tenant_conf,
    1499            1302 :                 myself: myself.clone(),
    1500            1302 :                 timeline_id,
    1501            1302 :                 tenant_id,
    1502            1302 :                 generation,
    1503            1302 :                 pg_version,
    1504            1302 :                 layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
    1505            1302 :                 wanted_image_layers: Mutex::new(None),
    1506            1302 : 
    1507            1302 :                 walredo_mgr,
    1508            1302 :                 walreceiver: Mutex::new(None),
    1509            1302 : 
    1510            1302 :                 remote_client: resources.remote_client.map(Arc::new),
    1511            1302 : 
    1512            1302 :                 // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
    1513            1302 :                 last_record_lsn: SeqWait::new(RecordLsn {
    1514            1302 :                     last: disk_consistent_lsn,
    1515            1302 :                     prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
    1516            1302 :                 }),
    1517            1302 :                 disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
    1518            1302 : 
    1519            1302 :                 last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
    1520            1302 :                 last_freeze_ts: RwLock::new(Instant::now()),
    1521            1302 : 
    1522            1302 :                 loaded_at: (disk_consistent_lsn, SystemTime::now()),
    1523            1302 : 
    1524            1302 :                 ancestor_timeline: ancestor,
    1525            1302 :                 ancestor_lsn: metadata.ancestor_lsn(),
    1526            1302 : 
    1527            1302 :                 metrics: TimelineMetrics::new(
    1528            1302 :                     &tenant_id,
    1529            1302 :                     &timeline_id,
    1530            1302 :                     crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
    1531            1302 :                         "mtime",
    1532            1302 :                         evictions_low_residence_duration_metric_threshold,
    1533            1302 :                     ),
    1534            1302 :                 ),
    1535            1302 : 
    1536            1302 :                 flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
    1537            1302 : 
    1538            1302 :                 layer_flush_start_tx,
    1539            1302 :                 layer_flush_done_tx,
    1540            1302 : 
    1541            1302 :                 write_lock: tokio::sync::Mutex::new(()),
    1542            1302 :                 layer_removal_cs: Default::default(),
    1543            1302 : 
    1544            1302 :                 gc_info: std::sync::RwLock::new(GcInfo {
    1545            1302 :                     retain_lsns: Vec::new(),
    1546            1302 :                     horizon_cutoff: Lsn(0),
    1547            1302 :                     pitr_cutoff: Lsn(0),
    1548            1302 :                 }),
    1549            1302 : 
    1550            1302 :                 latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
    1551            1302 :                 initdb_lsn: metadata.initdb_lsn(),
    1552            1302 : 
    1553            1302 :                 current_logical_size: if disk_consistent_lsn.is_valid() {
    1554                 :                     // we're creating timeline data with some layer files existing locally,
    1555                 :                     // need to recalculate timeline's logical size based on data in the layers.
    1556             685 :                     LogicalSize::deferred_initial(disk_consistent_lsn)
    1557                 :                 } else {
    1558                 :                     // we're creating timeline data without any layers existing locally,
    1559                 :                     // initial logical size is 0.
    1560             617 :                     LogicalSize::empty_initial()
    1561                 :                 },
    1562            1302 :                 partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
    1563            1302 :                 repartition_threshold: 0,
    1564            1302 : 
    1565            1302 :                 last_received_wal: Mutex::new(None),
    1566            1302 :                 rel_size_cache: RwLock::new(HashMap::new()),
    1567            1302 : 
    1568            1302 :                 download_all_remote_layers_task_info: RwLock::new(None),
    1569            1302 : 
    1570            1302 :                 state,
    1571            1302 : 
    1572            1302 :                 eviction_task_timeline_state: tokio::sync::Mutex::new(
    1573            1302 :                     EvictionTaskTimelineState::default(),
    1574            1302 :                 ),
    1575            1302 :                 delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
    1576            1302 : 
    1577            1302 :                 initial_logical_size_can_start,
    1578            1302 :                 initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
    1579            1302 :             };
    1580            1302 :             result.repartition_threshold =
    1581            1302 :                 result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
    1582            1302 :             result
    1583            1302 :                 .metrics
    1584            1302 :                 .last_record_gauge
    1585            1302 :                 .set(disk_consistent_lsn.0 as i64);
    1586            1302 :             result
    1587            1302 :         })
    1588            1302 :     }
    1589                 : 
    1590            1888 :     pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
    1591            1888 :         let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
    1592            1888 :         match *flush_loop_state {
    1593            1277 :             FlushLoopState::NotStarted => (),
    1594                 :             FlushLoopState::Running { .. } => {
    1595             611 :                 info!(
    1596             611 :                     "skipping attempt to start flush_loop twice {}/{}",
    1597             611 :                     self.tenant_id, self.timeline_id
    1598             611 :                 );
    1599             611 :                 return;
    1600                 :             }
    1601                 :             FlushLoopState::Exited => {
    1602 UBC           0 :                 warn!(
    1603               0 :                     "ignoring attempt to restart exited flush_loop {}/{}",
    1604               0 :                     self.tenant_id, self.timeline_id
    1605               0 :                 );
    1606               0 :                 return;
    1607                 :             }
    1608                 :         }
    1609                 : 
    1610 CBC        1277 :         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
    1611            1277 :         let self_clone = Arc::clone(self);
    1612            1277 : 
    1613            1277 :         debug!("spawning flush loop");
    1614            1277 :         *flush_loop_state = FlushLoopState::Running {
    1615            1277 :             #[cfg(test)]
    1616            1277 :             expect_initdb_optimization: false,
    1617            1277 :             #[cfg(test)]
    1618            1277 :             initdb_optimization_count: 0,
    1619            1277 :         };
    1620            1277 :         task_mgr::spawn(
    1621            1277 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1622            1277 :             task_mgr::TaskKind::LayerFlushTask,
    1623            1277 :             Some(self.tenant_id),
    1624            1277 :             Some(self.timeline_id),
    1625            1277 :             "layer flush task",
    1626                 :             false,
    1627            1277 :             async move {
    1628            1277 :                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
    1629           19473 :                 self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
    1630             443 :                 let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
    1631             443 :                 assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
    1632             443 :                 *flush_loop_state  = FlushLoopState::Exited;
    1633             443 :                 Ok(())
    1634             443 :             }
    1635            1277 :             .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))
    1636                 :         );
    1637            1888 :     }
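                          : 
                          :     // Editorial summary (not part of the source): the flush loop state
                          :     // machine guarded above is NotStarted -> Running -> Exited. A spawn
                          :     // attempt while Running is skipped, a spawn after Exited is ignored
                          :     // with a warning, and the task flips the state to Exited on its way out.
                          :     // The loop itself wakes on layer_flush_start_tx notifications.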
    1638                 : 
    1639                 :     /// Creates and starts the wal receiver.
    1640                 :     ///
     1641                 :     /// This function is expected to be called at most once per Timeline lifecycle,
     1642                 :     /// when the timeline is activated.
    1643            1103 :     fn launch_wal_receiver(
    1644            1103 :         self: &Arc<Self>,
    1645            1103 :         ctx: &RequestContext,
    1646            1103 :         broker_client: BrokerClientChannel,
    1647            1103 :     ) {
    1648            1103 :         info!(
    1649            1103 :             "launching WAL receiver for timeline {} of tenant {}",
    1650            1103 :             self.timeline_id, self.tenant_id
    1651            1103 :         );
    1652                 : 
    1653            1103 :         let tenant_conf_guard = self.tenant_conf.read().unwrap();
    1654            1103 :         let wal_connect_timeout = tenant_conf_guard
    1655            1103 :             .tenant_conf
    1656            1103 :             .walreceiver_connect_timeout
    1657            1103 :             .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
    1658            1103 :         let lagging_wal_timeout = tenant_conf_guard
    1659            1103 :             .tenant_conf
    1660            1103 :             .lagging_wal_timeout
    1661            1103 :             .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
    1662            1103 :         let max_lsn_wal_lag = tenant_conf_guard
    1663            1103 :             .tenant_conf
    1664            1103 :             .max_lsn_wal_lag
    1665            1103 :             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
    1666            1103 :         drop(tenant_conf_guard);
    1667            1103 : 
    1668            1103 :         let mut guard = self.walreceiver.lock().unwrap();
    1669            1103 :         assert!(
    1670            1103 :             guard.is_none(),
    1671 UBC           0 :             "multiple launches / re-launches of WAL receiver are not supported"
    1672                 :         );
    1673 CBC        1103 :         *guard = Some(WalReceiver::start(
    1674            1103 :             Arc::clone(self),
    1675            1103 :             WalReceiverConf {
    1676            1103 :                 wal_connect_timeout,
    1677            1103 :                 lagging_wal_timeout,
    1678            1103 :                 max_lsn_wal_lag,
    1679            1103 :                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
    1680            1103 :                 availability_zone: self.conf.availability_zone.clone(),
    1681            1103 :             },
    1682            1103 :             broker_client,
    1683            1103 :             ctx,
    1684            1103 :         ));
    1685            1103 :     }
    1686                 : 
    1687                 :     /// Initialize with an empty layer map. Used when creating a new timeline.
    1688             967 :     pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
    1689             967 :         let mut layers = self.layers.try_write().expect(
    1690             967 :             "in the context where we call this function, no other task has access to the object",
    1691             967 :         );
    1692             967 :         layers.initialize_empty(Lsn(start_lsn.0));
    1693             967 :     }
    1694                 : 
     1695                 :     /// Scan the timeline directory, clean up stray files, populate the layer map, and
     1696                 :     /// schedule uploads for local-only files.
    1697             314 :     pub(super) async fn load_layer_map(
    1698             314 :         &self,
    1699             314 :         disk_consistent_lsn: Lsn,
    1700             314 :         index_part: Option<IndexPart>,
    1701             314 :     ) -> anyhow::Result<()> {
    1702                 :         use init::{Decision::*, Discovered, DismissedLayer};
    1703                 :         use LayerFileName::*;
    1704                 : 
    1705             314 :         let mut guard = self.layers.write().await;
    1706                 : 
    1707             314 :         let timer = self.metrics.load_layer_map_histo.start_timer();
    1708             314 : 
    1709             314 :         // Scan timeline directory and create ImageFileName and DeltaFileName
    1710             314 :         // structs representing all files on disk
    1711             314 :         let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
    1712             314 :         let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
    1713             314 :         let span = tracing::Span::current();
    1714             314 : 
    1715             314 :         // Copy to move into the task we're about to spawn
    1716             314 :         let generation = self.generation;
    1717                 : 
    1718             314 :         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
    1719             314 :             move || {
    1720             314 :                 let _g = span.entered();
    1721             314 :                 let discovered = init::scan_timeline_dir(&timeline_path)?;
    1722             314 :                 let mut discovered_layers = Vec::with_capacity(discovered.len());
    1723             314 :                 let mut unrecognized_files = Vec::new();
    1724             314 : 
    1725             314 :                 let mut path = timeline_path;
    1726                 : 
    1727            5328 :                 for discovered in discovered {
    1728            5014 :                     let (name, kind) = match discovered {
    1729            4615 :                         Discovered::Layer(file_name, file_size) => {
    1730            4615 :                             discovered_layers.push((file_name, file_size));
    1731            4615 :                             continue;
    1732                 :                         }
    1733                 :                         Discovered::Metadata | Discovered::IgnoredBackup => {
    1734             314 :                             continue;
    1735                 :                         }
    1736 UBC           0 :                         Discovered::Unknown(file_name) => {
    1737               0 :                             // we will later error if there are any
    1738               0 :                             unrecognized_files.push(file_name);
    1739               0 :                             continue;
    1740                 :                         }
    1741 CBC          77 :                         Discovered::Ephemeral(name) => (name, "old ephemeral file"),
    1742               8 :                         Discovered::Temporary(name) => (name, "temporary timeline file"),
    1743 UBC           0 :                         Discovered::TemporaryDownload(name) => (name, "temporary download"),
    1744                 :                     };
    1745 CBC          85 :                     path.push(Utf8Path::new(&name));
    1746              85 :                     init::cleanup(&path, kind)?;
    1747              85 :                     path.pop();
    1748                 :                 }
    1749                 : 
    1750             314 :                 if !unrecognized_files.is_empty() {
    1751                 :                     // assume that if there are any, there are many.
    1752 UBC           0 :                     let n = unrecognized_files.len();
    1753               0 :                     let first = &unrecognized_files[..n.min(10)];
    1754               0 :                     anyhow::bail!(
    1755               0 :                         "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
    1756               0 :                     );
    1757 CBC         314 :                 }
    1758             314 : 
    1759             314 :                 let decided = init::reconcile(
    1760             314 :                     discovered_layers,
    1761             314 :                     index_part.as_ref(),
    1762             314 :                     disk_consistent_lsn,
    1763             314 :                     generation,
    1764             314 :                 );
    1765             314 : 
    1766             314 :                 let mut loaded_layers = Vec::new();
    1767             314 :                 let mut needs_cleanup = Vec::new();
    1768             314 :                 let mut total_physical_size = 0;
    1769                 : 
    1770            8660 :                 for (name, decision) in decided {
    1771            8346 :                     let decision = match decision {
    1772            1768 :                         Ok(UseRemote { local, remote }) => {
    1773            1768 :                             // Remote is authoritative, but we may still choose to retain
    1774            1768 :                             // the local file if the contents appear to match
    1775            1768 :                             if local.file_size() == remote.file_size() {
    1776                 :                                 // Use the local file, but take the remote metadata so that we pick up
    1777                 :                                 // the correct generation.
    1778            1767 :                                 UseLocal(remote)
    1779                 :                             } else {
    1780               1 :                                 path.push(name.file_name());
    1781               1 :                                 init::cleanup_local_file_for_remote(&path, &local, &remote)?;
    1782               1 :                                 path.pop();
    1783               1 :                                 UseRemote { local, remote }
    1784                 :                             }
    1785                 :                         }
    1786            6310 :                         Ok(decision) => decision,
    1787               7 :                         Err(DismissedLayer::Future { local }) => {
    1788               7 :                             if local.is_some() {
    1789               7 :                                 path.push(name.file_name());
    1790               7 :                                 init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
    1791               7 :                                 path.pop();
    1792 LBC        (20) :                             }
    1793 CBC           7 :                             needs_cleanup.push(name);
    1794               7 :                             continue;
    1795                 :                         }
    1796             261 :                         Err(DismissedLayer::LocalOnly(local)) => {
    1797             261 :                             path.push(name.file_name());
    1798             261 :                             init::cleanup_local_only_file(&path, &name, &local)?;
    1799             261 :                             path.pop();
    1800             261 :                             // this file never existed remotely, so the work it represents will have to be redone
    1801             261 :                             continue;
    1802                 :                         }
    1803                 :                     };
    1804                 : 
    1805            8078 :                     match &name {
    1806            4853 :                         Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
    1807            3225 :                         Image(i) => assert!(i.lsn <= disk_consistent_lsn),
    1808                 :                     }
    1809                 : 
    1810            8078 :                     let status = match &decision {
    1811            4346 :                         UseLocal(_) => LayerResidenceStatus::Resident,
    1812            3732 :                         Evicted(_) | UseRemote { .. } => LayerResidenceStatus::Evicted,
    1813                 :                     };
    1814                 : 
    1815            8078 :                     tracing::debug!(layer=%name, ?decision, ?status, "applied");
    1816                 : 
    1817            8078 :                     let stats = LayerAccessStats::for_loading_layer(status);
    1818                 : 
    1819            8078 :                     let layer: Arc<dyn PersistentLayer> = match (name, &decision) {
    1820            2321 :                         (Delta(d), UseLocal(m)) => {
    1821            2321 :                             total_physical_size += m.file_size();
    1822            2321 :                             Arc::new(DeltaLayer::new(
    1823            2321 :                                 conf,
    1824            2321 :                                 timeline_id,
    1825            2321 :                                 tenant_id,
    1826            2321 :                                 &d,
    1827            2321 :                                 m.file_size(),
    1828            2321 :                                 stats,
    1829            2321 :                             ))
    1830                 :                         }
    1831            2025 :                         (Image(i), UseLocal(m)) => {
    1832            2025 :                             total_physical_size += m.file_size();
    1833            2025 :                             Arc::new(ImageLayer::new(
    1834            2025 :                                 conf,
    1835            2025 :                                 timeline_id,
    1836            2025 :                                 tenant_id,
    1837            2025 :                                 &i,
    1838            2025 :                                 m.file_size(),
    1839            2025 :                                 stats,
    1840            2025 :                             ))
    1841                 :                         }
    1842            2532 :                         (Delta(d), Evicted(remote) | UseRemote { remote, .. }) => Arc::new(
    1843            2532 :                             RemoteLayer::new_delta(tenant_id, timeline_id, &d, remote, stats),
    1844            2532 :                         ),
    1845            1200 :                         (Image(i), Evicted(remote) | UseRemote { remote, .. }) => Arc::new(
    1846            1200 :                             RemoteLayer::new_img(tenant_id, timeline_id, &i, remote, stats),
    1847            1200 :                         ),
    1848                 :                     };
    1849                 : 
    1850            8078 :                     loaded_layers.push(layer);
    1851                 :                 }
    1852             314 :                 Ok((loaded_layers, needs_cleanup, total_physical_size))
    1853             314 :             }
    1854             314 :         })
    1855             314 :         .await
    1856             314 :         .map_err(anyhow::Error::new)
    1857             314 :         .and_then(|x| x)?;
    1858                 : 
    1859             314 :         let num_layers = loaded_layers.len();
    1860             314 : 
    1861             314 :         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
    1862                 : 
    1863             314 :         if let Some(rtc) = self.remote_client.as_ref() {
    1864             314 :             rtc.schedule_layer_file_deletion(needs_cleanup)?;
    1865             314 :             rtc.schedule_index_upload_for_file_changes()?;
    1866                 :             // Tenant::create_timeline will wait for these uploads to complete before
    1867                 :             // returning, either on the initial call or on a retry.
    1868 UBC           0 :         }
    1869                 : 
    1870 CBC         314 :         info!(
    1871             314 :             "loaded layer map with {} layers at {}, total physical size: {}",
    1872             314 :             num_layers, disk_consistent_lsn, total_physical_size
    1873             314 :         );
    1874             314 :         self.metrics.resident_physical_size_set(total_physical_size);
    1875             314 : 
    1876             314 :         timer.stop_and_record();
    1877             314 :         Ok(())
    1878             314 :     }
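
The per-layer decision handling above can be condensed as follows. This is a sketch only: `Decision` and the u64 "metadata" are simplified mirrors of the real `init::reconcile` types, which also carry generations and dismissed-layer cases.

    // Sketch only: simplified model of the UseRemote-vs-UseLocal resolution.
    enum Decision {
        UseLocal(u64),                         // local file agrees with remote metadata
        Evicted(u64),                          // only a remote copy exists
        UseRemote { local: u64, remote: u64 }, // both exist but disagree; remote wins
    }

    fn resolve(decision: Decision) -> Decision {
        match decision {
            // When the local and remote file sizes match, keep the local bytes
            // but adopt the remote metadata (e.g. to pick up the generation).
            Decision::UseRemote { local, remote } if local == remote => {
                Decision::UseLocal(remote)
            }
            // Everything else is taken as decided.
            other => other,
        }
    }
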
    1879                 : 
    1880             717 :     fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
    1881             717 :         let state = self.current_state();
    1882             691 :         if matches!(
    1883             717 :             state,
    1884                 :             TimelineState::Broken { .. } | TimelineState::Stopping
    1885                 :         ) {
    1886                 :             // Can happen when the timeline detail endpoint is used while deletion is ongoing (or the timeline is broken).
    1887              26 :             return;
    1888             691 :         }
    1889                 : 
    1890             691 :         let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
    1891             691 :             .try_acquire_owned()
    1892                 :         {
    1893             373 :             Ok(permit) => permit,
    1894                 :             Err(TryAcquireError::NoPermits) => {
    1895                 :                 // computation already ongoing or finished with success
    1896             318 :                 return;
    1897                 :             }
    1898 UBC           0 :             Err(TryAcquireError::Closed) => unreachable!("we never call close"),
    1899                 :         };
    1900 CBC         373 :         debug_assert!(self
    1901             373 :             .current_logical_size
    1902             373 :             .initial_logical_size
    1903             373 :             .get()
    1904             373 :             .is_none());
    1905                 : 
    1906             373 :         info!(
    1907             373 :             "spawning logical size computation from context of task kind {:?}",
    1908             373 :             ctx.task_kind()
    1909             373 :         );
    1910                 :         // We need to start the computation task.
    1911                 :         // It gets a separate context since it will outlive the request that called this function.
    1912             373 :         let self_clone = Arc::clone(self);
    1913             373 :         let background_ctx = ctx.detached_child(
    1914             373 :             TaskKind::InitialLogicalSizeCalculation,
    1915             373 :             DownloadBehavior::Download,
    1916             373 :         );
    1917             373 :         task_mgr::spawn(
    1918             373 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1919             373 :             task_mgr::TaskKind::InitialLogicalSizeCalculation,
    1920             373 :             Some(self.tenant_id),
    1921             373 :             Some(self.timeline_id),
    1922             373 :             "initial size calculation",
    1923             373 :             false,
    1924             373 :             // NB: don't log errors here, task_mgr will do that.
    1925             373 :             async move {
    1926             373 : 
    1927             373 :                 let cancel = task_mgr::shutdown_token();
    1928                 : 
    1929                 :                 // In case we were created during pageserver initialization, wait for
    1930                 :                 // initialization to complete before proceeding. Startup-time init runs
    1931                 :                 // on the same runtime.
    1932             373 :                 tokio::select! {
    1933             373 :                     _ = cancel.cancelled() => { return Ok(()); },
    1934             373 :                     _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
    1935             373 :                 };
    1936                 : 
    1937                 :                 // Hold off background tasks from starting until every timeline has had
    1938                 :                 // at least one attempt at the initial logical size calculation; a retry
    1939                 :                 // will rarely be useful. We hold them off because heavier tasks execute
    1940                 :                 // blockingly on the same runtime.
    1941                 :                 //
    1942                 :                 // Dropping this at every outcome is better than trying to cling on to it;
    1943                 :                 // the delay is terminated by a timeout regardless.
    1944             373 :                 let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
    1945             373 : 
    1946             373 :                 // No extra cancellation here: unlike spawn_ondemand_logical_size_calculation,
    1947             373 :                 // nothing really waits for this to complete.
    1948             373 :                 let cancel = CancellationToken::new();
    1949                 : 
    1950             373 :                 let calculated_size = match self_clone
    1951             373 :                     .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
    1952          193882 :                     .await
    1953                 :                 {
    1954             368 :                     Ok(s) => s,
    1955                 :                     Err(CalculateLogicalSizeError::Cancelled) => {
    1956                 :                         // Don't make noise; cancellation of this task is common.
    1957                 :                         // In the unlikely case that there is another call to this function, we'll retry
    1958                 :                         // because initial_logical_size is still None.
    1959 LBC         (2) :                         info!("initial size calculation cancelled, likely timeline delete / tenant detach");
    1960             (2) :                         return Ok(());
    1961                 :                     }
    1962 CBC           3 :                     Err(CalculateLogicalSizeError::Other(err)) => {
    1963 UBC           0 :                         if let Some(e @ PageReconstructError::AncestorStopping(_)) =
    1964 CBC           3 :                             err.root_cause().downcast_ref()
    1965                 :                         {
    1966                 :                             // This can happen if the timeline's parent timeline switches to
    1967                 :                             // Stopping state while we're still calculating the initial
    1968                 :                             // timeline size for the child, for example if the tenant is
    1969                 :                             // being detached or the pageserver is shut down. Like with
    1970                 :                             // CalculateLogicalSizeError::Cancelled, don't make noise.
    1971 UBC           0 :                             info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
    1972               0 :                             return Ok(());
    1973 CBC           3 :                         }
    1974               3 :                         return Err(err.context("Failed to calculate logical size"));
    1975                 :                     }
    1976                 :                 };
    1977                 : 
    1978                 :                 // we cannot query current_logical_size.current_size() to know the current
    1979                 :                 // *negative* value, only truncated to u64.
    1980             368 :                 let added = self_clone
    1981             368 :                     .current_logical_size
    1982             368 :                     .size_added_after_initial
    1983             368 :                     .load(AtomicOrdering::Relaxed);
    1984             368 : 
    1985             368 :                 let sum = calculated_size.saturating_add_signed(added);
    1986             368 : 
    1987             368 :                 // set the gauge value before it can be set in `update_current_logical_size`.
    1988             368 :                 self_clone.metrics.current_logical_size_gauge.set(sum);
    1989             368 : 
    1990             368 :                 match self_clone
    1991             368 :                     .current_logical_size
    1992             368 :                     .initial_logical_size
    1993             368 :                     .set(calculated_size)
    1994                 :                 {
    1995             368 :                     Ok(()) => (),
    1996 UBC           0 :                     Err(_what_we_just_attempted_to_set) => {
    1997               0 :                         let existing_size = self_clone
    1998               0 :                             .current_logical_size
    1999               0 :                             .initial_logical_size
    2000               0 :                             .get()
    2001               0 :                             .expect("once_cell set was lost, then get failed, impossible.");
    2002               0 :                         // This shouldn't happen because the semaphore is initialized with 1.
    2003               0 :                         // But if it happens, just complain & report success so there are no further retries.
    2004               0 :                         error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
    2005                 :                     }
    2006                 :                 }
    2007                 :                 // now that `initial_logical_size.is_some()`, reduce permit count to 0
    2008                 :                 // so that we prevent future callers from spawning this task
    2009 CBC         368 :                 permit.forget();
    2010             368 :                 Ok(())
    2011             373 :             }.in_current_span(),
    2012             373 :         );
    2013             717 :     }
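
The permit dance above is a run-once guard: a semaphore initialized with a single permit, `try_acquire_owned` to claim it, and `forget()` on success so no later caller can spawn the task again (while a failure drops the permit and permits a retry). A standalone sketch of the idiom, where `do_work` is a hypothetical stand-in for the size calculation:

    // Sketch only: run-once guard built on a one-permit tokio semaphore.
    use std::sync::Arc;
    use tokio::sync::{Semaphore, TryAcquireError};

    async fn run_once(sem: Arc<Semaphore>, do_work: impl std::future::Future<Output = ()>) {
        let permit = match sem.try_acquire_owned() {
            Ok(permit) => permit,
            // No permits: the work is already running, or already finished.
            Err(TryAcquireError::NoPermits) => return,
            Err(TryAcquireError::Closed) => unreachable!("we never close the semaphore"),
        };
        do_work.await;
        // forget() pins the permit count at zero so no future caller can
        // spawn the work again; merely dropping the permit would allow a retry.
        permit.forget();
    }
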
    2014                 : 
    2015              39 :     pub fn spawn_ondemand_logical_size_calculation(
    2016              39 :         self: &Arc<Self>,
    2017              39 :         lsn: Lsn,
    2018              39 :         cause: LogicalSizeCalculationCause,
    2019              39 :         ctx: RequestContext,
    2020              39 :         cancel: CancellationToken,
    2021              39 :     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
    2022              39 :         let (sender, receiver) = oneshot::channel();
    2023              39 :         let self_clone = Arc::clone(self);
    2024              39 :         // XXX if our caller loses interest, i.e., ctx is cancelled,
    2025              39 :         // we should stop the size calculation work and return an error.
    2026              39 :         // That would require restructuring this function's API to
    2027              39 :         // return the result directly, instead of a Receiver for the result.
    2028              39 :         let ctx = ctx.detached_child(
    2029              39 :             TaskKind::OndemandLogicalSizeCalculation,
    2030              39 :             DownloadBehavior::Download,
    2031              39 :         );
    2032              39 :         task_mgr::spawn(
    2033              39 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    2034              39 :             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
    2035              39 :             Some(self.tenant_id),
    2036              39 :             Some(self.timeline_id),
    2037              39 :             "ondemand logical size calculation",
    2038              39 :             false,
    2039              39 :             async move {
    2040              39 :                 let res = self_clone
    2041              39 :                     .logical_size_calculation_task(lsn, cause, &ctx, cancel)
    2042            2589 :                     .await;
    2043              39 :                 let _ = sender.send(res).ok();
    2044              39 :                 Ok(()) // Receiver is responsible for handling errors
    2045              39 :             }
    2046              39 :             .in_current_span(),
    2047              39 :         );
    2048              39 :         receiver
    2049              39 :     }
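
The function above hands its result back through a oneshot channel rather than returning it directly, so the caller can await it (or not) at leisure. A minimal sketch of that shape, with a hypothetical computation standing in for the size calculation:

    // Sketch only: spawn a task and return a receiver for its result.
    use tokio::sync::oneshot;

    fn spawn_with_result() -> oneshot::Receiver<Result<u64, String>> {
        let (sender, receiver) = oneshot::channel();
        tokio::spawn(async move {
            let res: Result<u64, String> = Ok(42); // hypothetical computation
            // A send error just means the caller lost interest; ignore it.
            let _ = sender.send(res);
        });
        receiver
    }
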
    2050                 : 
    2051            1648 :     #[instrument(skip_all)]
    2052                 :     async fn logical_size_calculation_task(
    2053                 :         self: &Arc<Self>,
    2054                 :         lsn: Lsn,
    2055                 :         cause: LogicalSizeCalculationCause,
    2056                 :         ctx: &RequestContext,
    2057                 :         cancel: CancellationToken,
    2058                 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2059                 :         span::debug_assert_current_span_has_tenant_and_timeline_id();
    2060                 : 
    2061                 :         let mut timeline_state_updates = self.subscribe_for_state_updates();
    2062                 :         let self_calculation = Arc::clone(self);
    2063                 : 
    2064             412 :         let mut calculation = pin!(async {
    2065             412 :             let cancel = cancel.child_token();
    2066             412 :             let ctx = ctx.attached_child();
    2067             412 :             self_calculation
    2068             412 :                 .calculate_logical_size(lsn, cause, cancel, &ctx)
    2069          196471 :                 .await
    2070             410 :         });
    2071             407 :         let timeline_state_cancellation = async {
    2072                 :             loop {
    2073          196207 :                 match timeline_state_updates.changed().await {
    2074                 :                     Ok(()) => {
    2075 LBC         (2) :                         let new_state = timeline_state_updates.borrow().clone();
    2076             (2) :                         match new_state {
    2077                 :                             // we're running this job for active timelines only
    2078 UBC           0 :                             TimelineState::Active => continue,
    2079                 :                             TimelineState::Broken { .. }
    2080                 :                             | TimelineState::Stopping
    2081                 :                             | TimelineState::Loading => {
    2082 LBC         (2) :                                 break format!("aborted because timeline became inactive (new state: {new_state:?})")
    2083                 :                             }
    2084                 :                         }
    2085                 :                     }
    2086 UBC           0 :                     Err(_sender_dropped_error) => {
    2087               0 :                         // can't happen, the sender is not dropped as long as the Timeline exists
    2088               0 :                         break "aborted because state watch was dropped".to_string();
    2089                 :                     }
    2090                 :                 }
    2091                 :             }
    2092 LBC         (2) :         };
    2093                 : 
    2094 CBC         407 :         let taskmgr_shutdown_cancellation = async {
    2095          196336 :             task_mgr::shutdown_watcher().await;
    2096 UBC           0 :             "aborted because task_mgr shutdown requested".to_string()
    2097               0 :         };
    2098                 : 
    2099                 :         loop {
    2100 CBC      196883 :             tokio::select! {
    2101             410 :                 res = &mut calculation => { return res }
    2102 LBC         (2) :                 reason = timeline_state_cancellation => {
    2103 UBC           0 :                     debug!(reason = reason, "cancelling calculation");
    2104                 :                     cancel.cancel();
    2105                 :                     return calculation.await;
    2106                 :                 }
    2107               0 :                 reason = taskmgr_shutdown_cancellation => {
    2108               0 :                     debug!(reason = reason, "cancelling calculation");
    2109                 :                     cancel.cancel();
    2110                 :                     return calculation.await;
    2111                 :                 }
    2112                 :             }
    2113                 :         }
    2114                 :     }
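
The select! loop above races the pinned computation against two cancellation sources, and on cancellation it still awaits the computation so the future can observe its token and wind down, rather than being dropped at an arbitrary await point. A condensed sketch under those assumptions, with `shutdown` standing in for the timeline-state and task_mgr watchers:

    // Sketch only: cancel-and-still-await pattern from the loop above.
    use std::pin::pin;
    use tokio_util::sync::CancellationToken;

    async fn run_cancellable(
        cancel: CancellationToken,
        shutdown: CancellationToken,
    ) -> Result<u64, &'static str> {
        let child = cancel.child_token();
        let mut calculation = pin!(async move {
            // The real computation polls its token at convenient checkpoints.
            if child.is_cancelled() {
                Err("cancelled")
            } else {
                Ok(42)
            }
        });
        tokio::select! {
            res = &mut calculation => res,
            _ = shutdown.cancelled() => {
                // Propagate cancellation through the child token, then await
                // the computation so it can report a proper "cancelled" result.
                cancel.cancel();
                calculation.await
            }
        }
    }
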
    2115                 : 
    2116                 :     /// Calculate the logical size of the database up to the given LSN.
    2117                 :     ///
    2118                 :     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    2119                 :     /// especially if we need to download remote layers.
    2120 CBC         418 :     pub async fn calculate_logical_size(
    2121             418 :         &self,
    2122             418 :         up_to_lsn: Lsn,
    2123             418 :         cause: LogicalSizeCalculationCause,
    2124             418 :         cancel: CancellationToken,
    2125             418 :         ctx: &RequestContext,
    2126             418 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2127             418 :         info!(
    2128             418 :             "Calculating logical size for timeline {} at {}",
    2129             418 :             self.timeline_id, up_to_lsn
    2130             418 :         );
    2131                 :         // These failpoints are used by Python tests to ensure that we don't delete
    2132                 :         // the timeline while the logical size computation is ongoing.
    2133                 :         // The first failpoint is used to make this function pause.
    2134                 :         // Then the Python test initiates a timeline delete operation in a thread.
    2135                 :         // It waits for a few seconds, then arms the second failpoint and disables
    2136                 :         // the first failpoint. The second failpoint prints an error if the timeline
    2137                 :         // delete code has deleted the on-disk state while we're still running here.
    2138                 :         // It shouldn't do that. If it does it anyway, the error will be caught
    2139                 :         // by the test suite, highlighting the problem.
    2140             418 :         fail::fail_point!("timeline-calculate-logical-size-pause");
    2141             418 :         fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
    2142               2 :             if !self
    2143               2 :                 .conf
    2144               2 :                 .metadata_path(&self.tenant_id, &self.timeline_id)
    2145               2 :                 .exists()
    2146                 :             {
    2147 UBC           0 :                 error!("timeline-calculate-logical-size-pre metadata file does not exist")
    2148 CBC           2 :             }
    2149                 :             // need to return something
    2150               2 :             Ok(0)
    2151             418 :         });
    2152                 :         // See if we've already done the work for initial size calculation.
    2153                 :         // This is a short-cut for timelines that are mostly unused.
    2154             416 :         if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
    2155               4 :             return Ok(size);
    2156             412 :         }
    2157             412 :         let storage_time_metrics = match cause {
    2158                 :             LogicalSizeCalculationCause::Initial
    2159                 :             | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
    2160             400 :             | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
    2161                 :             LogicalSizeCalculationCause::EvictionTaskImitation => {
    2162              12 :                 &self.metrics.imitate_logical_size_histo
    2163                 :             }
    2164                 :         };
    2165             412 :         let timer = storage_time_metrics.start_timer();
    2166             412 :         let logical_size = self
    2167             412 :             .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
    2168          197786 :             .await?;
    2169 UBC           0 :         debug!("calculated logical size: {logical_size}");
    2170 CBC         407 :         timer.stop_and_record();
    2171             407 :         Ok(logical_size)
    2172             416 :     }
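
The failpoints used above are compile-time no-ops unless the fail crate's failpoints feature is enabled; tests then arm them by name at runtime. A minimal sketch of the mechanism, using hypothetical failpoint names (not ones the pageserver defines):

    // Sketch only: the fail crate's two failpoint forms.
    use fail::fail_point;

    fn fragile_step() -> Result<u64, String> {
        // A test can make this pause with: fail::cfg("example-pause", "sleep(2000)")
        fail_point!("example-pause");
        // The closure form lets a test short-circuit the whole function,
        // e.g. with: fail::cfg("example-return", "return")
        fail_point!("example-return", |_| Ok(0));
        Ok(1)
    }
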
    2173                 : 
    2174                 :     /// Update current logical size, adding `delta` to the old value.
    2175         1029042 :     fn update_current_logical_size(&self, delta: i64) {
    2176         1029042 :         let logical_size = &self.current_logical_size;
    2177         1029042 :         logical_size.increment_size(delta);
    2178         1029042 : 
    2179         1029042 :         // Also set the value in the prometheus gauge. Note that
    2180         1029042 :         // there is a race condition here: if this is called by two
    2181         1029042 :         // threads concurrently, the prometheus gauge might be set to
    2182         1029042 :         // one value while current_logical_size is set to the
    2183         1029042 :         // other.
    2184         1029042 :         match logical_size.current_size() {
    2185         1027550 :             Ok(CurrentLogicalSize::Exact(new_current_size)) => self
    2186         1027550 :                 .metrics
    2187         1027550 :                 .current_logical_size_gauge
    2188         1027550 :                 .set(new_current_size),
    2189            1492 :             Ok(CurrentLogicalSize::Approximate(_)) => {
    2190            1492 :                 // don't update the gauge yet, this allows us not to update the gauge back and
    2191            1492 :                 // forth between the initial size calculation task.
    2192            1492 :             }
    2193                 :             // this is overflow
    2194 UBC           0 :             Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
    2195                 :         }
    2196 CBC     1029042 :     }
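
The Exact/Approximate split above is what keeps the gauge stable: only exact values are published, so the metric does not flap between the incremental counter and the still-running initial-size task. A tiny model with a hypothetical enum mirroring that split:

    // Sketch only: hypothetical mirror of the Exact/Approximate gauge rule.
    enum Size {
        Exact(u64),
        Approximate(u64),
    }

    fn update_gauge(size: Size, gauge: &mut u64) {
        match size {
            // Only exact values reach the metric.
            Size::Exact(v) => *gauge = v,
            // Approximate values are skipped so the gauge does not flap back
            // and forth while the initial size task is still running.
            Size::Approximate(_) => {}
        }
    }
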
    2197                 : 
    2198             413 :     async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
    2199             413 :         let guard = self.layers.read().await;
    2200           54386 :         for historic_layer in guard.layer_map().iter_historic_layers() {
    2201           54386 :             let historic_layer_name = historic_layer.filename().file_name();
    2202           54386 :             if layer_file_name == historic_layer_name {
    2203             413 :                 return Some(guard.get_from_desc(&historic_layer));
    2204           53973 :             }
    2205                 :         }
    2206                 : 
    2207 UBC           0 :         None
    2208 CBC         413 :     }
    2209                 : }
    2210                 : 
    2211                 : type TraversalId = String;
    2212                 : 
    2213                 : trait TraversalLayerExt {
    2214                 :     fn traversal_id(&self) -> TraversalId;
    2215                 : }
    2216                 : 
    2217                 : impl TraversalLayerExt for Arc<dyn PersistentLayer> {
    2218            1434 :     fn traversal_id(&self) -> TraversalId {
    2219            1434 :         let timeline_id = self.layer_desc().timeline_id;
    2220            1434 :         match self.local_path() {
    2221              10 :             Some(local_path) => {
    2222              10 :                 debug_assert!(local_path.to_string().contains(&format!("{}", timeline_id)),
    2223 UBC           0 :                     "need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary",
    2224                 :                 );
    2225 CBC          10 :                 format!("{local_path}")
    2226                 :             }
    2227                 :             None => {
    2228            1424 :                 format!("remote {}/{self}", timeline_id)
    2229                 :             }
    2230                 :         }
    2231            1434 :     }
    2232                 : }
    2233                 : 
    2234                 : impl TraversalLayerExt for Arc<InMemoryLayer> {
    2235               2 :     fn traversal_id(&self) -> TraversalId {
    2236               2 :         format!("timeline {} in-memory {self}", self.get_timeline_id())
    2237               2 :     }
    2238                 : }
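
TraversalLayerExt above is an extension trait: it attaches a debugging identifier to existing Arc'd layer types without modifying their definitions. A minimal model of the pattern, with a made-up layer type:

    // Sketch only: the extension-trait pattern on an Arc'd type.
    use std::sync::Arc;

    trait IdExt {
        fn traversal_id(&self) -> String;
    }

    struct FakeLayer {
        timeline_id: u32,
    }

    // Implementing on Arc<FakeLayer> (rather than FakeLayer) lets call sites
    // that already hold an Arc use the method without an extra clone.
    impl IdExt for Arc<FakeLayer> {
        fn traversal_id(&self) -> String {
            format!("timeline {} fake layer", self.timeline_id)
        }
    }
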
    2239                 : 
    2240                 : impl Timeline {
    2241                 :     ///
    2242                 :     /// Get a handle to a Layer for reading.
    2243                 :     ///
    2244                 :     /// The returned Layer might be from an ancestor timeline, if the
    2245                 :     /// segment hasn't been updated on this timeline yet.
    2246                 :     ///
    2247                 :     /// This function locks the current timeline's LayerMap internally while it
    2248                 :     /// searches, so callers can avoid potential race conditions.
    2249         6372305 :     async fn get_reconstruct_data(
    2250         6372305 :         &self,
    2251         6372305 :         key: Key,
    2252         6372305 :         request_lsn: Lsn,
    2253         6372305 :         reconstruct_state: &mut ValueReconstructState,
    2254         6372305 :         ctx: &RequestContext,
    2255         6372307 :     ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
    2256         6372307 :         // Start from the current timeline.
    2257         6372307 :         let mut timeline_owned;
    2258         6372307 :         let mut timeline = self;
    2259         6372307 : 
    2260         6372307 :         let mut read_count = scopeguard::guard(0, |cnt| {
    2261         6372271 :             crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
    2262         6372307 :         });
    2263         6372307 : 
    2264         6372307 :         // For debugging purposes, collect the path of layers that we traversed
    2265         6372307 :         // through. It's included in the error message if we fail to find the key.
    2266         6372307 :         let mut traversal_path = Vec::<TraversalPathItem>::new();
    2267                 : 
    2268         6372307 :         let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
    2269         1297785 :             *cached_lsn
    2270                 :         } else {
    2271         5074522 :             Lsn(0)
    2272                 :         };
    2273                 : 
    2274                 :         // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
    2275                 :         // to check that each iteration makes some progress, and to break out of
    2276                 :         // an infinite loop if something goes wrong.
    2277         6372307 :         let mut prev_lsn = Lsn(u64::MAX);
    2278         6372307 : 
    2279         6372307 :         let mut result = ValueReconstructResult::Continue;
    2280         6372307 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2281                 : 
    2282        36845087 :         'outer: loop {
    2283        36845087 :             // The function should have updated 'state'
    2284        36845087 :             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
    2285        36845087 :             match result {
    2286         5181611 :                 ValueReconstructResult::Complete => return Ok(traversal_path),
    2287                 :                 ValueReconstructResult::Continue => {
    2288                 :                     // If we reached an earlier cached page image, we're done.
    2289        31663469 :                     if cont_lsn == cached_lsn + 1 {
    2290         1190638 :                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
    2291         1190638 :                         return Ok(traversal_path);
    2292        30472831 :                     }
    2293        30472831 :                     if prev_lsn <= cont_lsn {
    2294                 :                         // Didn't make any progress in last iteration. Error out to avoid
    2295                 :                         // getting stuck in the loop.
    2296 UBC           0 :                         return Err(layer_traversal_error(format!(
    2297               0 :                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
    2298               0 :                             key,
    2299               0 :                             Lsn(cont_lsn.0 - 1),
    2300               0 :                             request_lsn,
    2301               0 :                             timeline.ancestor_lsn
    2302               0 :                         ), traversal_path));
    2303 CBC    30472831 :                     }
    2304        30472831 :                     prev_lsn = cont_lsn;
    2305                 :                 }
    2306                 :                 ValueReconstructResult::Missing => {
    2307                 :                     return Err(layer_traversal_error(
    2308               7 :                         if cfg!(test) {
    2309               2 :                             format!(
    2310               2 :                                 "could not find data for key {} at LSN {}, for request at LSN {}\n{}",
    2311               2 :                                 key, cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
    2312               2 :                             )
    2313                 :                         } else {
    2314               5 :                             format!(
    2315               5 :                                 "could not find data for key {} at LSN {}, for request at LSN {}",
    2316               5 :                                 key, cont_lsn, request_lsn
    2317               5 :                             )
    2318                 :                         },
    2319               7 :                         traversal_path,
    2320                 :                     ));
    2321                 :                 }
    2322                 :             }
    2323                 : 
    2324                 :             // Recurse into ancestor if needed
    2325        30472831 :             if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
    2326 UBC           0 :                 trace!(
    2327               0 :                     "going into ancestor {}, cont_lsn is {}",
    2328               0 :                     timeline.ancestor_lsn,
    2329               0 :                     cont_lsn
    2330               0 :                 );
    2331 CBC     1062058 :                 let ancestor = match timeline.get_ancestor_timeline() {
    2332         1062058 :                     Ok(timeline) => timeline,
    2333 UBC           0 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2334                 :                 };
    2335                 : 
    2336                 :                 // It's possible that the ancestor timeline isn't active yet, or
    2337                 :                 // is active but hasn't yet caught up to the branch point. Wait
    2338                 :                 // for it.
    2339                 :                 //
    2340                 :                 // This cannot happen while the pageserver is running normally,
    2341                 :                 // because you cannot create a branch from a point that isn't
    2342                 :                 // present in the pageserver yet. However, we don't wait for the
    2343                 :                 // branch point to be uploaded to cloud storage before creating
    2344                 :                 // a branch. I.e., the branch LSN need not be remote consistent
    2345                 :                 // for the branching operation to succeed.
    2346                 :                 //
    2347                 :                 // Hence, if we try to load a tenant in such a state where
    2348                 :                 // 1. the existence of the branch was persisted (in IndexPart and/or locally)
    2349                 :                 // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
    2350                 :                 // then we will need to wait for the ancestor timeline to
    2351                 :                 // re-stream WAL up to branch_lsn before we access it.
    2352                 :                 //
    2353                 :                 // How can a tenant get in such a state?
    2354                 :                 // - ungraceful pageserver process exit
    2355                 :                 // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
    2356                 :                 //
    2357                 :                 // NB: this could be avoided by requiring
    2358                 :                 //   branch_lsn >= remote_consistent_lsn
    2359                 :                 // during branch creation.
    2360 CBC     1062058 :                 match ancestor.wait_to_become_active(ctx).await {
    2361         1062057 :                     Ok(()) => {}
    2362                 :                     Err(TimelineState::Stopping) => {
    2363 UBC           0 :                         return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
    2364                 :                     }
    2365 CBC           1 :                     Err(state) => {
    2366               1 :                         return Err(PageReconstructError::Other(anyhow::anyhow!(
    2367               1 :                             "Timeline {} will not become active. Current state: {:?}",
    2368               1 :                             ancestor.timeline_id,
    2369               1 :                             &state,
    2370               1 :                         )));
    2371                 :                     }
    2372                 :                 }
    2373         1062057 :                 ancestor
    2374         1062057 :                     .wait_lsn(timeline.ancestor_lsn, ctx)
    2375              10 :                     .await
    2376         1062057 :                     .with_context(|| {
    2377 UBC           0 :                         format!(
    2378               0 :                             "wait for lsn {} on ancestor timeline_id={}",
    2379               0 :                             timeline.ancestor_lsn, ancestor.timeline_id
    2380               0 :                         )
    2381 CBC     1062057 :                     })?;
    2382                 : 
    2383         1062057 :                 timeline_owned = ancestor;
    2384         1062057 :                 timeline = &*timeline_owned;
    2385         1062057 :                 prev_lsn = Lsn(u64::MAX);
    2386         1062057 :                 continue 'outer;
    2387        29410773 :             }
    2388                 : 
    2389                 :             #[allow(clippy::never_loop)] // see comment at bottom of this loop
    2390                 :             'layer_map_search: loop {
    2391            1425 :                 let remote_layer = {
    2392        29412190 :                     let guard = timeline.layers.read().await;
    2393        29412164 :                     let layers = guard.layer_map();
    2394                 : 
    2395                 :                     // Check the open and frozen in-memory layers first, in order from newest
    2396                 :                     // to oldest.
    2397        29412164 :                     if let Some(open_layer) = &layers.open_layer {
    2398        24833727 :                         let start_lsn = open_layer.get_lsn_range().start;
    2399        24833727 :                         if cont_lsn > start_lsn {
    2400                 :                             //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
    2401                 :                             // Get all the data needed to reconstruct the page version from this layer.
    2402                 :                             // But if we have an older cached page image, no need to go past that.
    2403         5076087 :                             let lsn_floor = max(cached_lsn + 1, start_lsn);
    2404         5076087 :                             result = match open_layer
    2405         5076087 :                                 .get_value_reconstruct_data(
    2406         5076087 :                                     key,
    2407         5076087 :                                     lsn_floor..cont_lsn,
    2408         5076087 :                                     reconstruct_state,
    2409         5076087 :                                     ctx,
    2410         5076087 :                                 )
    2411          479207 :                                 .await
    2412                 :                             {
    2413         5076086 :                                 Ok(result) => result,
    2414 UBC           0 :                                 Err(e) => return Err(PageReconstructError::from(e)),
    2415                 :                             };
    2416 CBC     5076086 :                             cont_lsn = lsn_floor;
    2417         5076086 :                             // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2418         5076086 :                             traversal_path.push((
    2419         5076086 :                                 result,
    2420         5076086 :                                 cont_lsn,
    2421         5076086 :                                 Box::new({
    2422         5076086 :                                     let open_layer = Arc::clone(open_layer);
    2423         5076086 :                                     move || open_layer.traversal_id()
    2424         5076086 :                                 }),
    2425         5076086 :                             ));
    2426         5076086 :                             continue 'outer;
    2427        19757640 :                         }
    2428         4578437 :                     }
    2429        24336077 :                     for frozen_layer in layers.frozen_layers.iter().rev() {
    2430         3105084 :                         let start_lsn = frozen_layer.get_lsn_range().start;
    2431         3105084 :                         if cont_lsn > start_lsn {
    2432                 :                             //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
    2433          387246 :                             let lsn_floor = max(cached_lsn + 1, start_lsn);
    2434          387246 :                             result = match frozen_layer
    2435          387246 :                                 .get_value_reconstruct_data(
    2436          387246 :                                     key,
    2437          387246 :                                     lsn_floor..cont_lsn,
    2438          387246 :                                     reconstruct_state,
    2439          387246 :                                     ctx,
    2440          387246 :                                 )
    2441           27403 :                                 .await
    2442                 :                             {
    2443          387246 :                                 Ok(result) => result,
    2444 UBC           0 :                                 Err(e) => return Err(PageReconstructError::from(e)),
    2445                 :                             };
    2446 CBC      387246 :                             cont_lsn = lsn_floor;
    2447          387246 :                             // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2448          387246 :                             traversal_path.push((
    2449          387246 :                                 result,
    2450          387246 :                                 cont_lsn,
    2451          387246 :                                 Box::new({
    2452          387246 :                                     let frozen_layer = Arc::clone(frozen_layer);
    2453          387246 :                                     move || frozen_layer.traversal_id()
    2454          387246 :                                 }),
    2455          387246 :                             ));
    2456          387246 :                             continue 'outer;
    2457         2717838 :                         }
    2458                 :                     }
    2459                 : 
    2460        23948831 :                     if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
    2461        23851667 :                         let layer = guard.get_from_desc(&layer);
    2462                 :                         // If it's a remote layer, download it and retry.
    2463            1425 :                         if let Some(remote_layer) =
    2464        23851667 :                             super::storage_layer::downcast_remote_layer(&layer)
    2465                 :                         {
    2466                 :                             // TODO: push a breadcrumb to 'traversal_path' to record the fact that
    2467                 :                             // we downloaded / would need to download this layer.
    2468            1425 :                             remote_layer // download happens outside the scope of `layers` guard object
    2469                 :                         } else {
    2470                 :                             // Get all the data needed to reconstruct the page version from this layer.
    2471                 :                             // But if we have an older cached page image, no need to go past that.
    2472        23850242 :                             let lsn_floor = max(cached_lsn + 1, lsn_floor);
    2473        23850242 :                             result = match layer
    2474        23850242 :                                 .get_value_reconstruct_data(
    2475        23850242 :                                     key,
    2476        23850242 :                                     lsn_floor..cont_lsn,
    2477        23850242 :                                     reconstruct_state,
    2478        23850242 :                                     ctx,
    2479        23850242 :                                 )
    2480         2126171 :                                 .await
    2481                 :                             {
    2482        23850227 :                                 Ok(result) => result,
    2483              10 :                                 Err(e) => return Err(PageReconstructError::from(e)),
    2484                 :                             };
    2485        23850227 :                             cont_lsn = lsn_floor;
    2486        23850227 :                             *read_count += 1;
    2487        23850227 :                             traversal_path.push((
    2488        23850227 :                                 result,
    2489        23850227 :                                 cont_lsn,
    2490        23850227 :                                 Box::new({
    2491        23850227 :                                     let layer = Arc::clone(&layer);
    2492        23850227 :                                     move || layer.traversal_id()
    2493        23850227 :                                 }),
    2494        23850227 :                             ));
    2495        23850227 :                             continue 'outer;
    2496                 :                         }
    2497           97164 :                     } else if timeline.ancestor_timeline.is_some() {
    2498                 :                         // Nothing on this timeline. Traverse to parent
    2499           97159 :                         result = ValueReconstructResult::Continue;
    2500           97159 :                         cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2501           97159 :                         continue 'outer;
    2502                 :                     } else {
    2503                 :                         // Nothing found
    2504               5 :                         result = ValueReconstructResult::Missing;
    2505               5 :                         continue 'outer;
    2506                 :                     }
    2507                 :                 };
    2508                 :                 // Download the remote_layer and replace it in the layer map.
    2509                 :                 // For that, we need to release the mutex. Otherwise, we'd deadlock.
    2510                 :                 //
    2511                 :                 // The control flow is so weird here because `drop(layers)` inside
    2512                 :                 // the if stmt above is not enough for current rustc: it requires
    2513                 :                 // that the layers lock guard is not in scope across the download
    2514                 :                 // await point.
    2515            1425 :                 let remote_layer_as_persistent: Arc<dyn PersistentLayer> =
    2516            1425 :                     Arc::clone(&remote_layer) as Arc<dyn PersistentLayer>;
    2517            1425 :                 let id = remote_layer_as_persistent.traversal_id();
    2518            1424 :                 info!(
    2519            1424 :                     "need remote layer {} for task kind {:?}",
    2520            1424 :                     id,
    2521            1424 :                     ctx.task_kind()
    2522            1424 :                 );
    2523                 : 
    2524                 :                 // The next layer doesn't exist locally. Need to download it.
    2525                 :                 // (The control flow is a bit complicated here because we must drop the 'layers'
    2526                 :                 // lock before awaiting on the Future.)
    2527            1424 :                 match (
    2528            1424 :                     ctx.download_behavior(),
    2529            1424 :                     self.conf.ondemand_download_behavior_treat_error_as_warn,
    2530            1424 :                 ) {
    2531                 :                     (DownloadBehavior::Download, _) => {
    2532            1424 :                         info!(
    2533            1424 :                             "on-demand downloading remote layer {id} for task kind {:?}",
    2534            1424 :                             ctx.task_kind()
    2535            1424 :                         );
    2536            1903 :                         timeline.download_remote_layer(remote_layer).await?;
    2537            1417 :                         continue 'layer_map_search;
    2538                 :                     }
    2539                 :                     (DownloadBehavior::Warn, _) | (DownloadBehavior::Error, true) => {
    2540 UBC           0 :                         warn!(
    2541               0 :                             "unexpectedly on-demand downloading remote layer {} for task kind {:?}",
    2542               0 :                             id,
    2543               0 :                             ctx.task_kind()
    2544               0 :                         );
    2545               0 :                         UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
    2546               0 :                         timeline.download_remote_layer(remote_layer).await?;
    2547               0 :                         continue 'layer_map_search;
    2548                 :                     }
    2549                 :                     (DownloadBehavior::Error, false) => {
    2550               0 :                         return Err(PageReconstructError::NeedsDownload(
    2551               0 :                             TenantTimelineId::new(self.tenant_id, self.timeline_id),
    2552               0 :                             remote_layer.filename(),
    2553               0 :                         ))
    2554                 :                     }
    2555                 :                 }
    2556                 :             }
    2557                 :         }
    2558 CBC     6372271 :     }
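
The three-way match above is effectively a small decision table over the context's download behavior and the `ondemand_download_behavior_treat_error_as_warn` setting. Below is a standalone restatement of that table; `Decision` and `decide` are illustrative names, not pageserver API.

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum DownloadBehavior {
        Download,
        Warn,
        Error,
    }

    #[derive(Debug, PartialEq)]
    enum Decision {
        Download,         // expected on-demand download
        WarnThenDownload, // unexpected but tolerated: log + metric, then download
        Fail,             // refuse and surface a needs-download error
    }

    fn decide(behavior: DownloadBehavior, treat_error_as_warn: bool) -> Decision {
        match (behavior, treat_error_as_warn) {
            (DownloadBehavior::Download, _) => Decision::Download,
            (DownloadBehavior::Warn, _) | (DownloadBehavior::Error, true) => {
                Decision::WarnThenDownload
            }
            (DownloadBehavior::Error, false) => Decision::Fail,
        }
    }

    fn main() {
        assert_eq!(decide(DownloadBehavior::Error, true), Decision::WarnThenDownload);
        assert_eq!(decide(DownloadBehavior::Error, false), Decision::Fail);
    }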
    2559                 : 
    2560         6372321 :     async fn lookup_cached_page(
    2561         6372321 :         &self,
    2562         6372321 :         key: &Key,
    2563         6372321 :         lsn: Lsn,
    2564         6372321 :         ctx: &RequestContext,
    2565         6372323 :     ) -> Option<(Lsn, Bytes)> {
    2566         6372323 :         let cache = page_cache::get();
    2567                 : 
    2568                 :         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
    2569                 :         // We should look at the key to determine if it's a cacheable object.
    2570         6372323 :         let (lsn, read_guard) = cache
    2571         6372323 :             .lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn, ctx)
    2572         5074522 :             .await?;
    2573         1297801 :         let img = Bytes::from(read_guard.to_vec());
    2574         1297801 :         Some((lsn, img))
    2575         6372323 :     }
    2576                 : 
    2577         1062058 :     fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
    2578         1062058 :         let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
    2579 UBC           0 :             format!(
    2580               0 :                 "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
    2581               0 :                 self.timeline_id,
    2582               0 :                 self.get_ancestor_timeline_id(),
    2583               0 :             )
    2584 CBC     1062058 :         })?;
    2585         1062058 :         Ok(Arc::clone(ancestor))
    2586         1062058 :     }
    2587                 : 
    2588                 :     ///
    2589                 :     /// Get a handle to the latest layer for appending.
    2590                 :     ///
    2591        76639811 :     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    2592        76639858 :         let mut guard = self.layers.write().await;
    2593        76639855 :         let layer = guard
    2594        76639855 :             .get_layer_for_write(
    2595        76639855 :                 lsn,
    2596        76639855 :                 self.get_last_record_lsn(),
    2597        76639855 :                 self.conf,
    2598        76639855 :                 self.timeline_id,
    2599        76639855 :                 self.tenant_id,
    2600        76639855 :             )
    2601 UBC           0 :             .await?;
    2602 CBC    76639855 :         Ok(layer)
    2603        76639855 :     }
    2604                 : 
    2605        76622124 :     async fn put_value(
    2606        76622124 :         &self,
    2607        76622124 :         key: Key,
    2608        76622124 :         lsn: Lsn,
    2609        76622124 :         val: &Value,
    2610        76622124 :         ctx: &RequestContext,
    2611        76622124 :     ) -> anyhow::Result<()> {
    2612                 :         //info!("PUT: key {} at {}", key, lsn);
    2613        76622171 :         let layer = self.get_layer_for_write(lsn).await?;
    2614        76622168 :         layer.put_value(key, lsn, val, ctx).await?;
    2615        76622164 :         Ok(())
    2616        76622164 :     }
    2617                 : 
    2618           17687 :     async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
    2619           17687 :         let layer = self.get_layer_for_write(lsn).await?;
    2620           17687 :         layer.put_tombstone(key_range, lsn).await?;
    2621           17687 :         Ok(())
    2622           17687 :     }
    2623                 : 
    2624        69330499 :     fn finish_write(&self, new_lsn: Lsn) {
    2625        69330499 :         assert!(new_lsn.is_aligned());
    2626                 : 
    2627        69330499 :         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
    2628        69330499 :         self.last_record_lsn.advance(new_lsn);
    2629        69330499 :     }
    2630                 : 
    2631            5575 :     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
    2632                 :         // Freeze the current open in-memory layer. It will be written to disk on the next
    2633                 :         // iteration.
    2634            5575 :         let _write_guard = if write_lock_held {
    2635            3983 :             None
    2636                 :         } else {
    2637            1592 :             Some(self.write_lock.lock().await)
    2638                 :         };
    2639            5575 :         let mut guard = self.layers.write().await;
    2640            5575 :         guard
    2641            5575 :             .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
    2642              21 :             .await;
    2643            5575 :     }
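
The `Option<MutexGuard>` pattern above takes the write lock only when the caller does not already hold it, while keeping any acquired guard alive for the rest of the function. A minimal sketch of the same pattern, assuming only tokio's async Mutex:

    use tokio::sync::Mutex;

    #[tokio::main]
    async fn main() {
        let write_lock = Mutex::new(());
        let write_lock_held = false; // would be a parameter in real code

        // Holding `None` means "the caller already owns the lock"; holding
        // `Some(guard)` keeps the lock until the end of the scope.
        let _write_guard = if write_lock_held {
            None
        } else {
            Some(write_lock.lock().await)
        };
        // ... critical section ...
    }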
    2644                 : 
    2645                 :     /// Layer flusher task's main loop.
    2646            1277 :     async fn flush_loop(
    2647            1277 :         self: &Arc<Self>,
    2648            1277 :         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
    2649            1277 :         ctx: &RequestContext,
    2650            1277 :     ) {
    2651            1277 :         info!("started flush loop");
    2652                 :         loop {
    2653            5932 :             tokio::select! {
    2654           10608 :                 _ = task_mgr::shutdown_watcher() => {
    2655           10608 :                     info!("shutting down layer flush task");
    2656           10608 :                     break;
    2657           10608 :                 },
    2658           10608 :                 _ = layer_flush_start_rx.changed() => {}
    2659           10608 :             }
    2660                 : 
    2661 UBC           0 :             trace!("waking up");
    2662 CBC        4663 :             let timer = self.metrics.flush_time_histo.start_timer();
    2663            4663 :             let flush_counter = *layer_flush_start_rx.borrow();
    2664            4655 :             let result = loop {
    2665           10006 :                 let layer_to_flush = {
    2666           10006 :                     let guard = self.layers.read().await;
    2667           10006 :                     guard.layer_map().frozen_layers.front().cloned()
    2668                 :                     // drop 'layers' lock to allow concurrent reads and writes
    2669                 :                 };
    2670           10006 :                 let Some(layer_to_flush) = layer_to_flush else {
    2671            4655 :                     break Ok(());
    2672                 :                 };
    2673           13069 :                 if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
    2674 UBC           0 :                     error!("could not flush frozen layer: {err:?}");
    2675               0 :                     break Err(err);
    2676 CBC        5343 :                 }
    2677                 :             };
    2678                 :             // Notify any listeners that we're done
    2679            4655 :             let _ = self
    2680            4655 :                 .layer_flush_done_tx
    2681            4655 :                 .send_replace((flush_counter, result));
    2682            4655 : 
    2683            4655 :             timer.stop_and_record();
    2684                 :         }
    2685             443 :     }
    2686                 : 
    2687            1592 :     async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
    2688            1592 :         let mut rx = self.layer_flush_done_tx.subscribe();
    2689            1592 : 
    2690            1592 :         // Increment the flush cycle counter and wake up the flush task.
    2691            1592 :         // Remember the new value, so that when we listen for the flush
    2692            1592 :         // to finish, we know when the flush that we initiated has
    2693            1592 :         // finished, instead of some other flush that was started earlier.
    2694            1592 :         let mut my_flush_request = 0;
    2695            1592 : 
    2696            1592 :         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
    2697            1592 :         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
    2698              59 :             anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
    2699            1533 :         }
    2700            1533 : 
    2701            1533 :         self.layer_flush_start_tx.send_modify(|counter| {
    2702            1533 :             my_flush_request = *counter + 1;
    2703            1533 :             *counter = my_flush_request;
    2704            1533 :         });
    2705                 : 
    2706            3067 :         loop {
    2707            3067 :             {
    2708            3067 :                 let (last_result_counter, last_result) = &*rx.borrow();
    2709            3067 :                 if *last_result_counter >= my_flush_request {
    2710            1533 :                     if let Err(_err) = last_result {
    2711                 :                         // We already logged the original error in
    2712                 :                         // flush_loop. We cannot propagate it to the caller
    2713                 :                         // here, because it might not be Cloneable
    2714 UBC           0 :                         anyhow::bail!(
    2715               0 :                             "Could not flush frozen layer. Request id: {}",
    2716               0 :                             my_flush_request
    2717               0 :                         );
    2718                 :                     } else {
    2719 CBC        1533 :                         return Ok(());
    2720                 :                     }
    2721            1534 :                 }
    2722                 :             }
    2723 UBC           0 :             trace!("waiting for flush to complete");
    2724 CBC        1535 :             rx.changed().await?;
    2725 UBC           0 :             trace!("done")
    2726                 :         }
    2727 CBC        1592 :     }
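
`flush_loop` and `flush_frozen_layers_and_wait` coordinate through a pair of watch channels: one carries a monotonically increasing request counter, the other echoes back the highest counter whose flush has completed. A condensed, self-contained sketch of that handshake, assuming only tokio; the error type and names are simplified for illustration:

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (start_tx, mut start_rx) = watch::channel(0u64);
        let (done_tx, done_rx) = watch::channel((0u64, Ok::<(), String>(())));

        // Flush task: wake on counter changes, do the work, then acknowledge
        // the counter value it observed.
        tokio::spawn(async move {
            while start_rx.changed().await.is_ok() {
                let seen = *start_rx.borrow();
                // ... flush all frozen layers here ...
                let _ = done_tx.send_replace((seen, Ok(())));
            }
        });

        // Requester: bump the counter, then wait until the acknowledged
        // counter catches up with our request.
        let mut rx = done_rx.clone();
        let mut my_request = 0;
        start_tx.send_modify(|counter| {
            my_request = *counter + 1;
            *counter = my_request;
        });
        loop {
            let (acked, _result) = rx.borrow().clone();
            if acked >= my_request {
                break;
            }
            rx.changed().await.unwrap();
        }
        println!("flush request {my_request} completed");
    }

Because each waiter compares against its own request number, it never returns early on the completion of an older flush that someone else started.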
    2728                 : 
    2729            3983 :     fn flush_frozen_layers(&self) {
    2730            3983 :         self.layer_flush_start_tx.send_modify(|val| *val += 1);
    2731            3983 :     }
    2732                 : 
    2733                 :     /// Flush one frozen in-memory layer to disk, as a new delta layer.
    2734            5351 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer))]
    2735                 :     async fn flush_frozen_layer(
    2736                 :         self: &Arc<Self>,
    2737                 :         frozen_layer: Arc<InMemoryLayer>,
    2738                 :         ctx: &RequestContext,
    2739                 :     ) -> anyhow::Result<()> {
    2740                 :         // As a special case, when we have just imported an image into the repository,
    2741                 :         // instead of writing out an L0 delta layer, we write out image layer
    2742                 :         // files directly. This is possible as long as *all* the data imported into the
    2743                 :         // repository has the same LSN.
    2744                 :         let lsn_range = frozen_layer.get_lsn_range();
    2745                 :         let (layer_paths_to_upload, delta_layer_to_add) =
    2746                 :             if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
    2747                 :                 #[cfg(test)]
    2748                 :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2749                 :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2750                 :                         panic!("flush loop not running")
    2751                 :                     }
    2752                 :                     FlushLoopState::Running {
    2753                 :                         initdb_optimization_count,
    2754                 :                         ..
    2755                 :                     } => {
    2756                 :                         *initdb_optimization_count += 1;
    2757                 :                     }
    2758                 :                 }
    2759                 :                 // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
    2760                 :                 // require downloading anything during initial import.
    2761                 :                 let (partitioning, _lsn) = self
    2762                 :                     .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
    2763                 :                     .await?;
    2764                 :                 // For image layers, we add them immediately into the layer map.
    2765                 :                 (
    2766                 :                     self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
    2767                 :                         .await?,
    2768                 :                     None,
    2769                 :                 )
    2770                 :             } else {
    2771                 :                 #[cfg(test)]
    2772                 :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2773                 :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2774                 :                         panic!("flush loop not running")
    2775                 :                     }
    2776                 :                     FlushLoopState::Running {
    2777                 :                         expect_initdb_optimization,
    2778                 :                         ..
    2779                 :                     } => {
    2780                 :                         assert!(!*expect_initdb_optimization, "expected initdb optimization");
    2781                 :                     }
    2782                 :                 }
    2783                 :                 // Normal case, write out an L0 delta layer file.
    2784                 :                 // `create_delta_layer` will not modify the layer map.
    2785                 :                 // We will remove the frozen layer and add the delta layer in one atomic operation later.
    2786                 :                 let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
    2787                 :                 (
    2788                 :                     HashMap::from([(
    2789                 :                         layer.filename(),
    2790                 :                         LayerFileMetadata::new(layer.layer_desc().file_size, self.generation),
    2791                 :                     )]),
    2792                 :                     Some(layer),
    2793                 :                 )
    2794                 :             };
    2795                 : 
    2796                 :         // The new on-disk layers have been created and are durable. Now remove the
    2797                 :         // frozen in-memory layer from the layer map and, in the same atomic
    2798                 :         // operation, add the new delta layer produced by `create_delta_layer`.
    2799                 :         {
    2800                 :             let mut guard = self.layers.write().await;
    2801                 : 
    2802                 :             if let Some(ref l) = delta_layer_to_add {
    2803                 :                 // TODO: move access stats, metrics update, etc. into layer manager.
    2804                 :                 l.access_stats().record_residence_event(
    2805                 :                     LayerResidenceStatus::Resident,
    2806                 :                     LayerResidenceEventReason::LayerCreate,
    2807                 :                 );
    2808                 : 
    2809                 :                 // update metrics
    2810                 :                 let sz = l.layer_desc().file_size;
    2811                 :                 self.metrics.record_new_file_metrics(sz);
    2812                 :             }
    2813                 : 
    2814                 :             guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
    2815                 :             // release lock on 'layers'
    2816                 :         }
    2817                 : 
    2818                 :         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
    2819                 :         // a compaction can delete the file and then it won't be available for uploads any more.
    2820                 :         // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
    2821                 :         // race situation.
    2822                 :         // See https://github.com/neondatabase/neon/issues/4526
    2823            5347 :         pausable_failpoint!("flush-frozen-pausable");
    2824                 : 
    2825                 :         // This failpoint is used by another test case `test_pageserver_recovery`.
    2826 UBC           0 :         fail_point!("flush-frozen-exit");
    2827                 : 
    2828                 :         // Update the metadata file, with new 'disk_consistent_lsn'
    2829                 :         //
    2830                 :         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
    2831                 :         // *all* the layers, to avoid fsyncing the file multiple times.
    2832                 :         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
    2833                 :         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
    2834                 : 
    2835                 :         // If we were able to advance 'disk_consistent_lsn', save it the metadata file.
    2836                 :         // If we were able to advance 'disk_consistent_lsn', save it in the metadata file.
    2837                 :         if disk_consistent_lsn != old_disk_consistent_lsn {
    2838                 :             assert!(disk_consistent_lsn > old_disk_consistent_lsn);
    2839                 :             self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
    2840                 :                 .await
    2841                 :                 .context("update_metadata_file")?;
    2842                 :             // Also update the in-memory copy
    2843                 :             self.disk_consistent_lsn.store(disk_consistent_lsn);
    2844                 :         }
    2845                 :         Ok(())
    2846                 :     }
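
The initdb optimization above hinges on one condition: the frozen layer holds exactly the imported data, i.e. its LSN range is `initdb_lsn..initdb_lsn+1`. A toy version of that test, with a bare `Lsn` wrapper standing in for the pageserver type:

    use std::ops::Range;

    #[derive(Clone, Copy, PartialEq, Debug)]
    struct Lsn(u64);

    fn takes_initdb_shortcut(lsn_range: &Range<Lsn>, initdb_lsn: Lsn) -> bool {
        // Everything in the layer sits at the single import LSN, so an image
        // layer captures it losslessly and no L0 delta file is needed.
        lsn_range.start == initdb_lsn && lsn_range.end == Lsn(initdb_lsn.0 + 1)
    }

    fn main() {
        let initdb = Lsn(0x10);
        assert!(takes_initdb_shortcut(&(Lsn(0x10)..Lsn(0x11)), initdb));
        assert!(!takes_initdb_shortcut(&(Lsn(0x10)..Lsn(0x20)), initdb));
    }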
    2847                 : 
    2848                 :     /// Update metadata file
    2849 CBC        5368 :     async fn update_metadata_file(
    2850            5368 :         &self,
    2851            5368 :         disk_consistent_lsn: Lsn,
    2852            5368 :         layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
    2853            5368 :     ) -> anyhow::Result<()> {
    2854            5368 :         // We can only save a valid 'prev_record_lsn' value on disk if we
    2855            5368 :         // flushed *all* in-memory changes to disk. We only track
    2856            5368 :         // 'prev_record_lsn' in memory for the latest processed record, so we
    2857            5368 :         // don't remember the correct value that corresponds to some older
    2858            5368 :         // LSN. But if we flush everything, then the value corresponding to the
    2859            5368 :         // current 'last_record_lsn' is correct and we can store it on disk.
    2860            5368 :         let RecordLsn {
    2861            5368 :             last: last_record_lsn,
    2862            5368 :             prev: prev_record_lsn,
    2863            5368 :         } = self.last_record_lsn.load();
    2864            5368 :         let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
    2865            1214 :             Some(prev_record_lsn)
    2866                 :         } else {
    2867            4154 :             None
    2868                 :         };
    2869                 : 
    2870            5368 :         let ancestor_timeline_id = self
    2871            5368 :             .ancestor_timeline
    2872            5368 :             .as_ref()
    2873            5368 :             .map(|ancestor| ancestor.timeline_id);
    2874            5368 : 
    2875            5368 :         let metadata = TimelineMetadata::new(
    2876            5368 :             disk_consistent_lsn,
    2877            5368 :             ondisk_prev_record_lsn,
    2878            5368 :             ancestor_timeline_id,
    2879            5368 :             self.ancestor_lsn,
    2880            5368 :             *self.latest_gc_cutoff_lsn.read(),
    2881            5368 :             self.initdb_lsn,
    2882            5368 :             self.pg_version,
    2883            5368 :         );
    2884            5368 : 
    2885            5368 :         fail_point!("checkpoint-before-saving-metadata", |x| bail!(
    2886 UBC           0 :             "{}",
    2887               0 :             x.unwrap()
    2888 CBC        5368 :         ));
    2889                 : 
    2890            5368 :         save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
    2891 UBC           0 :             .await
    2892 CBC        5368 :             .context("save_metadata")?;
    2893                 : 
    2894            5368 :         if let Some(remote_client) = &self.remote_client {
    2895           10711 :             for (path, layer_metadata) in layer_paths_to_upload {
    2896            5343 :                 remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
    2897                 :             }
    2898            5368 :             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
    2899 UBC           0 :         }
    2900                 : 
    2901 CBC        5368 :         Ok(())
    2902            5368 :     }
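
The 'prev_record_lsn' rule above can be stated compactly: the value is only meaningful on disk when the flush reached the latest processed record. A sketch under that reading, with simplified stand-in types rather than the pageserver's:

    #[derive(Clone, Copy, PartialEq, Debug)]
    struct Lsn(u64);

    struct RecordLsn {
        last: Lsn,
        prev: Lsn,
    }

    fn ondisk_prev_record_lsn(record: &RecordLsn, disk_consistent_lsn: Lsn) -> Option<Lsn> {
        // 'prev' is only tracked for the latest processed record, so it is
        // valid on disk only when everything up to 'last' has been flushed.
        (disk_consistent_lsn == record.last).then_some(record.prev)
    }

    fn main() {
        let r = RecordLsn { last: Lsn(100), prev: Lsn(90) };
        assert_eq!(ondisk_prev_record_lsn(&r, Lsn(100)), Some(Lsn(90)));
        assert_eq!(ondisk_prev_record_lsn(&r, Lsn(80)), None);
    }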
    2903                 : 
    2904                 :     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
    2905                 :     // in layer map immediately. The caller is responsible to put it into the layer map.
    2906            5312 :     async fn create_delta_layer(
    2907            5312 :         self: &Arc<Self>,
    2908            5312 :         frozen_layer: &Arc<InMemoryLayer>,
    2909            5312 :         ctx: &RequestContext,
    2910            5312 :     ) -> anyhow::Result<DeltaLayer> {
    2911            5312 :         let span = tracing::info_span!("blocking");
    2912            5312 :         let new_delta: DeltaLayer = tokio::task::spawn_blocking({
    2913            5312 :             let _g = span.entered();
    2914            5312 :             let self_clone = Arc::clone(self);
    2915            5312 :             let frozen_layer = Arc::clone(frozen_layer);
    2916            5312 :             let ctx = ctx.attached_child();
    2917            5312 :             move || {
    2918                 :                 // Write it out
    2919                 :                 // Keep this inside `spawn_blocking` and `Handle::current`
    2920                 :                 // as long as the write path is still sync and the read impl
    2921                 :                 // is still not fully async. Otherwise executor threads would
    2922                 :                 // be blocked.
    2923            5312 :                 let new_delta = Handle::current().block_on(frozen_layer.write_to_disk(&ctx))?;
    2924            5312 :                 let new_delta_path = new_delta.path();
    2925            5312 : 
    2926            5312 :                 // Sync it to disk.
    2927            5312 :                 //
    2928            5312 :                 // We must also fsync the timeline dir to ensure the directory entries for
    2929            5312 :                 // new layer files are durable.
    2930            5312 :                 //
    2931            5312 :                 // NB: timeline dir must be synced _after_ the file contents are durable.
    2932            5312 :                 // So, two separate fsyncs are required, they mustn't be batched.
    2933            5312 :                 //
    2934            5312 :                 // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
    2935            5312 :                 // files to flush, the fsync overhead can be reduced as follows:
    2936            5312 :                 // 1. write them all to temporary file names
    2937            5312 :                 // 2. fsync them
    2938            5312 :                 // 3. rename to the final name
    2939            5312 :                 // 4. fsync the parent directory.
    2940            5312 :                 // Note that (1),(2),(3) today happen inside write_to_disk().
    2941            5312 :                 par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
    2942            5312 :                 par_fsync::par_fsync(&[self_clone
    2943            5312 :                     .conf
    2944            5312 :                     .timeline_path(&self_clone.tenant_id, &self_clone.timeline_id)])
    2945            5312 :                 .context("fsync of timeline dir")?;
    2946                 : 
    2947            5308 :                 anyhow::Ok(new_delta)
    2948            5312 :             }
    2949            5312 :         })
    2950            5308 :         .await
    2951            5308 :         .context("spawn_blocking")??;
    2952                 : 
    2953            5308 :         Ok(new_delta)
    2954            5308 :     }
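
The two-fsync rule in the comments above (file contents first, then the parent directory entry) is the standard recipe for durably creating a file on Unix filesystems. A minimal std-only sketch; the path is illustrative and the directory fsync is Unix-specific:

    use std::fs::File;
    use std::io::{self, Write};
    use std::path::Path;

    fn write_durably(path: &Path, contents: &[u8]) -> io::Result<()> {
        let mut f = File::create(path)?;
        f.write_all(contents)?;
        f.sync_all()?; // 1. make the file contents durable

        // 2. only then fsync the parent directory, so the new directory
        //    entry is durable too; the two syncs must not be batched.
        let dir = File::open(path.parent().expect("path has a parent"))?;
        dir.sync_all()
    }

    fn main() -> io::Result<()> {
        write_durably(Path::new("/tmp/example-layer"), b"layer bytes")
    }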
    2955                 : 
    2956            1302 :     async fn repartition(
    2957            1302 :         &self,
    2958            1302 :         lsn: Lsn,
    2959            1302 :         partition_size: u64,
    2960            1302 :         ctx: &RequestContext,
    2961            1302 :     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
    2962            1302 :         {
    2963            1302 :             let partitioning_guard = self.partitioning.lock().unwrap();
    2964            1302 :             let distance = lsn.0 - partitioning_guard.1 .0;
    2965            1302 :             if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
    2966 UBC           0 :                 debug!(
    2967               0 :                     distance,
    2968               0 :                     threshold = self.repartition_threshold,
    2969               0 :                     "no repartitioning needed"
    2970               0 :                 );
    2971 CBC         639 :                 return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
    2972             663 :             }
    2973                 :         }
    2974          279558 :         let keyspace = self.collect_keyspace(lsn, ctx).await?;
    2975             662 :         let partitioning = keyspace.partition(partition_size);
    2976             662 : 
    2977             662 :         let mut partitioning_guard = self.partitioning.lock().unwrap();
    2978             662 :         if lsn > partitioning_guard.1 {
    2979             662 :             *partitioning_guard = (partitioning, lsn);
    2980             662 :         } else {
    2981 UBC           0 :             warn!("Concurrent repartitioning of keyspace. This is unexpected, but probably harmless");
    2982                 :         }
    2983 CBC         662 :         Ok((partitioning_guard.0.clone(), partitioning_guard.1))
    2984            1301 :     }
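
The guard at the top of `repartition` is a simple distance check: reuse the cached partitioning unless the LSN has advanced past the threshold, or no partitioning exists yet (signalled by Lsn(0)). Restated over bare u64 LSNs for illustration:

    fn needs_repartition(cached_lsn: u64, lsn: u64, threshold: u64) -> bool {
        // cached_lsn == 0 means "never partitioned"; otherwise recompute
        // only when the distance exceeds the threshold.
        cached_lsn == 0 || lsn - cached_lsn > threshold
    }

    fn main() {
        assert!(needs_repartition(0, 10, 100)); // nothing cached yet
        assert!(!needs_repartition(50, 100, 100)); // distance 50 <= threshold
        assert!(needs_repartition(50, 200, 100)); // distance 150 > threshold
    }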
    2985                 : 
    2986                 :     // Is it time to create a new image layer for the given partition?
    2987           23824 :     async fn time_for_new_image_layer(
    2988           23824 :         &self,
    2989           23824 :         partition: &KeySpace,
    2990           23824 :         lsn: Lsn,
    2991           23824 :     ) -> anyhow::Result<bool> {
    2992           23824 :         let threshold = self.get_image_creation_threshold();
    2993                 : 
    2994           23824 :         let guard = self.layers.read().await;
    2995           23824 :         let layers = guard.layer_map();
    2996           23824 : 
    2997           23824 :         let mut max_deltas = 0;
    2998           23824 :         {
    2999           23824 :             let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
    3000           23824 :             if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
    3001            3653 :                 let img_range =
    3002            3653 :                     partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
    3003            3653 :                 if wanted.overlaps(&img_range) {
    3004                 :                     //
    3005                 :                     // gc_timeline only pays attention to image layers that are older than the GC cutoff,
    3006                 :                     // but create_image_layers creates image layers at last-record-lsn.
    3007                 :                     // So it's possible that gc_timeline wants a new image layer to be created for a key range,
    3008                 :                     // but the range is already covered by image layers at more recent LSNs. Before we
    3009                 :                     // create a new image layer, check if the range is already covered at more recent LSNs.
    3010 UBC           0 :                     if !layers
    3011               0 :                         .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
    3012                 :                     {
    3013               0 :                         debug!(
    3014               0 :                             "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
    3015               0 :                             img_range.start, img_range.end, cutoff_lsn, lsn
    3016               0 :                         );
    3017               0 :                         return Ok(true);
    3018               0 :                     }
    3019 CBC        3653 :                 }
    3020           20171 :             }
    3021                 :         }
    3022                 : 
    3023         1833014 :         for part_range in &partition.ranges {
    3024         1812557 :             let image_coverage = layers.image_coverage(part_range, lsn)?;
    3025         3483460 :             for (img_range, last_img) in image_coverage {
    3026         1674270 :                 let img_lsn = if let Some(last_img) = last_img {
    3027          203479 :                     last_img.get_lsn_range().end
    3028                 :                 } else {
    3029         1470791 :                     Lsn(0)
    3030                 :                 };
    3031                 :                 // Let's consider an example:
    3032                 :                 //
    3033                 :                 // delta layer with LSN range 71-81
    3034                 :                 // delta layer with LSN range 81-91
    3035                 :                 // delta layer with LSN range 91-101
    3036                 :                 // image layer at LSN 100
    3037                 :                 //
    3038                 :                 // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
    3039                 :                 // there's no need to create a new one. We check this case explicitly, to avoid passing
    3040                 :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    3041                 :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    3042                 :                 // are some delta layers *later* than the current 'lsn', if more WAL was processed and flushed
    3043         1674270 :                 if img_lsn < lsn {
    3044         1655411 :                     let num_deltas =
    3045         1655411 :                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
    3046                 : 
    3047         1655411 :                     max_deltas = max_deltas.max(num_deltas);
    3048         1655411 :                     if num_deltas >= threshold {
    3049 UBC           0 :                         debug!(
    3050               0 :                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
    3051               0 :                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
    3052               0 :                         );
    3053 CBC        3367 :                         return Ok(true);
    3054         1652044 :                     }
    3055           18859 :                 }
    3056                 :             }
    3057                 :         }
    3058                 : 
    3059 UBC           0 :         debug!(
    3060               0 :             max_deltas,
    3061               0 :             "none of the partitioned ranges had >= {threshold} deltas"
    3062               0 :         );
    3063 CBC       20457 :         Ok(false)
    3064           23824 :     }
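
The worked example in the comments above (delta layers 71-81, 81-91, 91-101 on top of an image at LSN 100) can be replayed with a toy version of the check: skip entirely when no WAL arrived since the image, otherwise count the deltas overlapping img_lsn..lsn against the threshold. Counting is deliberately simplified here:

    fn time_for_new_image(deltas: &[(u64, u64)], img_lsn: u64, lsn: u64, threshold: usize) -> bool {
        if img_lsn >= lsn {
            // No WAL processed since the last image layer: nothing to do,
            // and img_lsn..lsn would be a bogus (empty or inverted) range.
            return false;
        }
        let num_deltas = deltas.iter().filter(|&&(s, e)| s < lsn && e > img_lsn).count();
        num_deltas >= threshold
    }

    fn main() {
        let deltas = [(71, 81), (81, 91), (91, 101)];
        assert!(!time_for_new_image(&deltas, 100, 100, 3)); // lsn unchanged since the image
        assert!(time_for_new_image(&deltas, 70, 100, 3)); // all three deltas count
    }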
    3065                 : 
    3066            1301 :     async fn create_image_layers(
    3067            1301 :         &self,
    3068            1301 :         partitioning: &KeyPartitioning,
    3069            1301 :         lsn: Lsn,
    3070            1301 :         force: bool,
    3071            1301 :         ctx: &RequestContext,
    3072            1301 :     ) -> Result<HashMap<LayerFileName, LayerFileMetadata>, PageReconstructError> {
    3073            1301 :         let timer = self.metrics.create_images_time_histo.start_timer();
    3074            1301 :         let mut image_layers: Vec<ImageLayer> = Vec::new();
    3075            1301 : 
    3076            1301 :         // We need to avoid holes between generated image layers.
    3077            1301 :         // Otherwise LayerMap::image_layer_exists will return false if the key range of some layer is
    3078            1301 :         // covered by more than one image layer with a hole between them. In that case such a layer cannot be utilized by GC.
    3079            1301 :         //
    3080            1301 :         // How can such a hole between partitions appear? If we have a relation with relid=1 and size 100,
    3081            1301 :         // and a relation with relid=2 and size 200, then the result of
    3082            1301 :         // KeySpace::partition may contain the partitions <100000000..100000099> and <200000000..200000199>.
    3083            1301 :         // If there is a delta layer <100000000..300000000>, it will never be garbage collected, because
    3084            1301 :         // the image layers <100000000..100000099> and <200000000..200000199> do not completely cover it.
    3085            1301 :         let mut start = Key::MIN;
    3086                 : 
    3087           23863 :         for partition in partitioning.parts.iter() {
    3088           23863 :             let img_range = start..partition.ranges.last().unwrap().end;
    3089           23863 :             start = img_range.end;
    3090           23863 :             if force || self.time_for_new_image_layer(partition, lsn).await? {
    3091            3406 :                 let mut image_layer_writer = ImageLayerWriter::new(
    3092            3406 :                     self.conf,
    3093            3406 :                     self.timeline_id,
    3094            3406 :                     self.tenant_id,
    3095            3406 :                     &img_range,
    3096            3406 :                     lsn,
    3097            3406 :                 )
    3098 UBC           0 :                 .await?;
    3099                 : 
    3100 CBC        3406 :                 fail_point!("image-layer-writer-fail-before-finish", |_| {
    3101 UBC           0 :                     Err(PageReconstructError::Other(anyhow::anyhow!(
    3102               0 :                         "failpoint image-layer-writer-fail-before-finish"
    3103               0 :                     )))
    3104 CBC        3406 :                 });
    3105           57844 :                 for range in &partition.ranges {
    3106           54442 :                     let mut key = range.start;
    3107          278564 :                     while key < range.end {
    3108          740114 :                         let img = match self.get(key, lsn, ctx).await {
    3109          224122 :                             Ok(img) => img,
    3110 UBC           0 :                             Err(err) => {
    3111               0 :                                 // If we fail to reconstruct a VM or FSM page, we can zero the
    3112               0 :                                 // page without losing any actual user data. That seems better
    3113               0 :                                 // than failing repeatedly and getting stuck.
    3114               0 :                                 //
    3115               0 :                                 // We had a bug at one point, where we truncated the FSM and VM
    3116               0 :                                 // in the pageserver, but the Postgres didn't know about that
    3117               0 :                                 // and continued to generate incremental WAL records for pages
    3118               0 :                                 // that didn't exist in the pageserver. Trying to replay those
    3119               0 :                                 // WAL records failed to find the previous image of the page.
    3120               0 :                                 // This special case allows us to recover from that situation.
    3121               0 :                                 // See https://github.com/neondatabase/neon/issues/2601.
    3122               0 :                                 //
    3123               0 :                                 // Unfortunately we cannot do this for the main fork, or for
    3124               0 :                                 // any metadata keys, keys, as that would lead to actual data
    3125               0 :                                 // loss.
    3126               0 :                                 if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
    3127               0 :                                     warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
    3128               0 :                                     ZERO_PAGE.clone()
    3129                 :                                 } else {
    3130               0 :                                     return Err(err);
    3131                 :                                 }
    3132                 :                             }
    3133                 :                         };
    3134 CBC      224122 :                         image_layer_writer.put_image(key, &img).await?;
    3135          224122 :                         key = key.next();
    3136                 :                     }
    3137                 :                 }
    3138            3402 :                 let image_layer = image_layer_writer.finish().await?;
    3139            3402 :                 image_layers.push(image_layer);
    3140           20457 :             }
    3141                 :         }
    3142                 :         // All layers that the GC wanted us to create have now been created.
    3143                 :         //
    3144                 :         // It's possible that another GC cycle happened while we were compacting, and added
    3145                 :         // something new to wanted_image_layers, and we now clear that before processing it.
    3146                 :         // That's OK, because the next GC iteration will put it back in.
    3147            1297 :         *self.wanted_image_layers.lock().unwrap() = None;
    3148            1297 : 
    3149            1297 :         // Sync the new layer to disk before adding it to the layer map, to make sure
    3150            1297 :         // we don't garbage collect something based on the new layer before it has
    3151            1297 :         // reached the disk.
    3152            1297 :         //
    3153            1297 :         // We must also fsync the timeline dir to ensure the directory entries for
    3154            1297 :         // new layer files are durable
    3155            1297 :         //
    3156            1297 :         // Compaction creates multiple image layers. It would be better to create them all
    3157            1297 :         // and fsync them all in parallel.
    3158            1297 :         let all_paths = image_layers
    3159            1297 :             .iter()
    3160            3402 :             .map(|layer| layer.path())
    3161            1297 :             .collect::<Vec<_>>();
    3162            1297 : 
    3163            1297 :         par_fsync::par_fsync_async(&all_paths)
    3164             208 :             .await
    3165            1297 :             .context("fsync of newly created layer files")?;
    3166                 : 
    3167            1297 :         par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
    3168            1299 :             .await
    3169            1296 :             .context("fsync of timeline dir")?;
    3170                 : 
    3171            1296 :         let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
    3172                 : 
    3173            1296 :         let mut guard = self.layers.write().await;
    3174            1296 :         let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
    3175                 : 
    3176            4698 :         for l in &image_layers {
    3177            3402 :             let path = l.filename();
    3178            3402 :             let metadata = timeline_path
    3179            3402 :                 .join(path.file_name())
    3180            3402 :                 .metadata()
    3181            3402 :                 .with_context(|| format!("reading metadata of layer file {}", path.file_name()))?;
    3182                 : 
    3183            3402 :             layer_paths_to_upload.insert(
    3184            3402 :                 path,
    3185            3402 :                 LayerFileMetadata::new(metadata.len(), self.generation),
    3186            3402 :             );
    3187            3402 : 
    3188            3402 :             // update metrics
    3189            3402 :             self.metrics.record_new_file_metrics(metadata.len());
    3190            3402 :             let l = Arc::new(l);
    3191            3402 :             l.access_stats().record_residence_event(
    3192            3402 :                 LayerResidenceStatus::Resident,
    3193            3402 :                 LayerResidenceEventReason::LayerCreate,
    3194            3402 :             );
    3195                 :         }
    3196            1296 :         guard.track_new_image_layers(image_layers);
    3197            1296 :         drop_wlock(guard);
    3198            1296 :         timer.stop_and_record();
    3199            1296 : 
    3200            1296 :         Ok(layer_paths_to_upload)
    3201            1296 :     }
    3202                 : }
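
The hole-avoidance rule in `create_image_layers` (each image range starts where the previous one ended, beginning at Key::MIN) can be isolated into a few lines. A toy illustration over plain u64 keys, reusing the relid example from the comments:

    use std::ops::Range;

    fn image_ranges(partitions: &[Range<u64>], key_min: u64) -> Vec<Range<u64>> {
        let mut start = key_min;
        partitions
            .iter()
            .map(|p| {
                // Extend each image range back to the previous end so the
                // generated layers tile the keyspace without gaps.
                let img_range = start..p.end;
                start = img_range.end;
                img_range
            })
            .collect()
    }

    fn main() {
        // Partitions with a hole between 100000099 and 200000000 ...
        let parts = [100000000..100000099, 200000000..200000199];
        // ... still yield contiguous image ranges from the key minimum.
        assert_eq!(
            image_ranges(&parts, 0),
            vec![0..100000099, 100000099..200000199]
        );
    }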
    3203                 : 
    3204             948 : #[derive(Default)]
    3205                 : struct CompactLevel0Phase1Result {
    3206                 :     new_layers: Vec<Arc<DeltaLayer>>,
    3207                 :     deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
    3208                 : }
    3209                 : 
    3210                 : /// Top-level failure to compact.
    3211 UBC           0 : #[derive(Debug)]
    3212                 : enum CompactionError {
    3213                 :     /// L0 compaction requires layers to be downloaded.
    3214                 :     ///
    3215                 :     /// This should not happen repeatedly, but will be retried once by top-level
    3216                 :     /// `Timeline::compact`.
    3217                 :     DownloadRequired(Vec<Arc<RemoteLayer>>),
    3218                 :     /// The timeline or pageserver is shutting down
    3219                 :     ShuttingDown,
    3220                 :     /// Compaction cannot be done right now; page reconstruction and so on.
    3221                 :     Other(anyhow::Error),
    3222                 : }
    3223                 : 
    3224                 : impl From<anyhow::Error> for CompactionError {
    3225 CBC           1 :     fn from(value: anyhow::Error) -> Self {
    3226               1 :         CompactionError::Other(value)
    3227               1 :     }
    3228                 : }
    3229                 : 
    3230                 : #[serde_as]
    3231            4242 : #[derive(serde::Serialize)]
    3232                 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
    3233                 : 
    3234            8799 : #[derive(Default)]
    3235                 : enum DurationRecorder {
    3236                 :     #[default]
    3237                 :     NotStarted,
    3238                 :     Recorded(RecordedDuration, tokio::time::Instant),
    3239                 : }
    3240                 : 
    3241                 : impl DurationRecorder {
    3242            2784 :     pub fn till_now(&self) -> DurationRecorder {
    3243            2784 :         match self {
    3244                 :             DurationRecorder::NotStarted => {
    3245 UBC           0 :                 panic!("must only call on recorded measurements")
    3246                 :             }
    3247 CBC        2784 :             DurationRecorder::Recorded(_, ended) => {
    3248            2784 :                 let now = tokio::time::Instant::now();
    3249            2784 :                 DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
    3250            2784 :             }
    3251            2784 :         }
    3252            2784 :     }
    3253            2121 :     pub fn into_recorded(self) -> Option<RecordedDuration> {
    3254            2121 :         match self {
    3255 UBC           0 :             DurationRecorder::NotStarted => None,
    3256 CBC        2121 :             DurationRecorder::Recorded(recorded, _) => Some(recorded),
    3257                 :         }
    3258            2121 :     }
    3259                 : }
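
`till_now` is designed to be chained: each call measures the span since the previous recording, which is how the per-phase `*_micros` fields in the stats builder below get their values. A simplified usage sketch, with std's Instant standing in for tokio's:

    use std::time::{Duration, Instant};

    enum Recorder {
        NotStarted,
        Recorded(Duration, Instant),
    }

    impl Recorder {
        fn till_now(&self) -> Recorder {
            match self {
                Recorder::NotStarted => panic!("must only call on recorded measurements"),
                Recorder::Recorded(_, ended) => {
                    let now = Instant::now();
                    Recorder::Recorded(now - *ended, now)
                }
            }
        }
    }

    fn main() {
        // An initial zero-length recording anchors the clock; each later
        // call records the duration of the phase that just finished.
        let phase0 = Recorder::Recorded(Duration::ZERO, Instant::now());
        // ... phase 1 work ...
        let phase1 = phase0.till_now();
        // ... phase 2 work ...
        let _phase2 = phase1.till_now();
    }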
    3260                 : 
    3261            1257 : #[derive(Default)]
    3262                 : struct CompactLevel0Phase1StatsBuilder {
    3263                 :     version: Option<u64>,
    3264                 :     tenant_id: Option<TenantId>,
    3265                 :     timeline_id: Option<TimelineId>,
    3266                 :     read_lock_acquisition_micros: DurationRecorder,
    3267                 :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    3268                 :     read_lock_held_key_sort_micros: DurationRecorder,
    3269                 :     read_lock_held_prerequisites_micros: DurationRecorder,
    3270                 :     read_lock_held_compute_holes_micros: DurationRecorder,
    3271                 :     read_lock_drop_micros: DurationRecorder,
    3272                 :     write_layer_files_micros: DurationRecorder,
    3273                 :     level0_deltas_count: Option<usize>,
    3274                 :     new_deltas_count: Option<usize>,
    3275                 :     new_deltas_size: Option<u64>,
    3276                 : }
    3277                 : 
    3278                 : #[serde_as]
    3279             303 : #[derive(serde::Serialize)]
    3280                 : struct CompactLevel0Phase1Stats {
    3281                 :     version: u64,
    3282                 :     #[serde_as(as = "serde_with::DisplayFromStr")]
    3283                 :     tenant_id: TenantId,
    3284                 :     #[serde_as(as = "serde_with::DisplayFromStr")]
    3285                 :     timeline_id: TimelineId,
    3286                 :     read_lock_acquisition_micros: RecordedDuration,
    3287                 :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    3288                 :     read_lock_held_key_sort_micros: RecordedDuration,
    3289                 :     read_lock_held_prerequisites_micros: RecordedDuration,
    3290                 :     read_lock_held_compute_holes_micros: RecordedDuration,
    3291                 :     read_lock_drop_micros: RecordedDuration,
    3292                 :     write_layer_files_micros: RecordedDuration,
    3293                 :     level0_deltas_count: usize,
    3294                 :     new_deltas_count: usize,
    3295                 :     new_deltas_size: u64,
    3296                 : }
    3297                 : 
    3298                 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    3299                 :     type Error = anyhow::Error;
    3300                 : 
    3301             303 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    3302             303 :         Ok(Self {
    3303             303 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    3304             303 :             tenant_id: value
    3305             303 :                 .tenant_id
    3306             303 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    3307             303 :             timeline_id: value
    3308             303 :                 .timeline_id
    3309             303 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    3310             303 :             read_lock_acquisition_micros: value
    3311             303 :                 .read_lock_acquisition_micros
    3312             303 :                 .into_recorded()
    3313             303 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    3314             303 :             read_lock_held_spawn_blocking_startup_micros: value
    3315             303 :                 .read_lock_held_spawn_blocking_startup_micros
    3316             303 :                 .into_recorded()
    3317             303 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    3318             303 :             read_lock_held_key_sort_micros: value
    3319             303 :                 .read_lock_held_key_sort_micros
    3320             303 :                 .into_recorded()
    3321             303 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    3322             303 :             read_lock_held_prerequisites_micros: value
    3323             303 :                 .read_lock_held_prerequisites_micros
    3324             303 :                 .into_recorded()
    3325             303 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    3326             303 :             read_lock_held_compute_holes_micros: value
    3327             303 :                 .read_lock_held_compute_holes_micros
    3328             303 :                 .into_recorded()
    3329             303 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    3330             303 :             read_lock_drop_micros: value
    3331             303 :                 .read_lock_drop_micros
    3332             303 :                 .into_recorded()
    3333             303 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    3334             303 :             write_layer_files_micros: value
    3335             303 :                 .write_layer_files_micros
    3336             303 :                 .into_recorded()
    3337             303 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    3338             303 :             level0_deltas_count: value
    3339             303 :                 .level0_deltas_count
    3340             303 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    3341             303 :             new_deltas_count: value
    3342             303 :                 .new_deltas_count
    3343             303 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    3344             303 :             new_deltas_size: value
    3345             303 :                 .new_deltas_size
    3346             303 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    3347                 :         })
    3348             303 :     }
    3349                 : }
    3350                 : 
    3351                 : impl Timeline {
    3352                 :     /// First phase of Level 0 file compaction, explained in the [`compact_inner`] comment.
    3353                 :     ///
    3354                 :     /// This method takes the `_layer_removal_cs` guard to highlight that required downloads are
    3355                 :     /// returned as an error. If the `layer_removal_cs` boundary is changed so that it is no longer
    3356                 :     /// taken at the start of level0 file compaction, the on-demand download should be revisited as well.
    3357                 :     ///
    3358                 :     /// [`compact_inner`]: Self::compact_inner
    3359            1257 :     async fn compact_level0_phase1(
    3360            1257 :         self: &Arc<Self>,
    3361            1257 :         _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
    3362            1257 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
    3363            1257 :         mut stats: CompactLevel0Phase1StatsBuilder,
    3364            1257 :         target_file_size: u64,
    3365            1257 :         ctx: &RequestContext,
    3366            1257 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    3367            1257 :         stats.read_lock_held_spawn_blocking_startup_micros =
    3368            1257 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    3369            1257 :         let layers = guard.layer_map();
    3370            1257 :         let level0_deltas = layers.get_level0_deltas()?;
    3371            1257 :         let mut level0_deltas = level0_deltas
    3372            1257 :             .into_iter()
    3373            6666 :             .map(|x| guard.get_from_desc(&x))
    3374            1257 :             .collect_vec();
    3375            1257 :         stats.level0_deltas_count = Some(level0_deltas.len());
    3376            1257 :         // Only compact if enough layers have accumulated.
    3377            1257 :         let threshold = self.get_compaction_threshold();
    3378            1257 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    3379 UBC           0 :             debug!(
    3380               0 :                 level0_deltas = level0_deltas.len(),
    3381               0 :                 threshold, "too few deltas to compact"
    3382               0 :             );
    3383 CBC         948 :             return Ok(CompactLevel0Phase1Result::default());
    3384             309 :         }
    3385             309 : 
    3386             309 :         // This failpoint is used together with the `test_duplicate_layers` integration test.
    3387             309 :         // It makes the compaction result exactly the same set of layers as the input.
    3388             309 :         // We want to ensure that this will not cause any problems when updating the layer map
    3389             309 :         // after the compaction is finished.
    3390             309 :         //
    3391             309 :         // Currently, there are two rare edge cases that can cause duplicated layers to be
    3392             309 :         // inserted:
    3393             309 :         // 1. The compaction job is interrupted / does not finish successfully. Assume we have files 1, 2, 3, 4,
    3394             309 :         //    which are compacted into 5, but the page server is shut down; the next time we start the page server
    3395             309 :         //    we will get a layer map containing 1, 2, 3, 4, and 5, where 5 has the same content as 4. If we trigger
    3396             309 :         //    L0 compaction again at this point, it is likely that we will get a file 6 with the same content and
    3397             309 :         //    key range as 5, which causes an overwrite. This is acceptable because the content is the same, and we
    3398             309 :         //    should do a layer replace instead of the normal remove / upload process.
    3399             309 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and of the
    3400             309 :         //    target file size. Compaction will likely create the same set of n files afterwards.
    3401             309 :         //
    3402             309 :         // This failpoint is a superset of both of the cases.
    3403             309 :         fail_point!("compact-level0-phase1-return-same", |_| {
    3404               2 :             println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
    3405               2 :             Ok(CompactLevel0Phase1Result {
    3406               2 :                 new_layers: level0_deltas
    3407               2 :                     .iter()
    3408              28 :                     .map(|x| x.clone().downcast_delta_layer().unwrap())
    3409               2 :                     .collect(),
    3410               2 :                 deltas_to_compact: level0_deltas
    3411               2 :                     .iter()
    3412              28 :                     .map(|x| x.layer_desc().clone().into())
    3413               2 :                     .collect(),
    3414               2 :             })
    3415             309 :         });
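// For context, failpoints from the `fail` crate stay inert until configured.
// An integration test typically enables the one above roughly like this
// ("return" makes the closure fire); `run_compaction_under_test()` is a
// hypothetical stand-in for driving a compaction pass:
fn trigger_duplicate_layer_failpoint(run_compaction_under_test: impl FnOnce()) {
    fail::cfg("compact-level0-phase1-return-same", "return").unwrap();
    run_compaction_under_test();
    fail::remove("compact-level0-phase1-return-same"); // disable it again
}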
    3416                 : 
    3417                 :         // Gather the files to compact in this iteration.
    3418                 :         //
    3419                 :         // Start with the oldest Level 0 delta file, and collect any other
    3420                 :         // level 0 files that form a contiguous sequence, such that the end
    3421                 :         // LSN of the previous file matches the start LSN of the next file.
    3422                 :         //
    3423                 :         // Note that if the files don't form such a sequence, we might
    3424                 :         // "compact" just a single file. That's a bit pointless, but it allows
    3425                 :         // us to get rid of the level 0 file, and compact the other files on
    3426                 :         // the next iteration. This could probably be made smarter, but such
    3427                 :         // "gaps" in the sequence of level 0 files should only happen in case
    3428                 :         // of a crash, partial download from cloud storage, or something like
    3429                 :         // that, so it's not a big deal in practice.
    3430            9422 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    3431             307 :         let mut level0_deltas_iter = level0_deltas.iter();
    3432             307 : 
    3433             307 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    3434             307 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    3435             307 :         let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
    3436            4336 :         for l in level0_deltas_iter {
    3437            4029 :             let lsn_range = &l.layer_desc().lsn_range;
    3438            4029 : 
    3439            4029 :             if lsn_range.start != prev_lsn_end {
    3440 UBC           0 :                 break;
    3441 CBC        4029 :             }
    3442            4029 :             deltas_to_compact.push(Arc::clone(l));
    3443            4029 :             prev_lsn_end = lsn_range.end;
    3444                 :         }
    3445             307 :         let lsn_range = Range {
    3446             307 :             start: deltas_to_compact
    3447             307 :                 .first()
    3448             307 :                 .unwrap()
    3449             307 :                 .layer_desc()
    3450             307 :                 .lsn_range
    3451             307 :                 .start,
    3452             307 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    3453             307 :         };
    3454             307 : 
    3455             307 :         let remotes = deltas_to_compact
    3456             307 :             .iter()
    3457            4336 :             .filter(|l| l.is_remote_layer())
    3458             307 :             .inspect(|l| info!("compact requires download of {l}"))
    3459             307 :             .map(|l| {
    3460               3 :                 l.clone()
    3461               3 :                     .downcast_remote_layer()
    3462               3 :                     .expect("just checked it is remote layer")
    3463             307 :             })
    3464             307 :             .collect::<Vec<_>>();
    3465             307 : 
    3466             307 :         if !remotes.is_empty() {
    3467                 :             // The caller is holding the layer_removal_cs lock, and we don't want to download while
    3468                 :             // holding it; in the future download_remote_layer might take it as well. This applies
    3469                 :             // regardless of earlier image creation downloading on-demand while holding the lock.
    3470               1 :             return Err(CompactionError::DownloadRequired(remotes));
    3471             306 :         }
    3472                 : 
    3473             306 :         info!(
    3474             306 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    3475             306 :             lsn_range.start,
    3476             306 :             lsn_range.end,
    3477             306 :             deltas_to_compact.len(),
    3478             306 :             level0_deltas.len()
    3479             306 :         );
    3480                 : 
    3481            4333 :         for l in deltas_to_compact.iter() {
    3482            4333 :             info!("compact includes {l}");
    3483                 :         }
    3484                 : 
    3485                 :         // We don't need the original list of layers anymore. Drop it so that
    3486                 :         // we don't accidentally use it later in the function.
    3487             306 :         drop(level0_deltas);
    3488             306 : 
    3489             306 :         stats.read_lock_held_prerequisites_micros = stats
    3490             306 :             .read_lock_held_spawn_blocking_startup_micros
    3491             306 :             .till_now();
    3492             306 : 
    3493             306 :         // Determine the N largest holes, where N is the number of compacted layers.
    3494             306 :         let max_holes = deltas_to_compact.len();
    3495             306 :         let last_record_lsn = self.get_last_record_lsn();
    3496             306 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    3497             306 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
    3498             306 : 
    3499             306 :         // min-heap (reserve space for one more element added before eviction)
    3500             306 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    3501             306 :         let mut prev: Option<Key> = None;
    3502             306 : 
    3503             306 :         let mut all_keys = Vec::new();
    3504             306 : 
    3505             306 :         let downcast_deltas: Vec<_> = deltas_to_compact
    3506             306 :             .iter()
    3507            4333 :             .map(|l| l.clone().downcast_delta_layer().expect("delta layer"))
    3508             306 :             .collect();
    3509            4333 :         for dl in downcast_deltas.iter() {
    3510                 :             // TODO: replace this with an await once we fully go async
    3511            4333 :             all_keys.extend(DeltaLayer::load_keys(dl, ctx).await?);
    3512                 :         }
    3513                 : 
    3514                 :         // The current stdlib sorting implementation is designed in a way where it is
    3515                 :         // particularly fast when the slice is made up of sorted sub-ranges.
    3516       303973472 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3517             306 : 
    3518             306 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    3519                 : 
    3520        30235442 :         for DeltaEntry { key: next_key, .. } in all_keys.iter() {
    3521        30235442 :             let next_key = *next_key;
    3522        30235442 :             if let Some(prev_key) = prev {
    3523                 :                 // just a fast first-pass filter
    3524        30235136 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
    3525          207342 :                     let key_range = prev_key..next_key;
    3526                 :                     // Measuring the hole as a simple subtraction of the i128 representations of the key range
    3527                 :                     // boundaries does not make much sense, because the largest holes correspond to field1/field2 changes.
    3528                 :                     // We are mostly interested in eliminating holes which cause generation of excessive image layers.
    3529                 :                     // That is why it is better to measure the size of a hole as the number of covering image layers.
    3530          207342 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
    3531          207342 :                     if coverage_size >= min_hole_coverage_size {
    3532              68 :                         heap.push(Hole {
    3533              68 :                             key_range,
    3534              68 :                             coverage_size,
    3535              68 :                         });
    3536              68 :                         if heap.len() > max_holes {
    3537              25 :                             heap.pop(); // remove smallest hole
    3538              43 :                         }
    3539          207274 :                     }
    3540        30027794 :                 }
    3541             306 :             }
    3542        30235442 :             prev = Some(next_key.next());
    3543                 :         }
    3544             306 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    3545             306 :         drop_rlock(guard);
    3546             306 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    3547             306 :         let mut holes = heap.into_vec();
    3548             306 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
    3549             306 :         let mut next_hole = 0; // index of next hole in holes vector
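// The hole search above is the classic bounded-heap selection: push every
// candidate, evict the current minimum once the heap holds more than N, and
// the N largest survive. A self-contained sketch of the same pattern, using
// `std::cmp::Reverse` for min-heap behavior (the `Hole` ordering presumably
// achieves the same inversion via its `Ord` impl):
fn n_largest(values: impl IntoIterator<Item = u64>, n: usize) -> Vec<u64> {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;
    // reserve space for one extra element pushed before each eviction
    let mut heap = BinaryHeap::with_capacity(n + 1);
    for v in values {
        heap.push(Reverse(v));
        if heap.len() > n {
            heap.pop(); // evicts the smallest retained value, O(log n)
        }
    }
    heap.into_iter().map(|Reverse(v)| v).collect()
}
// n_largest([5, 1, 9, 3, 7], 2) keeps only 7 and 9.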
    3550             306 : 
    3551             306 :         // This iterator walks through all key-value pairs from all the layers
    3552             306 :         // we're compacting, in key, LSN order.
    3553             306 :         let all_values_iter = all_keys.iter();
    3554             306 : 
    3555             306 :         // This iterator walks through all keys and is needed to calculate the size used by each key
    3556             306 :         let mut all_keys_iter = all_keys
    3557             306 :             .iter()
    3558        29620953 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    3559        29620647 :             .coalesce(|mut prev, cur| {
    3560        29620647 :                 // Coalesce keys that belong to the same key pair.
    3561        29620647 :                 // This ensures that compaction doesn't put them
    3562        29620647 :                 // into different layer files.
    3563        29620647 :                 // Still limit this by the target file size,
    3564        29620647 :                 // so that we keep the size of the files in
    3565        29620647 :                 // check.
    3566        29620647 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    3567        27749940 :                     prev.2 += cur.2;
    3568        27749940 :                     Ok(prev)
    3569                 :                 } else {
    3570         1870707 :                     Err((prev, cur))
    3571                 :                 }
    3572        29620647 :             });
    3573             306 : 
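// A standalone sketch of the `coalesce` call above, reduced to (key, size)
// pairs: adjacent entries with the same key are merged until the accumulated
// size reaches the target, so one key's deltas are not split across output
// layers prematurely. Because the size check happens before adding, a merged
// entry may overshoot the target, exactly as in the real code.
fn coalesce_by_key(entries: Vec<(i32, u64)>, target: u64) -> Vec<(i32, u64)> {
    use itertools::Itertools;
    entries
        .into_iter()
        .coalesce(|mut prev, cur| {
            if prev.0 == cur.0 && prev.1 < target {
                prev.1 += cur.1;
                Ok(prev) // keep accumulating into `prev`
            } else {
                Err((prev, cur)) // emit `prev`, restart from `cur`
            }
        })
        .collect()
}
// coalesce_by_key(vec![(1, 4), (1, 4), (1, 4), (2, 3)], 10) == vec![(1, 12), (2, 3)]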
    3574             306 :         // Merge the contents of all the input delta layers into a new set
    3575             306 :         // of delta layers, based on the current partitioning.
    3576             306 :         //
    3577             306 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the current output layer would cause it to become too large. If so, dump the current output layer and start a new one.
    3578             306 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    3579             306 :         // would be too large. In that case, we also split on the LSN dimension.
    3580             306 :         //
    3581             306 :         // LSN
    3582             306 :         //  ^
    3583             306 :         //  |
    3584             306 :         //  | +-----------+            +--+--+--+--+
    3585             306 :         //  | |           |            |  |  |  |  |
    3586             306 :         //  | +-----------+            |  |  |  |  |
    3587             306 :         //  | |           |            |  |  |  |  |
    3588             306 :         //  | +-----------+     ==>    |  |  |  |  |
    3589             306 :         //  | |           |            |  |  |  |  |
    3590             306 :         //  | +-----------+            |  |  |  |  |
    3591             306 :         //  | |           |            |  |  |  |  |
    3592             306 :         //  | +-----------+            +--+--+--+--+
    3593             306 :         //  |
    3594             306 :         //  +--------------> key
    3595             306 :         //
    3596             306 :         //
    3597             306 :         // If one key (X) has a lot of page versions:
    3598             306 :         //
    3599             306 :         // LSN
    3600             306 :         //  ^
    3601             306 :         //  |                                 (X)
    3602             306 :         //  | +-----------+            +--+--+--+--+
    3603             306 :         //  | |           |            |  |  |  |  |
    3604             306 :         //  | +-----------+            |  |  +--+  |
    3605             306 :         //  | |           |            |  |  |  |  |
    3606             306 :         //  | +-----------+     ==>    |  |  |  |  |
    3607             306 :         //  | |           |            |  |  +--+  |
    3608             306 :         //  | +-----------+            |  |  |  |  |
    3609             306 :         //  | |           |            |  |  |  |  |
    3610             306 :         //  | +-----------+            +--+--+--+--+
    3611             306 :         //  |
    3612             306 :         //  +--------------> key
    3613             306 :         // TODO: this actually divides the layers into fixed-size chunks, not
    3614             306 :         // based on the partitioning.
    3615             306 :         //
    3616             306 :         // TODO: we should also opportunistically materialize and
    3617             306 :         // garbage collect what we can.
    3618             306 :         let mut new_layers = Vec::new();
    3619             306 :         let mut prev_key: Option<Key> = None;
    3620             306 :         let mut writer: Option<DeltaLayerWriter> = None;
    3621             306 :         let mut key_values_total_size = 0u64;
    3622             306 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    3623             306 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    3624                 : 
    3625                 :         for &DeltaEntry {
    3626        29577812 :             key, lsn, ref val, ..
    3627        29578115 :         } in all_values_iter
    3628                 :         {
    3629        29577812 :             let value = val.load(ctx).await?;
    3630        29577809 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    3631        29577809 :             // We need to check key boundaries once we reach the next key or the end of a layer with the same key
    3632        29577809 :             if !same_key || lsn == dup_end_lsn {
    3633         1871007 :                 let mut next_key_size = 0u64;
    3634         1871007 :                 let is_dup_layer = dup_end_lsn.is_valid();
    3635         1871007 :                 dup_start_lsn = Lsn::INVALID;
    3636         1871007 :                 if !same_key {
    3637         1870949 :                     dup_end_lsn = Lsn::INVALID;
    3638         1870949 :                 }
    3639                 :                 // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
    3640         1871010 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    3641         1871010 :                     next_key_size = next_size;
    3642         1871010 :                     if key != next_key {
    3643         1870646 :                         if dup_end_lsn.is_valid() {
    3644              18 :                             // We are writing a segment with duplicates:
    3645              18 :                             // place all remaining values of this key in a separate segment
    3646              18 :                             dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
    3647              18 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key until the end of the LSN range
    3648         1870628 :                         }
    3649         1870646 :                         break;
    3650             364 :                     }
    3651             364 :                     key_values_total_size += next_size;
    3652             364 :                     // Check if it is time to split the segment: the total size of this key's values is larger than the target file size.
    3653             364 :                     // We need to avoid generating empty segments if next_size > target_file_size.
    3654             364 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    3655                 :                         // Split the key between multiple layers: such a layer can contain only a single key
    3656              58 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    3657              40 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    3658                 :                         } else {
    3659              18 :                             lsn // start with the first LSN for this key
    3660                 :                         };
    3661              58 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    3662              58 :                         break;
    3663             306 :                     }
    3664                 :                 }
    3665                 :                 // Handle the case when the loop reaches the last key: dup_end_lsn is valid but dup_start_lsn is not set.
    3666         1871007 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    3667 UBC           0 :                     dup_start_lsn = dup_end_lsn;
    3668               0 :                     dup_end_lsn = lsn_range.end;
    3669 CBC     1871007 :                 }
    3670         1871007 :                 if writer.is_some() {
    3671         1870701 :                     let written_size = writer.as_mut().unwrap().size();
    3672         1870701 :                     let contains_hole =
    3673         1870701 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    3674                 :                     // check if this key causes the layer to overflow or to cross a hole...
    3675         1870701 :                     if is_dup_layer
    3676         1870625 :                         || dup_end_lsn.is_valid()
    3677         1870611 :                         || written_size + key_values_total_size > target_file_size
    3678         1860632 :                         || contains_hole
    3679                 :                     {
    3680                 :                         // ... if so, flush the previous layer and prepare to write a new one
    3681           10110 :                         new_layers.push(Arc::new(
    3682           10110 :                             writer
    3683           10110 :                                 .take()
    3684           10110 :                                 .unwrap()
    3685           10110 :                                 .finish(prev_key.unwrap().next())
    3686 UBC           0 :                                 .await?,
    3687                 :                         ));
    3688 CBC       10110 :                         writer = None;
    3689           10110 : 
    3690           10110 :                         if contains_hole {
    3691              43 :                             // skip hole
    3692              43 :                             next_hole += 1;
    3693           10067 :                         }
    3694         1860591 :                     }
    3695             306 :                 }
    3696                 :                 // Remember the size of the key value because at the next iteration we will access the next item
    3697         1871007 :                 key_values_total_size = next_key_size;
    3698        27706802 :             }
    3699        29577809 :             if writer.is_none() {
    3700           10416 :                 // Create a writer if not initialized yet
    3701           10416 :                 writer = Some(
    3702                 :                     DeltaLayerWriter::new(
    3703           10416 :                         self.conf,
    3704           10416 :                         self.timeline_id,
    3705           10416 :                         self.tenant_id,
    3706           10416 :                         key,
    3707           10416 :                         if dup_end_lsn.is_valid() {
    3708                 :                             // this is a layer containing slice of values of the same key
    3709 UBC           0 :                             debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    3710 CBC          76 :                             dup_start_lsn..dup_end_lsn
    3711                 :                         } else {
    3712 UBC           0 :                             debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3713 CBC       10340 :                             lsn_range.clone()
    3714                 :                         },
    3715                 :                     )
    3716 UBC           0 :                     .await?,
    3717                 :                 );
    3718 CBC    29567393 :             }
    3719                 : 
    3720        29577809 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3721 UBC           0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    3722               0 :                     "failpoint delta-layer-writer-fail-before-finish"
    3723               0 :                 )))
    3724 CBC    29577809 :             });
    3725                 : 
    3726        29577809 :             writer.as_mut().unwrap().put_value(key, lsn, value).await?;
    3727                 : 
    3728        29577809 :             if !new_layers.is_empty() {
    3729        26394245 :                 fail_point!("after-timeline-compacted-first-L1");
    3730        26394245 :             }
    3731                 : 
    3732        29577809 :             prev_key = Some(key);
    3733                 :         }
    3734             303 :         if let Some(writer) = writer {
    3735             303 :             new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next()).await?));
    3736 UBC           0 :         }
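// A distilled sketch of the LSN-dimension split performed by the
// dup_start_lsn / dup_end_lsn bookkeeping in the loop above, for one key
// whose versions cannot fit in a single layer (u64 stands in for Lsn;
// `versions` is a non-empty, LSN-ordered list of (lsn, size) pairs):
fn split_one_key_by_lsn(
    versions: &[(u64, u64)],
    end_lsn: u64,
    target: u64,
) -> Vec<std::ops::Range<u64>> {
    let mut ranges = Vec::new();
    let mut start = versions[0].0;
    let mut acc = 0u64;
    for pair in versions.windows(2) {
        acc += pair[0].1;
        if acc > target {
            ranges.push(start..pair[1].0); // split at the next version's LSN (exclusive bound)
            start = pair[1].0;
            acc = 0;
        }
    }
    ranges.push(start..end_lsn); // the last slice runs to the end of the LSN range
    ranges
}
// With versions [(10, 6), (20, 6), (30, 6)], end_lsn 40 and target 10, this
// yields [10..30, 30..40]: the first slice overshoots slightly, as in the real code.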
    3737                 : 
    3738                 :         // Sync layers
    3739 CBC         303 :         if !new_layers.is_empty() {
    3740                 :             // Print a warning if the created layer is larger than double the target size
    3741                 :             // Add two pages for potential overhead. This should in theory be already
    3742                 :             // accounted for in the target calculation, but for very small targets,
    3743                 :             // we still might easily hit the limit otherwise.
    3744             303 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    3745           10397 :             for layer in new_layers.iter() {
    3746           10397 :                 if layer.layer_desc().file_size > warn_limit {
    3747 UBC           0 :                     warn!(
    3748               0 :                         %layer,
    3749               0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    3750               0 :                     );
    3751 CBC       10397 :                 }
    3752                 :             }
    3753           10397 :             let mut layer_paths: Vec<Utf8PathBuf> = new_layers.iter().map(|l| l.path()).collect();
    3754             303 : 
    3755             303 :             // Fsync all the layer files and directory using multiple threads to
    3756             303 :             // minimize latency.
    3757             303 :             par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
    3758                 : 
    3759             303 :             par_fsync::par_fsync(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
    3760             303 :                 .context("fsync of timeline dir")?;
    3761                 : 
    3762             303 :             layer_paths.pop().unwrap();
    3763 UBC           0 :         }
    3764                 : 
    3765 CBC         303 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    3766             303 :         stats.new_deltas_count = Some(new_layers.len());
    3767           10397 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    3768             303 : 
    3769             303 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    3770             303 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    3771                 :         {
    3772             303 :             Ok(stats_json) => {
    3773             303 :                 info!(
    3774             303 :                     stats_json = stats_json.as_str(),
    3775             303 :                     "compact_level0_phase1 stats available"
    3776             303 :                 )
    3777                 :             }
    3778 UBC           0 :             Err(e) => {
    3779               0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    3780                 :             }
    3781                 :         }
    3782                 : 
    3783 CBC         303 :         Ok(CompactLevel0Phase1Result {
    3784             303 :             new_layers,
    3785             303 :             deltas_to_compact: deltas_to_compact
    3786             303 :                 .into_iter()
    3787            4289 :                 .map(|x| Arc::new(x.layer_desc().clone()))
    3788             303 :                 .collect(),
    3789             303 :         })
    3790            1254 :     }
    3791                 : 
    3792                 :     ///
    3793                 :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
    3794                 :     /// as Level 1 files.
    3795                 :     ///
    3796            1257 :     async fn compact_level0(
    3797            1257 :         self: &Arc<Self>,
    3798            1257 :         layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
    3799            1257 :         target_file_size: u64,
    3800            1257 :         ctx: &RequestContext,
    3801            1257 :     ) -> Result<(), CompactionError> {
    3802                 :         let CompactLevel0Phase1Result {
    3803            1253 :             new_layers,
    3804            1253 :             deltas_to_compact,
    3805                 :         } = {
    3806            1257 :             let phase1_span = info_span!("compact_level0_phase1");
    3807            1257 :             let ctx = ctx.attached_child();
    3808            1257 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    3809            1257 :                 version: Some(2),
    3810            1257 :                 tenant_id: Some(self.tenant_id),
    3811            1257 :                 timeline_id: Some(self.timeline_id),
    3812            1257 :                 ..Default::default()
    3813            1257 :             };
    3814            1257 : 
    3815            1257 :             let begin = tokio::time::Instant::now();
    3816            1257 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
    3817            1257 :             let now = tokio::time::Instant::now();
    3818            1257 :             stats.read_lock_acquisition_micros =
    3819            1257 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    3820            1257 :             let layer_removal_cs = layer_removal_cs.clone();
    3821            1257 :             self.compact_level0_phase1(
    3822            1257 :                 layer_removal_cs,
    3823            1257 :                 phase1_layers_locked,
    3824            1257 :                 stats,
    3825            1257 :                 target_file_size,
    3826            1257 :                 &ctx,
    3827            1257 :             )
    3828            1257 :             .instrument(phase1_span)
    3829          477355 :             .await?
    3830                 :         };
    3831                 : 
    3832            1253 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    3833                 :             // nothing to do
    3834             948 :             return Ok(());
    3835             305 :         }
    3836                 : 
    3837                 :         // Before deleting any layers, we need to wait for their upload ops to finish.
    3838                 :         // See remote_timeline_client module level comment on consistency.
    3839                 :         // Do it here because we don't want to hold self.layers.write() while waiting.
    3840             305 :         if let Some(remote_client) = &self.remote_client {
    3841 UBC           0 :             debug!("waiting for upload ops to complete");
    3842 CBC         305 :             remote_client
    3843             305 :                 .wait_completion()
    3844              66 :                 .await
    3845             305 :                 .context("wait for layer upload ops to complete")?;
    3846 UBC           0 :         }
    3847                 : 
    3848 CBC         304 :         let mut guard = self.layers.write().await;
    3849             304 :         let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
    3850             304 : 
    3851             304 :         // In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction.
    3852             304 :         // We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
    3853             304 :         // now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map.
    3854             304 :         let mut duplicated_layers = HashSet::new();
    3855             304 : 
    3856             304 :         let mut insert_layers = Vec::new();
    3857             304 :         let mut remove_layers = Vec::new();
    3858                 : 
    3859           10194 :         for l in new_layers {
    3860            9890 :             let new_delta_path = l.path();
    3861                 : 
    3862            9890 :             let metadata = new_delta_path.metadata().with_context(|| {
    3863 UBC           0 :                 format!("read file metadata for new created layer {new_delta_path}")
    3864 CBC        9890 :             })?;
    3865                 : 
    3866            9890 :             if let Some(remote_client) = &self.remote_client {
    3867            9890 :                 remote_client.schedule_layer_file_upload(
    3868            9890 :                     &l.filename(),
    3869            9890 :                     &LayerFileMetadata::new(metadata.len(), self.generation),
    3870            9890 :                 )?;
    3871 UBC           0 :             }
    3872                 : 
    3873                 :             // update metrics, including the timeline's physical size
    3874 CBC        9890 :             self.metrics.record_new_file_metrics(metadata.len());
    3875            9890 : 
    3876            9890 :             new_layer_paths.insert(
    3877            9890 :                 new_delta_path,
    3878            9890 :                 LayerFileMetadata::new(metadata.len(), self.generation),
    3879            9890 :             );
    3880            9890 :             l.access_stats().record_residence_event(
    3881            9890 :                 LayerResidenceStatus::Resident,
    3882            9890 :                 LayerResidenceEventReason::LayerCreate,
    3883            9890 :             );
    3884            9890 :             let l = l as Arc<dyn PersistentLayer>;
    3885            9890 :             if guard.contains(&l) {
    3886              28 :                 tracing::error!(layer=%l, "duplicated L1 layer");
    3887              28 :                 duplicated_layers.insert(l.layer_desc().key());
    3888                 :             } else {
    3889            9862 :                 if LayerMap::is_l0(l.layer_desc()) {
    3890 UBC           0 :                     return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
    3891 CBC        9862 :                 }
    3892            9862 :                 insert_layers.push(l);
    3893                 :             }
    3894                 :         }
    3895                 : 
    3896                 :         // Now that we have reshuffled the data to set of new delta layers, we can
    3897                 :         // delete the old ones
    3898             304 :         let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
    3899            4612 :         for ldesc in deltas_to_compact {
    3900            4308 :             if duplicated_layers.contains(&ldesc.key()) {
    3901                 :                 // skip duplicated layers, they will not be removed; we have already overwritten them
    3902                 :                 // with new layers in the compaction phase 1.
    3903              28 :                 continue;
    3904            4280 :             }
    3905            4280 :             layer_names_to_delete.push(ldesc.filename());
    3906            4280 :             remove_layers.push(guard.get_from_desc(&ldesc));
    3907                 :         }
    3908                 : 
    3909             304 :         guard.finish_compact_l0(
    3910             304 :             layer_removal_cs,
    3911             304 :             remove_layers,
    3912             304 :             insert_layers,
    3913             304 :             &self.metrics,
    3914             304 :         )?;
    3915                 : 
    3916             304 :         drop_wlock(guard);
    3917                 : 
    3918                 :         // Also schedule the deletions in remote storage
    3919             304 :         if let Some(remote_client) = &self.remote_client {
    3920             304 :             remote_client.schedule_layer_file_deletion(layer_names_to_delete)?;
    3921 UBC           0 :         }
    3922                 : 
    3923 CBC         304 :         Ok(())
    3924            1254 :     }
    3925                 : 
    3926                 :     /// Update information about which layer files need to be retained during
    3927                 :     /// garbage collection. This is separate from actually performing the GC,
    3928                 :     /// and is updated more frequently, so that compaction can remove obsolete
    3929                 :     /// page versions more aggressively.
    3930                 :     ///
    3931                 :     /// TODO: that's wishful thinking, compaction doesn't actually do that
    3932                 :     /// currently.
    3933                 :     ///
    3934                 :     /// The caller specifies how much history is needed with the 3 arguments:
    3935                 :     ///
    3936                 :     /// retain_lsns: keep a version of each page at these LSNs
    3937                 :     /// cutoff_horizon: also keep everything newer than this LSN
    3938                 :     /// pitr: the time duration required to keep data for PITR
    3939                 :     ///
    3940                 :     /// The 'retain_lsns' list is currently used to prevent removing files that
    3941                 :     /// are needed by child timelines. In the future, the user might be able to
    3942                 :     /// name additional points in time to retain. The caller is responsible for
    3943                 :     /// collecting that information.
    3944                 :     ///
    3945                 :     /// The 'cutoff_horizon' point is used to retain recent versions that might still be
    3946                 :     /// needed by read-only nodes. (As of this writing, the caller just passes
    3947                 :     /// the latest LSN subtracted by a constant, and doesn't do anything smart
    3948                 :     /// to figure out what read-only nodes might actually need.)
    3949                 :     ///
    3950                 :     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
    3951                 :     /// whether a record is needed for PITR.
    3952                 :     ///
    3953                 :     /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
    3954                 :     /// field, so that the three values passed as argument are stored
    3955                 :     /// atomically. But the caller is responsible for ensuring that no new
    3956                 :     /// branches are created that would need to be included in 'retain_lsns',
    3957                 :     /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
    3958                 :     /// that.
    3959                 :     ///
    3960            2256 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
    3961                 :     pub(super) async fn update_gc_info(
    3962                 :         &self,
    3963                 :         retain_lsns: Vec<Lsn>,
    3964                 :         cutoff_horizon: Lsn,
    3965                 :         pitr: Duration,
    3966                 :         ctx: &RequestContext,
    3967                 :     ) -> anyhow::Result<()> {
    3968                 :         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
    3969                 :         //
    3970                 :         // Some unit tests depend on garbage collection working even when
    3971                 :         // CLOG data is missing, in which case find_lsn_for_timestamp() doesn't
    3972                 :         // work; avoid calling it altogether if time-based retention is not
    3973                 :         // configured. It would be pointless anyway.
    3974                 :         let pitr_cutoff = if pitr != Duration::ZERO {
    3975                 :             let now = SystemTime::now();
    3976                 :             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
    3977                 :                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
    3978                 : 
    3979                 :                 match self.find_lsn_for_timestamp(pitr_timestamp, ctx).await? {
    3980                 :                     LsnForTimestamp::Present(lsn) => lsn,
    3981                 :                     LsnForTimestamp::Future(lsn) => {
    3982                 :                         // The timestamp is in the future. That sounds impossible,
    3983                 :                         // but what it really means is that there haven't been
    3984                 :                         // any commits since the cutoff timestamp.
    3985 UBC           0 :                         debug!("future({})", lsn);
    3986                 :                         cutoff_horizon
    3987                 :                     }
    3988                 :                     LsnForTimestamp::Past(lsn) => {
    3989               0 :                         debug!("past({})", lsn);
    3990                 :                         // conservative, safe default is to remove nothing when we
    3991                 :                         // have no commit timestamp data available
    3992                 :                         *self.get_latest_gc_cutoff_lsn()
    3993                 :                     }
    3994                 :                     LsnForTimestamp::NoData(lsn) => {
    3995               0 :                         debug!("nodata({})", lsn);
    3996                 :                         // conservative, safe default is to remove nothing when we
    3997                 :                         // have no commit timestamp data available
    3998                 :                         *self.get_latest_gc_cutoff_lsn()
    3999                 :                     }
    4000                 :                 }
    4001                 :             } else {
    4002                 :                 // If we don't have enough data to convert to LSN,
    4003                 :                 // play safe and don't remove any layers.
    4004                 :                 *self.get_latest_gc_cutoff_lsn()
    4005                 :             }
    4006                 :         } else {
    4007                 :             // No time-based retention was configured. Set time-based cutoff to
    4008                 :             // same as LSN based.
    4009                 :             cutoff_horizon
    4010                 :         };
    4011                 : 
    4012                 :         // Grab the lock and update the values
    4013                 :         *self.gc_info.write().unwrap() = GcInfo {
    4014                 :             retain_lsns,
    4015                 :             horizon_cutoff: cutoff_horizon,
    4016                 :             pitr_cutoff,
    4017                 :         };
    4018                 : 
    4019                 :         Ok(())
    4020                 :     }
    4021                 : 
    4022                 :     ///
    4023                 :     /// Garbage collect layer files on a timeline that are no longer needed.
    4024                 :     ///
    4025                 :     /// Currently, we don't make any attempt at removing unneeded page versions
    4026                 :     /// within a layer file. We can only remove the whole file if it's fully
    4027                 :     /// obsolete.
    4028                 :     ///
    4029 CBC         695 :     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
    4030             695 :         let timer = self.metrics.garbage_collect_histo.start_timer();
    4031             695 : 
    4032             695 :         fail_point!("before-timeline-gc");
    4033                 : 
    4034             695 :         let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
    4035                 :         // Is the timeline being deleted?
    4036             695 :         if self.is_stopping() {
    4037 UBC           0 :             anyhow::bail!("timeline is Stopping");
    4038 CBC         695 :         }
    4039             695 : 
    4040             695 :         let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
    4041             695 :             let gc_info = self.gc_info.read().unwrap();
    4042             695 : 
    4043             695 :             let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
    4044             695 :             let pitr_cutoff = gc_info.pitr_cutoff;
    4045             695 :             let retain_lsns = gc_info.retain_lsns.clone();
    4046             695 :             (horizon_cutoff, pitr_cutoff, retain_lsns)
    4047             695 :         };
    4048             695 : 
    4049             695 :         let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
    4050                 : 
    4051             695 :         let res = self
    4052             695 :             .gc_timeline(
    4053             695 :                 layer_removal_cs.clone(),
    4054             695 :                 horizon_cutoff,
    4055             695 :                 pitr_cutoff,
    4056             695 :                 retain_lsns,
    4057             695 :                 new_gc_cutoff,
    4058             695 :             )
    4059             695 :             .instrument(
    4060             695 :                 info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
    4061                 :             )
    4062             219 :             .await?;
    4063                 : 
    4064                 :         // only record successes
    4065             690 :         timer.stop_and_record();
    4066             690 : 
    4067             690 :         Ok(res)
    4068             690 :     }
    4069                 : 
    4070             695 :     async fn gc_timeline(
    4071             695 :         &self,
    4072             695 :         layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
    4073             695 :         horizon_cutoff: Lsn,
    4074             695 :         pitr_cutoff: Lsn,
    4075             695 :         retain_lsns: Vec<Lsn>,
    4076             695 :         new_gc_cutoff: Lsn,
    4077             695 :     ) -> anyhow::Result<GcResult> {
    4078             695 :         let now = SystemTime::now();
    4079             695 :         let mut result: GcResult = GcResult::default();
    4080             695 : 
    4081             695 :         // Nothing to GC. Return early.
    4082             695 :         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    4083             695 :         if latest_gc_cutoff >= new_gc_cutoff {
    4084             108 :             info!(
    4085             108 :                 "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
    4086             108 :             );
    4087             108 :             return Ok(result);
    4088             587 :         }
    4089             587 : 
    4090             587 :         // We need to ensure that no one tries to read page versions or create
    4091             587 :         // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
    4092             587 :         // for details. This will block until the old value is no longer in use.
    4093             587 :         //
    4094             587 :         // The GC cutoff should only ever move forwards.
    4095             587 :         {
    4096             587 :             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
    4097             587 :             ensure!(
    4098             587 :                 *write_guard <= new_gc_cutoff,
    4099 UBC           0 :                 "Cannot move GC cutoff LSN backwards (was {}, new {})",
    4100               0 :                 *write_guard,
    4101                 :                 new_gc_cutoff
    4102                 :             );
    4103 CBC         587 :             write_guard.store_and_unlock(new_gc_cutoff).wait();
    4104                 :         }
    4105                 : 
    4106             587 :         info!("GC starting");
    4107                 : 
    4108 UBC           0 :         debug!("retain_lsns: {:?}", retain_lsns);
    4109                 : 
    4110                 :         // Before deleting any layers, we need to wait for their upload ops to finish.
    4111                 :         // See storage_sync module level comment on consistency.
    4112                 :         // Do it here because we don't want to hold self.layers.write() while waiting.
    4113 CBC         587 :         if let Some(remote_client) = &self.remote_client {
    4114 UBC           0 :             debug!("waiting for upload ops to complete");
    4115 CBC         587 :             remote_client
    4116             587 :                 .wait_completion()
    4117             184 :                 .await
    4118             587 :                 .context("wait for layer upload ops to complete")?;
    4119 UBC           0 :         }
    4120                 : 
    4121 CBC         587 :         let mut layers_to_remove = Vec::new();
    4122             587 :         let mut wanted_image_layers = KeySpaceRandomAccum::default();
    4123                 : 
    4124                 :         // Scan all layers in the timeline (remote or on-disk).
    4125                 :         //
    4126                 :         // Garbage collect the layer if all conditions are satisfied:
    4127                 :         // 1. it is older than cutoff LSN;
    4128                 :         // 2. it is older than PITR interval;
    4129                 :         // 3. it doesn't need to be retained for 'retain_lsns';
    4130                 :         // 4. newer on-disk image layers cover the layer's whole key range
    4131                 :         //
    4132                 :         // TODO holding a write lock is too aggressive and avoidable
    4133             587 :         let mut guard = self.layers.write().await;
    4134             587 :         let layers = guard.layer_map();
    4135           16846 :         'outer: for l in layers.iter_historic_layers() {
    4136           16846 :             result.layers_total += 1;
    4137           16846 : 
    4138           16846 :             // 1. Is it newer than the GC horizon cutoff point?
    4139           16846 :             if l.get_lsn_range().end > horizon_cutoff {
    4140 UBC           0 :                 debug!(
    4141               0 :                     "keeping {} because it's newer than horizon_cutoff {}",
    4142               0 :                     l.filename(),
    4143               0 :                     horizon_cutoff,
    4144               0 :                 );
    4145 CBC        1717 :                 result.layers_needed_by_cutoff += 1;
    4146            1717 :                 continue 'outer;
    4147           15129 :             }
    4148           15129 : 
    4149           15129 :             // 2. Is it newer than the PiTR cutoff point?
    4150           15129 :             if l.get_lsn_range().end > pitr_cutoff {
    4151 UBC           0 :                 debug!(
    4152               0 :                     "keeping {} because it's newer than pitr_cutoff {}",
    4153               0 :                     l.filename(),
    4154               0 :                     pitr_cutoff,
    4155               0 :                 );
    4156 CBC         194 :                 result.layers_needed_by_pitr += 1;
    4157             194 :                 continue 'outer;
    4158           14935 :             }
    4159                 : 
    4160                 :             // 3. Is it needed by a child branch?
    4161                 :             // NOTE: with this check we would keep data that
    4162                 :             // might be referenced by child branches forever.
    4163                 :             // We can track this in child timeline GC and delete parent layers when
    4164                 :             // they are no longer needed. This might be complicated with long inheritance chains.
    4165                 :             //
    4166                 :             // TODO Vec is not a great choice for `retain_lsns`
    4167           15178 :             for retain_lsn in &retain_lsns {
    4168                 :                 // start_lsn is inclusive
    4169             620 :                 if &l.get_lsn_range().start <= retain_lsn {
    4170 UBC           0 :                     debug!(
    4171               0 :                         "keeping {} because it still might be referenced by a child branch forked at {} is_dropped: xx is_incremental: {}",
    4172               0 :                         l.filename(),
    4173               0 :                         retain_lsn,
    4174               0 :                         l.is_incremental(),
    4175               0 :                     );
    4176 CBC         377 :                     result.layers_needed_by_branches += 1;
    4177             377 :                     continue 'outer;
    4178             243 :                 }
    4179                 :             }
    4180                 : 
    4181                 :             // 4. Is there a later on-disk layer for this relation?
    4182                 :             //
    4183                 :             // The end-LSN is exclusive, while disk_consistent_lsn is
    4184                 :             // inclusive. For example, if disk_consistent_lsn is 100, it is
    4185                 :             // OK for a delta layer to have end LSN 101, but if the end LSN
    4186                 :             // is 102, then it might not have been fully flushed to disk
    4187                 :             // before a crash.
    4188                 :             //
    4189                 :             // For example, imagine that the following layers exist:
    4190                 :             //
    4191                 :             // 1000      - image (A)
    4192                 :             // 1000-2000 - delta (B)
    4193                 :             // 2000      - image (C)
    4194                 :             // 2000-3000 - delta (D)
    4195                 :             // 3000      - image (E)
    4196                 :             //
    4197                 :             // If GC horizon is at 2500, we can remove layers A and B, but
    4198                 :             // we cannot remove C, even though it's older than 2500, because
    4199                 :             // the delta layer 2000-3000 depends on it.
    4200           14558 :             if !layers
    4201           14558 :                 .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
    4202                 :             {
    4203 UBC           0 :                 debug!("keeping {} because it is the latest layer", l.filename());
    4204                 :                 // Collect delta key ranges that need image layers to allow garbage
    4205                 :                 // collecting the layers.
    4206                 :                 // It is not so obvious whether we need to propagate information only about
    4207                 :                 // delta layers. Image layers can form "stairs" preventing old images from being deleted.
    4208                 :                 // But image layers are in any case less sparse than delta layers. Also, we need some
    4209                 :                 // protection from replacing recent image layers with new ones after each GC iteration.
    4210 CBC       11289 :                 if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
    4211 UBC           0 :                     wanted_image_layers.add_range(l.get_key_range());
    4212 CBC       11289 :                 }
    4213           11289 :                 result.layers_not_updated += 1;
    4214           11289 :                 continue 'outer;
    4215            3269 :             }
    4216                 : 
    4217                 :             // We didn't find any reason to keep this file, so remove it.
    4218 UBC           0 :             debug!(
    4219               0 :                 "garbage collecting {} is_dropped: xx is_incremental: {}",
    4220               0 :                 l.filename(),
    4221               0 :                 l.is_incremental(),
    4222               0 :             );
    4223 CBC        3269 :             layers_to_remove.push(Arc::clone(&l));
    4224                 :         }
    4225             587 :         self.wanted_image_layers
    4226             587 :             .lock()
    4227             587 :             .unwrap()
    4228             587 :             .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
    4229             587 : 
    4230             587 :         if !layers_to_remove.is_empty() {
    4231                 :             // Persist the new GC cutoff value in the metadata file, before
    4232                 :             // we actually remove anything.
    4233              25 :             self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
    4234 UBC           0 :                 .await?;
    4235                 : 
    4236                 :             // Actually delete the layers from disk and remove them from the map.
    4237                 :             // (couldn't do this in the loop above, because you cannot modify a collection
    4238                 :             // while iterating it. BTreeMap::retain() would be another option)
    4239 CBC          25 :             let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
    4240              25 :             let gc_layers = layers_to_remove
    4241              25 :                 .iter()
    4242            3269 :                 .map(|x| guard.get_from_desc(x))
    4243              25 :                 .collect();
    4244            3294 :             for doomed_layer in layers_to_remove {
    4245            3269 :                 layer_names_to_delete.push(doomed_layer.filename());
    4246            3269 :                 result.layers_removed += 1;
    4247            3269 :             }
    4248              25 :             let apply = guard.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)?;
    4249                 : 
    4250              25 :             if result.layers_removed != 0 {
    4251              25 :                 fail_point!("after-timeline-gc-removed-layers");
    4252              25 :             }
    4253                 : 
    4254              25 :             if let Some(remote_client) = &self.remote_client {
    4255              25 :                 remote_client.schedule_layer_file_deletion(layer_names_to_delete)?;
    4256 UBC           0 :             }
    4257                 : 
    4258 CBC          20 :             apply.flush();
    4259             562 :         }
    4260                 : 
    4261             582 :         info!(
    4262             582 :             "GC completed removing {} layers, cutoff {}",
    4263             582 :             result.layers_removed, new_gc_cutoff
    4264             582 :         );
    4265                 : 
    4266             582 :         result.elapsed = now.elapsed()?;
    4267             582 :         Ok(result)
    4268             690 :     }
    4269                 : 
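The four retention checks above reduce to a per-layer predicate. A minimal sketch, assuming a hypothetical LayerInfo in place of the real layer descriptor API:

    // Hypothetical stand-in for the real layer descriptor.
    struct LayerInfo {
        lsn_start: u64,               // inclusive
        lsn_end: u64,                 // exclusive
        covered_by_newer_image: bool, // result of the image_layer_exists() probe
    }

    /// Returns true if the layer must be kept, mirroring checks 1-4 above.
    fn must_keep(l: &LayerInfo, horizon_cutoff: u64, pitr_cutoff: u64, retain_lsns: &[u64]) -> bool {
        // 1. newer than the GC horizon cutoff
        if l.lsn_end > horizon_cutoff {
            return true;
        }
        // 2. newer than the PiTR cutoff
        if l.lsn_end > pitr_cutoff {
            return true;
        }
        // 3. still needed by a child branch (start LSN is inclusive)
        if retain_lsns.iter().any(|retain| l.lsn_start <= *retain) {
            return true;
        }
        // 4. keep unless a newer on-disk image layer covers the whole key range
        !l.covered_by_newer_image
    }
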
    4270                 :     ///
    4271                 :     /// Reconstruct a value, using the given base image and WAL records in 'data'.
    4272                 :     ///
    4273         6372248 :     async fn reconstruct_value(
    4274         6372248 :         &self,
    4275         6372248 :         key: Key,
    4276         6372248 :         request_lsn: Lsn,
    4277         6372248 :         mut data: ValueReconstructState,
    4278         6372249 :     ) -> Result<Bytes, PageReconstructError> {
    4279         6372249 :         // Perform WAL redo if needed
    4280         6372249 :         data.records.reverse();
    4281         6372249 : 
    4282         6372249 :         // If we have a page image, and no WAL, we're all set
    4283         6372249 :         if data.records.is_empty() {
    4284         4116401 :             if let Some((img_lsn, img)) = &data.img {
    4285 UBC           0 :                 trace!(
    4286               0 :                     "found page image for key {} at {}, no WAL redo required, req LSN {}",
    4287               0 :                     key,
    4288               0 :                     img_lsn,
    4289               0 :                     request_lsn,
    4290               0 :                 );
    4291 CBC     4116401 :                 Ok(img.clone())
    4292                 :             } else {
    4293 UBC           0 :                 Err(PageReconstructError::from(anyhow!(
    4294               0 :                     "base image for {key} at {request_lsn} not found"
    4295               0 :                 )))
    4296                 :             }
    4297                 :         } else {
    4298                 :             // We need to do WAL redo.
    4299                 :             //
    4300                 :             // If we don't have a base image, then the oldest WAL record better initialize
    4301                 :             // If we don't have a base image, then the oldest WAL record had better
    4302                 :             // initialize the page.
    4303 UBC           0 :                 Err(PageReconstructError::from(anyhow!(
    4304               0 :                     "Base image for {} at {} not found, but got {} WAL records",
    4305               0 :                     key,
    4306               0 :                     request_lsn,
    4307               0 :                     data.records.len()
    4308               0 :                 )))
    4309                 :             } else {
    4310 CBC     2255848 :                 if data.img.is_some() {
    4311 UBC           0 :                     trace!(
    4312               0 :                         "found {} WAL records and a base image for {} at {}, performing WAL redo",
    4313               0 :                         data.records.len(),
    4314               0 :                         key,
    4315               0 :                         request_lsn
    4316               0 :                     );
    4317                 :                 } else {
    4318               0 :                     trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
    4319                 :                 };
    4320                 : 
    4321 CBC     2255848 :                 let last_rec_lsn = data.records.last().unwrap().0;
    4322                 : 
    4323         2255848 :                 let img = match self
    4324         2255848 :                     .walredo_mgr
    4325         2255848 :                     .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
    4326 UBC           0 :                     .await
    4327 CBC     2255846 :                     .context("Failed to reconstruct a page image")
    4328                 :                 {
    4329         2255846 :                     Ok(img) => img,
    4330 UBC           0 :                     Err(e) => return Err(PageReconstructError::from(e)),
    4331                 :                 };
    4332                 : 
    4333 CBC     2255846 :                 if img.len() == page_cache::PAGE_SZ {
    4334         2252422 :                     let cache = page_cache::get();
    4335         2252422 :                     if let Err(e) = cache
    4336         2252422 :                         .memorize_materialized_page(
    4337         2252422 :                             self.tenant_id,
    4338         2252422 :                             self.timeline_id,
    4339         2252422 :                             key,
    4340         2252422 :                             last_rec_lsn,
    4341         2252422 :                             &img,
    4342         2252422 :                         )
    4343           58513 :                         .await
    4344         2252422 :                         .context("Materialized page memoization failed")
    4345                 :                     {
    4346 UBC           0 :                         return Err(PageReconstructError::from(e));
    4347 CBC     2252422 :                     }
    4348            3424 :                 }
    4349                 : 
    4350         2255846 :                 Ok(img)
    4351                 :             }
    4352                 :         }
    4353         6372247 :     }
    4354                 : 
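The branching in reconstruct_value() can be summarized as a small decision function. A hedged sketch: WalRecord and Reconstruct are hypothetical types standing in for ValueReconstructState and its contents.

    // Hypothetical types standing in for the real reconstruct state.
    struct WalRecord {
        will_init: bool, // whether this record alone can initialize the page
    }

    enum Reconstruct {
        Image(Vec<u8>),                                           // no redo needed
        Redo { base: Option<Vec<u8>>, records: Vec<WalRecord> },  // run WAL redo
        MissingBase,                                              // unrecoverable
    }

    // Mirrors the branching above; records are assumed oldest-first,
    // i.e. already reversed.
    fn classify(img: Option<Vec<u8>>, records: Vec<WalRecord>) -> Reconstruct {
        if records.is_empty() {
            match img {
                Some(img) => Reconstruct::Image(img),
                None => Reconstruct::MissingBase,
            }
        } else if img.is_none() && !records.first().unwrap().will_init {
            // no base image and the oldest record cannot initialize the page
            Reconstruct::MissingBase
        } else {
            Reconstruct::Redo { base: img, records }
        }
    }
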
    4355                 :     /// Download a layer file from remote storage and insert it into the layer map.
    4356                 :     ///
    4357                 :     /// It's safe to call this function for the same layer concurrently. In that case:
    4358                 :     /// - If the layer has already been downloaded, `Ok(...)` is returned.
    4359                 :     /// - If the layer is currently being downloaded, we wait until that download succeeds or fails.
    4360                 :     ///     - If it succeeded, we return `Ok(...)`.
    4361                 :     ///     - If it failed, we or another concurrent caller will initiate a new download attempt.
    4362                 :     ///
    4363                 :     /// Download errors are classified and retried if appropriate by the underlying RemoteTimelineClient function.
    4364                 :     /// It has an internal limit for the maximum number of retries and prints appropriate log messages.
    4365                 :     /// If we exceed the limit, it returns an error, and this function passes it through.
    4366                 :     /// The caller _could_ retry further by themselves by calling this function again, but _should not_ do it.
    4367                 :     /// The reason is that they cannot distinguish permanent errors from temporary ones, whereas
    4368                 :     /// the underlying RemoteTimelineClient can.
    4369                 :     ///
    4370                 :     /// There is no internal timeout or slowness detection.
    4371                 :     /// If the caller has a deadline or needs a timeout, they can simply stop polling:
    4372                 :     /// we're **cancellation-safe** because the download happens in a separate task_mgr task.
    4373                 :     /// So, the current download attempt will run to completion even if we stop polling.
    4374            4326 :     #[instrument(skip_all, fields(layer=%remote_layer))]
    4375                 :     pub async fn download_remote_layer(
    4376                 :         &self,
    4377                 :         remote_layer: Arc<RemoteLayer>,
    4378                 :     ) -> anyhow::Result<()> {
    4379                 :         span::debug_assert_current_span_has_tenant_and_timeline_id();
    4380                 : 
    4381                 :         use std::sync::atomic::Ordering::Relaxed;
    4382                 : 
    4383                 :         let permit = match Arc::clone(&remote_layer.ongoing_download)
    4384                 :             .acquire_owned()
    4385                 :             .await
    4386                 :         {
    4387                 :             Ok(permit) => permit,
    4388                 :             Err(_closed) => {
    4389                 :                 if remote_layer.download_replacement_failure.load(Relaxed) {
    4390                 :                     // this path will be hit often when there are retries at higher levels. however,
    4391                 :                     // hitting this error will prevent a busy loop between get_reconstruct_data and
    4392                 :                     // download, so an error is preferred.
    4393                 :                     //
    4394                 :                     // TODO: we really should poison the timeline, but panicking is not yet
    4395                 :                     // supported. Related: https://github.com/neondatabase/neon/issues/3621
    4396                 :                     anyhow::bail!("an earlier download succeeded but LayerMap::replace failed")
    4397                 :                 } else {
    4398              61 :                     info!("download of layer has already finished");
    4399                 :                     return Ok(());
    4400                 :                 }
    4401                 :             }
    4402                 :         };
    4403                 : 
    4404                 :         let (sender, receiver) = tokio::sync::oneshot::channel();
    4405                 :         // Spawn a task so that the download does not outlive the timeline when we detach the tenant / delete the timeline.
    4406                 :         let self_clone = self.myself.upgrade().expect("timeline is gone");
    4407                 :         task_mgr::spawn(
    4408                 :             &tokio::runtime::Handle::current(),
    4409                 :             TaskKind::RemoteDownloadTask,
    4410                 :             Some(self.tenant_id),
    4411                 :             Some(self.timeline_id),
    4412                 :             &format!("download layer {}", remote_layer),
    4413                 :             false,
    4414            1379 :             async move {
    4415            1379 :                 let remote_client = self_clone.remote_client.as_ref().unwrap();
    4416                 : 
    4417                 :                 // Does retries + exponential back-off internally.
    4418                 :                 // When this fails, don't layer further retry attempts here.
    4419            1379 :                 let result = remote_client
    4420            1379 :                     .download_layer_file(&remote_layer.filename(), &remote_layer.layer_metadata)
    4421          459970 :                     .await;
    4422                 : 
    4423            1376 :                 if let Ok(size) = &result {
    4424            1370 :                     info!("layer file download finished");
    4425                 : 
    4426                 :                     // XXX the temp file is still around in Err() case
    4427                 :                     // and consumes space until we clean up upon pageserver restart.
    4428            1370 :                     self_clone.metrics.resident_physical_size_add(*size);
    4429                 : 
    4430                 :                     // Download complete. Replace the RemoteLayer with the corresponding
    4431                 :                     // Delta- or ImageLayer in the layer map.
    4432            1370 :                     let mut guard = self_clone.layers.write().await;
    4433            1370 :                     let new_layer =
    4434            1370 :                         remote_layer.create_downloaded_layer(&guard, self_clone.conf, *size);
    4435            1370 :                     {
    4436            1370 :                         let l: Arc<dyn PersistentLayer> = remote_layer.clone();
    4437            1370 :                         let failure = match guard.replace_and_verify(l, new_layer) {
    4438            1369 :                             Ok(()) => false,
    4439               1 :                             Err(e) => {
    4440               1 :                                 // this is a precondition failure, the layer filename derived
    4441               1 :                                 // attributes didn't match up, which doesn't seem likely.
    4442               1 :                                 error!("replacing downloaded layer into layermap failed: {e:#?}");
    4443               1 :                                 true
    4444                 :                             }
    4445                 :                         };
    4446                 : 
    4447            1370 :                         if failure {
    4448               1 :                             // mark the remote layer permanently failed; the timeline is most
    4449               1 :                             // likely unusable after this. sadly we cannot just poison the layermap
    4450               1 :                             // lock with panic, because that would create an issue with shutdown.
    4451               1 :                             //
    4452               1 :                             // this does not change the retry semantics on failed downloads.
    4453               1 :                             //
    4454               1 :                             // use of Relaxed is valid because closing the semaphore establishes a
    4455               1 :                             // happens-before edge and wakes up any waiters; we write this value
    4456               1 :                             // before closing, and any waiters (or would-be waiters) will load it
    4457               1 :                             // after the semaphore is closed.
    4458               1 :                             //
    4459               1 :                             // See: https://github.com/neondatabase/neon/issues/3533
    4460               1 :                             remote_layer
    4461               1 :                                 .download_replacement_failure
    4462               1 :                                 .store(true, Relaxed);
    4463            1369 :                         }
    4464                 :                     }
    4465            1370 :                     drop_wlock(guard);
    4466            1370 : 
    4467            1370 :                     info!("on-demand download successful");
    4468                 : 
    4469                 :                     // Now that we've inserted the download into the layer map,
    4470                 :                     // close the semaphore. This will make other waiters for
    4471                 :                     // this download return Ok(()).
    4472            1370 :                     assert!(!remote_layer.ongoing_download.is_closed());
    4473            1370 :                     remote_layer.ongoing_download.close();
    4474                 :                 } else {
    4475                 :                     // Keep semaphore open. We'll drop the permit at the end of the function.
    4476               6 :                     error!(
    4477               6 :                         "layer file download failed: {:?}",
    4478               6 :                         result.as_ref().unwrap_err()
    4479               6 :                     );
    4480                 :                 }
    4481                 : 
    4482                 :                 // Don't treat it as an error if the task that triggered the download
    4483                 :                 // is no longer interested in the result.
    4484            1376 :                 sender.send(result.map(|_sz| ())).ok();
    4485            1376 : 
    4486            1376 :                 // In case we failed and there are other waiters, this will make one
    4487            1376 :                 // of them retry the download in a new task.
    4488            1376 :                 // XXX: This resets the exponential backoff because it's a new call to
    4489            1376 :                 // download_layer_file.
    4490            1376 :                 drop(permit);
    4491            1376 : 
    4492            1376 :                 Ok(())
    4493            1376 :             }
    4494                 :             .in_current_span(),
    4495                 :         );
    4496                 : 
    4497                 :         receiver.await.context("download task cancelled")?
    4498                 :     }
    4499                 : 
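The doc comment above describes a semaphore-used-as-latch idiom. Here is that idiom in isolation; this is a minimal sketch, not the pageserver API: download_once and try_download are hypothetical, and the one-permit tokio Semaphore stands in for ongoing_download.

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    // A *closed* semaphore means "download finished successfully".
    async fn download_once<F: Fn() -> bool>(latch: Arc<Semaphore>, try_download: F) -> &'static str {
        match latch.clone().acquire_owned().await {
            // close() already happened: an earlier caller finished the download
            Err(_closed) => "already downloaded",
            Ok(permit) => {
                if try_download() {
                    // wake all waiters; they will observe Err(closed) == success
                    latch.close();
                    "downloaded"
                } else {
                    // keep the semaphore open; dropping the permit lets exactly
                    // one waiter win the next download attempt
                    drop(permit);
                    "failed, a waiter may retry"
                }
            }
        }
    }
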
    4500               3 :     pub async fn spawn_download_all_remote_layers(
    4501               3 :         self: Arc<Self>,
    4502               3 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4503               3 :     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
    4504               3 :         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
    4505               3 :         if let Some(st) = &*status_guard {
    4506               1 :             match &st.state {
    4507                 :                 DownloadRemoteLayersTaskState::Running => {
    4508 UBC           0 :                     return Err(st.clone());
    4509                 :                 }
    4510                 :                 DownloadRemoteLayersTaskState::ShutDown
    4511 CBC           1 :                 | DownloadRemoteLayersTaskState::Completed => {
    4512               1 :                     *status_guard = None;
    4513               1 :                 }
    4514                 :             }
    4515               2 :         }
    4516                 : 
    4517               3 :         let self_clone = Arc::clone(&self);
    4518               3 :         let task_id = task_mgr::spawn(
    4519               3 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    4520               3 :             task_mgr::TaskKind::DownloadAllRemoteLayers,
    4521               3 :             Some(self.tenant_id),
    4522               3 :             Some(self.timeline_id),
    4523               3 :             "download all remote layers task",
    4524                 :             false,
    4525               3 :             async move {
    4526              11 :                 self_clone.download_all_remote_layers(request).await;
    4527               3 :                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
    4528               3 :                  match &mut *status_guard {
    4529                 :                     None => {
    4530 UBC           0 :                         warn!("task status is supposed to be Some(), since we are running");
    4531                 :                     }
    4532 CBC           3 :                     Some(st) => {
    4533               3 :                         let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
    4534               3 :                         if st.task_id != exp_task_id {
    4535 UBC           0 :                             warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
    4536 CBC           3 :                         } else {
    4537               3 :                             st.state = DownloadRemoteLayersTaskState::Completed;
    4538               3 :                         }
    4539                 :                     }
    4540                 :                 };
    4541               3 :                 Ok(())
    4542               3 :             }
    4543               3 :             .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))
    4544                 :         );
    4545                 : 
    4546               3 :         let initial_info = DownloadRemoteLayersTaskInfo {
    4547               3 :             task_id: format!("{task_id}"),
    4548               3 :             state: DownloadRemoteLayersTaskState::Running,
    4549               3 :             total_layer_count: 0,
    4550               3 :             successful_download_count: 0,
    4551               3 :             failed_download_count: 0,
    4552               3 :         };
    4553               3 :         *status_guard = Some(initial_info.clone());
    4554               3 : 
    4555               3 :         Ok(initial_info)
    4556               3 :     }
    4557                 : 
    4558               3 :     async fn download_all_remote_layers(
    4559               3 :         self: &Arc<Self>,
    4560               3 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4561               3 :     ) {
    4562               3 :         let mut downloads = Vec::new();
    4563                 :         {
    4564               3 :             let guard = self.layers.read().await;
    4565               3 :             let layers = guard.layer_map();
    4566               3 :             layers
    4567               3 :                 .iter_historic_layers()
    4568              12 :                 .map(|l| guard.get_from_desc(&l))
    4569              12 :                 .filter_map(|l| l.downcast_remote_layer())
    4570               8 :                 .map(|l| self.download_remote_layer(l))
    4571               8 :                 .for_each(|dl| downloads.push(dl))
    4572               3 :         }
    4573               3 :         let total_layer_count = downloads.len();
    4574               3 :         // limit download concurrency as specified in request
    4575               3 :         let downloads = futures::stream::iter(downloads);
    4576               3 :         let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get());
    4577               3 : 
    4578               3 :         macro_rules! lock_status {
    4579               3 :             ($st:ident) => {
    4580               3 :                 let mut st = self.download_all_remote_layers_task_info.write().unwrap();
    4581               3 :                 let st = st
    4582               3 :                     .as_mut()
    4583               3 :                     .expect("this function is only called after the task has been spawned");
    4584               3 :                 assert_eq!(
    4585               3 :                     st.task_id,
    4586               3 :                     format!(
    4587               3 :                         "{}",
    4588               3 :                         task_mgr::current_task_id().expect("we run inside a task_mgr task")
    4589               3 :                     )
    4590               3 :                 );
    4591               3 :                 let $st = st;
    4592               3 :             };
    4593               3 :         }
    4594               3 : 
    4595               3 :         {
    4596               3 :             lock_status!(st);
    4597               3 :             st.total_layer_count = total_layer_count as u64;
    4598                 :         }
    4599              11 :         loop {
    4600              22 :             tokio::select! {
    4601              11 :                 dl = downloads.next() => {
    4602                 :                     lock_status!(st);
    4603                 :                     match dl {
    4604                 :                         None => break,
    4605                 :                         Some(Ok(())) => {
    4606                 :                             st.successful_download_count += 1;
    4607                 :                         },
    4608                 :                         Some(Err(e)) => {
    4609               4 :                             error!(error = %e, "layer download failed");
    4610                 :                             st.failed_download_count += 1;
    4611                 :                         }
    4612                 :                     }
    4613                 :                 }
    4614                 :                 _ = task_mgr::shutdown_watcher() => {
    4615                 :                     // Kind of pointless to watch for shutdowns here,
    4616                 :                     // as download_remote_layer spawns other task_mgr tasks internally.
    4617                 :                     lock_status!(st);
    4618                 :                     st.state = DownloadRemoteLayersTaskState::ShutDown;
    4619                 :                 }
    4620              11 :             }
    4621              11 :         }
    4622                 :         {
    4623               3 :             lock_status!(st);
    4624               3 :             st.state = DownloadRemoteLayersTaskState::Completed;
    4625               3 :         }
    4626               3 :     }
    4627                 : 
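download_all_remote_layers limits parallelism with buffer_unordered. The skeleton of that pattern, with the status bookkeeping stripped out, might look like the following; run_all is a hypothetical helper built on the same futures::StreamExt::buffer_unordered call.

    use futures::{stream, StreamExt};

    // Drive all download futures with bounded parallelism and tally outcomes.
    async fn run_all<F>(jobs: Vec<F>, max_concurrent: usize) -> (u64, u64)
    where
        F: std::future::Future<Output = anyhow::Result<()>>,
    {
        let mut ok = 0u64;
        let mut failed = 0u64;
        let mut results = stream::iter(jobs).buffer_unordered(max_concurrent);
        while let Some(res) = results.next().await {
            match res {
                Ok(()) => ok += 1,
                Err(_) => failed += 1,
            }
        }
        (ok, failed)
    }
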
    4628              22 :     pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
    4629              22 :         self.download_all_remote_layers_task_info
    4630              22 :             .read()
    4631              22 :             .unwrap()
    4632              22 :             .clone()
    4633              22 :     }
    4634                 : }
    4635                 : 
    4636                 : pub struct DiskUsageEvictionInfo {
    4637                 :     /// Timeline's largest layer (remote or resident)
    4638                 :     pub max_layer_size: Option<u64>,
    4639                 :     /// Timeline's resident layers
    4640                 :     pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
    4641                 : }
    4642                 : 
    4643                 : pub struct LocalLayerInfoForDiskUsageEviction {
    4644                 :     pub layer: Arc<dyn PersistentLayer>,
    4645                 :     pub last_activity_ts: SystemTime,
    4646                 : }
    4647                 : 
    4648                 : impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
    4649 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    4650               0 :         // format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
    4651               0 :         // having to allocate a string for this is bad, but it will rarely be formatted
    4652               0 :         let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
    4653               0 :         let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
    4654               0 :         f.debug_struct("LocalLayerInfoForDiskUsageEviction")
    4655               0 :             .field("layer", &self.layer)
    4656               0 :             .field("last_activity", &ts)
    4657               0 :             .finish()
    4658               0 :     }
    4659                 : }
    4660                 : 
    4661                 : impl LocalLayerInfoForDiskUsageEviction {
    4662 CBC         250 :     pub fn file_size(&self) -> u64 {
    4663             250 :         self.layer.layer_desc().file_size
    4664             250 :     }
    4665                 : }
    4666                 : 
    4667                 : impl Timeline {
    4668                 :     /// Returns non-remote layers for eviction.
    4669              13 :     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
    4670              13 :         let guard = self.layers.read().await;
    4671              13 :         let layers = guard.layer_map();
    4672              13 : 
    4673              13 :         let mut max_layer_size: Option<u64> = None;
    4674              13 :         let mut resident_layers = Vec::new();
    4675                 : 
    4676             250 :         for l in layers.iter_historic_layers() {
    4677             250 :             let file_size = l.file_size();
    4678             250 :             max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
    4679             250 : 
    4680             250 :             let l = guard.get_from_desc(&l);
    4681             250 : 
    4682             250 :             if l.is_remote_layer() {
    4683 UBC           0 :                 continue;
    4684 CBC         250 :             }
    4685             250 : 
    4686             250 :             let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
    4687 UBC           0 :                 // We only use this fallback if there's an implementation error.
    4688               0 :                 // `latest_activity` already does a rate-limited warn!() log.
    4689               0 :                 debug!(layer=%l, "last_activity returns None, using SystemTime::now");
    4690               0 :                 SystemTime::now()
    4691 CBC         250 :             });
    4692             250 : 
    4693             250 :             resident_layers.push(LocalLayerInfoForDiskUsageEviction {
    4694             250 :                 layer: l,
    4695             250 :                 last_activity_ts,
    4696             250 :             });
    4697                 :         }
    4698                 : 
    4699              13 :         DiskUsageEvictionInfo {
    4700              13 :             max_layer_size,
    4701              13 :             resident_layers,
    4702              13 :         }
    4703              13 :     }
    4704                 : }
    4705                 : 
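The max_layer_size bookkeeping above is a small running-maximum fold. The same logic as a standalone helper (max_size is a hypothetical name):

    // Track a running maximum starting from None, as in the loop above.
    fn max_size(sizes: impl IntoIterator<Item = u64>) -> Option<u64> {
        sizes
            .into_iter()
            .fold(None, |m, s| Some(m.map_or(s, |m| m.max(s))))
    }

    #[test]
    fn max_size_examples() {
        assert_eq!(max_size(Vec::new()), None);
        assert_eq!(max_size(vec![3, 9, 4]), Some(9));
    }
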
    4706                 : type TraversalPathItem = (
    4707                 :     ValueReconstructResult,
    4708                 :     Lsn,
    4709                 :     Box<dyn Send + FnOnce() -> TraversalId>,
    4710                 : );
    4711                 : 
    4712                 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
    4713                 : /// to an error, as anyhow context information.
    4714               7 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
    4715               7 :     // We want the original 'msg' to be the outermost context. The outermost context
    4716               7 :     // is the most high-level information, which also gets propagated to the client.
    4717               7 :     let mut msg_iter = path
    4718               7 :         .into_iter()
    4719              12 :         .map(|(r, c, l)| {
    4720              12 :             format!(
    4721              12 :                 "layer traversal: result {:?}, cont_lsn {}, layer: {}",
    4722              12 :                 r,
    4723              12 :                 c,
    4724              12 :                 l(),
    4725              12 :             )
    4726              12 :         })
    4727               7 :         .chain(std::iter::once(msg));
    4728               7 :     // Construct initial message from the first traversed layer
    4729               7 :     let err = anyhow!(msg_iter.next().unwrap());
    4730               7 : 
    4731               7 :     // Append all subsequent traversals, and the error message 'msg', as contexts.
    4732              12 :     let msg = msg_iter.fold(err, |err, msg| err.context(msg));
    4733               7 :     PageReconstructError::from(msg)
    4734               7 : }
    4735                 : 
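The same fold as a self-contained sketch (chain_error is a hypothetical name); the final message ends up as the outermost context, exactly as the comments above describe.

    use anyhow::anyhow;

    // Fold a traversal path plus a final message into one error, with the
    // final message as the outermost context.
    fn chain_error(path: Vec<String>, msg: String) -> anyhow::Error {
        let mut it = path.into_iter().chain(std::iter::once(msg));
        let first = anyhow!(it.next().unwrap());
        it.fold(first, |err, m| err.context(m))
    }

    // chain_error(vec!["layer A".into(), "layer B".into()], "key not found".into())
    // renders "key not found" at the top, with "layer B" and then "layer A"
    // as successively deeper causes in the anyhow chain.
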
    4736                 : /// Various functions to mutate the timeline.
    4737                 : // TODO Currently, Deref is used to allow easy access to read methods from this struct.
    4738                 : // This is probably considered a bad practice in Rust and should be fixed eventually,
    4739                 : // but will cause large code changes.
    4740                 : pub struct TimelineWriter<'a> {
    4741                 :     tl: &'a Timeline,
    4742                 :     _write_guard: tokio::sync::MutexGuard<'a, ()>,
    4743                 : }
    4744                 : 
    4745                 : impl Deref for TimelineWriter<'_> {
    4746                 :     type Target = Timeline;
    4747                 : 
    4748 UBC           0 :     fn deref(&self) -> &Self::Target {
    4749               0 :         self.tl
    4750               0 :     }
    4751                 : }
    4752                 : 
    4753                 : impl<'a> TimelineWriter<'a> {
    4754                 :     /// Put a new page version that can be constructed from a WAL record
    4755                 :     ///
    4756                 :     /// This will implicitly extend the relation, if the page is beyond the
    4757                 :     /// current end-of-file.
    4758 CBC    76622124 :     pub async fn put(
    4759        76622124 :         &self,
    4760        76622124 :         key: Key,
    4761        76622124 :         lsn: Lsn,
    4762        76622124 :         value: &Value,
    4763        76622124 :         ctx: &RequestContext,
    4764        76622124 :     ) -> anyhow::Result<()> {
    4765        76622172 :         self.tl.put_value(key, lsn, value, ctx).await
    4766        76622167 :     }
    4767                 : 
    4768           17687 :     pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
    4769           17687 :         self.tl.put_tombstone(key_range, lsn).await
    4770           17687 :     }
    4771                 : 
    4772                 :     /// Track the end of the latest digested WAL record.
    4773                 :     /// Remember the (end of) last valid WAL record remembered in the timeline.
    4774                 :     /// Remember the (end of the) last valid WAL record in the timeline.
    4775                 :     /// Call this after you have finished writing all the WAL up to 'lsn'.
    4776                 :     ///
    4777                 :     /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
    4778                 :     /// the 'lsn' or anything older. The previous last record LSN is stored alongside
    4779                 :     /// the latest and can be read.
    4780        69330499 :     pub fn finish_write(&self, new_lsn: Lsn) {
    4781        69330499 :         self.tl.finish_write(new_lsn);
    4782        69330499 :     }
    4783                 : 
    4784         1029042 :     pub fn update_current_logical_size(&self, delta: i64) {
    4785         1029042 :         self.tl.update_current_logical_size(delta)
    4786         1029042 :     }
    4787                 : }
    4788                 : 
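The Deref pattern flagged in the TODO above, reduced to a compile-checked toy model; all types here are hypothetical stand-ins for Timeline and TimelineWriter.

    use std::ops::Deref;

    // The writer derefs to the timeline, so read methods remain callable
    // through the writer without duplicating them.
    struct Tl { last_lsn: u64 }
    impl Tl {
        fn last_lsn(&self) -> u64 { self.last_lsn }
    }
    struct Writer<'a> { tl: &'a Tl }
    impl Deref for Writer<'_> {
        type Target = Tl;
        fn deref(&self) -> &Tl { self.tl }
    }

    #[test]
    fn writer_exposes_reads() {
        let tl = Tl { last_lsn: 42 };
        let w = Writer { tl: &tl };
        assert_eq!(w.last_lsn(), 42); // read method reached via Deref
    }
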
    4789                 : // We need TimelineWriter to be send in upcoming conversion of
    4790                 : // Timeline::layers to tokio::sync::RwLock.
    4791               1 : #[test]
    4792               1 : fn is_send() {
    4793               1 :     fn _assert_send<T: Send>() {}
    4794               1 :     _assert_send::<TimelineWriter<'_>>();
    4795               1 : }
    4796                 : 
    4797                 : /// Add a suffix to a layer file's name: .{num}.old
    4798                 : /// Uses the first available num, starting at 0 (e.g. "layer" -> "layer.0.old", then "layer.1.old").
    4799               1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
    4800               1 :     let filename = path
    4801               1 :         .file_name()
    4802               1 :         .ok_or_else(|| anyhow!("Path {path} doesn't have a file name"))?;
    4803               1 :     let mut new_path = path.to_owned();
    4804                 : 
    4805               1 :     for i in 0u32.. {
    4806               1 :         new_path.set_file_name(format!("{filename}.{i}.old"));
    4807               1 :         if !new_path.exists() {
    4808               1 :             std::fs::rename(path, &new_path)
    4809               1 :                 .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
    4810               1 :             return Ok(());
    4811 UBC           0 :         }
    4812                 :     }
    4813                 : 
    4814               0 :     bail!("couldn't find an unused backup number for {:?}", path)
    4815 CBC           1 : }
    4816                 : 
    4817                 : /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
    4818                 : ///
    4819                 : /// Returns `true` if the two `Arc` point to the same layer, false otherwise.
    4820                 : ///
    4821                 : /// If comparing persistent layers, ALWAYS compare the layer descriptor key.
    4822                 : #[inline(always)]
    4823            7310 : pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
    4824            7310 :     // "dyn Trait" objects are "fat pointers" in that they have two components:
    4825            7310 :     // - pointer to the object
    4826            7310 :     // - pointer to the vtable
    4827            7310 :     //
    4828            7310 :     // rust does not provide a guarantee that these vtables are unique, but however
    4829            7310 :     // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
    4830            7310 :     // pointer and the vtable need to be equal.
    4831            7310 :     //
    4832            7310 :     // See: https://github.com/rust-lang/rust/issues/103763
    4833            7310 :     //
    4834            7310 :     // A future version of rust will most likely use this form below, where we cast each
    4835            7310 :     // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
    4836            7310 :     // not affect the comparison.
    4837            7310 :     //
    4838            7310 :     // See: https://github.com/rust-lang/rust/pull/106450
    4839            7310 :     let left = Arc::as_ptr(left) as *const ();
    4840            7310 :     let right = Arc::as_ptr(right) as *const ();
    4841            7310 : 
    4842            7310 :     left == right
    4843            7310 : }
    4844                 : 
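A compile-checked toy illustration of the thin-pointer comparison above; Layer and Delta are hypothetical types, not the real layer traits.

    use std::sync::Arc;

    trait Layer {}
    struct Delta;
    impl Layer for Delta {}

    #[test]
    fn same_allocation_compares_equal() {
        let a: Arc<dyn Layer> = Arc::new(Delta);
        let b: Arc<dyn Layer> = Arc::clone(&a);
        // compare only the data halves of the fat pointers
        let (pa, pb) = (Arc::as_ptr(&a) as *const (), Arc::as_ptr(&b) as *const ());
        assert!(std::ptr::eq(pa, pb));
    }
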
    4845                 : #[cfg(test)]
    4846                 : mod tests {
    4847                 :     use std::sync::Arc;
    4848                 : 
    4849                 :     use utils::{id::TimelineId, lsn::Lsn};
    4850                 : 
    4851                 :     use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
    4852                 : 
    4853                 :     use super::{EvictionError, Timeline};
    4854                 : 
    4855               1 :     #[tokio::test]
    4856               1 :     async fn two_layer_eviction_attempts_at_the_same_time() {
    4857               1 :         let harness =
    4858               1 :             TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
    4859               1 : 
    4860               1 :         let ctx = any_context();
    4861               1 :         let tenant = harness.try_load(&ctx).await.unwrap();
    4862               1 :         let timeline = tenant
    4863               1 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    4864               2 :             .await
    4865               1 :             .unwrap();
    4866               1 : 
    4867               1 :         let rc = timeline
    4868               1 :             .remote_client
    4869               1 :             .clone()
    4870               1 :             .expect("just configured this");
    4871                 : 
    4872               1 :         let layer = find_some_layer(&timeline).await;
    4873                 : 
    4874               1 :         let cancel = tokio_util::sync::CancellationToken::new();
    4875               1 :         let batch = [layer];
    4876               1 : 
    4877               1 :         let first = {
    4878               1 :             let cancel = cancel.clone();
    4879               1 :             async {
    4880               1 :                 timeline
    4881               1 :                     .evict_layer_batch(&rc, &batch, cancel)
    4882 UBC           0 :                     .await
    4883 CBC           1 :                     .unwrap()
    4884               1 :             }
    4885                 :         };
    4886               1 :         let second = async {
    4887               1 :             timeline
    4888               1 :                 .evict_layer_batch(&rc, &batch, cancel)
    4889 UBC           0 :                 .await
    4890 CBC           1 :                 .unwrap()
    4891               1 :         };
    4892                 : 
    4893               1 :         let (first, second) = tokio::join!(first, second);
    4894                 : 
    4895               1 :         let (first, second) = (only_one(first), only_one(second));
    4896               1 : 
    4897               1 :         match (first, second) {
    4898                 :             (Ok(()), Err(EvictionError::FileNotFound))
    4899               1 :             | (Err(EvictionError::FileNotFound), Ok(())) => {
    4900               1 :                 // one of the evictions gets to do it,
    4901               1 :                 // other one gets FileNotFound. all is good.
    4902               1 :             }
    4903 UBC           0 :             other => unreachable!("unexpected {:?}", other),
    4904                 :         }
    4905                 :     }
    4906                 : 
    4907 CBC           1 :     #[tokio::test]
    4908               1 :     async fn layer_eviction_aba_fails() {
    4909               1 :         let harness = TenantHarness::create("layer_eviction_aba_fails").unwrap();
    4910               1 : 
    4911               1 :         let ctx = any_context();
    4912               1 :         let tenant = harness.try_load(&ctx).await.unwrap();
    4913               1 :         let timeline = tenant
    4914               1 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    4915               2 :             .await
    4916               1 :             .unwrap();
    4917                 : 
    4918               1 :         let _e = tracing::info_span!("foobar", tenant_id = %tenant.tenant_id, timeline_id = %timeline.timeline_id).entered();
    4919               1 : 
    4920               1 :         let rc = timeline.remote_client.clone().unwrap();
    4921                 : 
    4922                 :         // TenantHarness allows uploads to happen given GenericRemoteStorage is configured
    4923               1 :         let layer = find_some_layer(&timeline).await;
    4924                 : 
    4925               1 :         let cancel = tokio_util::sync::CancellationToken::new();
    4926               1 :         let batch = [layer];
    4927               1 : 
    4928               1 :         let first = {
    4929               1 :             let cancel = cancel.clone();
    4930               1 :             async {
    4931               1 :                 timeline
    4932               1 :                     .evict_layer_batch(&rc, &batch, cancel)
    4933 UBC           0 :                     .await
    4934 CBC           1 :                     .unwrap()
    4935               1 :             }
    4936                 :         };
    4937                 : 
    4938                 :         // let's imagine this is stuck somehow, still referencing the original `Arc<dyn PersistentLayer>`
    4939               1 :         let second = {
    4940               1 :             let cancel = cancel.clone();
    4941               1 :             async {
    4942               1 :                 timeline
    4943               1 :                     .evict_layer_batch(&rc, &batch, cancel)
    4944 UBC           0 :                     .await
    4945 CBC           1 :                     .unwrap()
    4946               1 :             }
    4947                 :         };
    4948                 : 
    4949                 :         // while it's stuck, we evict and end up redownloading it
    4950               1 :         only_one(first.await).expect("eviction succeeded");
    4951                 : 
    4952               1 :         let layer = find_some_layer(&timeline).await;
    4953               1 :         let layer = layer.downcast_remote_layer().unwrap();
    4954               1 :         timeline.download_remote_layer(layer).await.unwrap();
    4955                 : 
    4956               1 :         let res = only_one(second.await);
    4957                 : 
    4958               1 :         assert!(
    4959               1 :             matches!(res, Err(EvictionError::LayerNotFound(_))),
    4960 UBC           0 :             "{res:?}"
    4961                 :         );
    4962                 : 
    4963                 :         // no more specific assertions; outside of precondition failures this is the
    4964                 :         // only valid replacement failure
    4965                 :     }
    4966                 : 
    4967 CBC           2 :     fn any_context() -> crate::context::RequestContext {
    4968               2 :         use crate::context::*;
    4969               2 :         use crate::task_mgr::*;
    4970               2 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    4971               2 :     }
    4972                 : 
    4973               4 :     fn only_one<T>(mut input: Vec<Option<T>>) -> T {
    4974               4 :         assert_eq!(1, input.len());
    4975               4 :         input
    4976               4 :             .pop()
    4977               4 :             .expect("length just checked")
    4978               4 :             .expect("no cancellation")
    4979               4 :     }
    4980                 : 
    4981               3 :     async fn find_some_layer(timeline: &Timeline) -> Arc<dyn PersistentLayer> {
    4982               3 :         let layers = timeline.layers.read().await;
    4983               3 :         let desc = layers
    4984               3 :             .layer_map()
    4985               3 :             .iter_historic_layers()
    4986               3 :             .next()
    4987               3 :             .expect("must find one layer to evict");
    4988               3 : 
    4989               3 :         layers.get_from_desc(&desc)
    4990               3 :     }
    4991                 : }
        

Generated by: LCOV version 2.1-beta