LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - timeline.rs (source / functions) Coverage Total Hit LBC UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 89.2 % 2754 2457 1 296 2457
Current Date: 2024-01-09 02:06:09 Functions: 64.9 % 385 250 1 134 250
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : pub mod delete;
       2                 : mod eviction_task;
       3                 : mod init;
       4                 : pub mod layer_manager;
       5                 : pub(crate) mod logical_size;
       6                 : pub mod span;
       7                 : pub mod uninit;
       8                 : mod walreceiver;
       9                 : 
      10                 : use anyhow::{anyhow, bail, ensure, Context, Result};
      11                 : use bytes::Bytes;
      12                 : use camino::{Utf8Path, Utf8PathBuf};
      13                 : use enumset::EnumSet;
      14                 : use fail::fail_point;
      15                 : use itertools::Itertools;
      16                 : use pageserver_api::{
      17                 :     models::{
      18                 :         DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo,
      19                 :         TimelineState,
      20                 :     },
      21                 :     shard::{ShardIdentity, TenantShardId},
      22                 : };
      23                 : use rand::Rng;
      24                 : use serde_with::serde_as;
      25                 : use storage_broker::BrokerClientChannel;
      26                 : use tokio::{
      27                 :     runtime::Handle,
      28                 :     sync::{oneshot, watch},
      29                 : };
      30                 : use tokio_util::sync::CancellationToken;
      31                 : use tracing::*;
      32                 : use utils::sync::gate::Gate;
      33                 : 
      34                 : use std::collections::{BinaryHeap, HashMap, HashSet};
      35                 : use std::ops::{Deref, Range};
      36                 : use std::pin::pin;
      37                 : use std::sync::atomic::Ordering as AtomicOrdering;
      38                 : use std::sync::{Arc, Mutex, RwLock, Weak};
      39                 : use std::time::{Duration, Instant, SystemTime};
      40                 : use std::{
      41                 :     cmp::{max, min, Ordering},
      42                 :     ops::ControlFlow,
      43                 : };
      44                 : 
      45                 : use crate::context::{
      46                 :     AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
      47                 : };
      48                 : use crate::tenant::storage_layer::delta_layer::DeltaEntry;
      49                 : use crate::tenant::storage_layer::{
      50                 :     AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
      51                 :     LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
      52                 :     ValueReconstructState,
      53                 : };
      54                 : use crate::tenant::tasks::BackgroundLoopKind;
      55                 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      56                 : use crate::tenant::{
      57                 :     layer_map::{LayerMap, SearchResult},
      58                 :     metadata::{save_metadata, TimelineMetadata},
      59                 :     par_fsync,
      60                 : };
      61                 : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
      62                 : 
      63                 : use crate::config::PageServerConf;
      64                 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
      65                 : use crate::metrics::{
      66                 :     TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
      67                 : };
      68                 : use crate::pgdatadir_mapping::LsnForTimestamp;
      69                 : use crate::pgdatadir_mapping::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
      70                 : use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
      71                 : use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
      72                 : use pageserver_api::reltag::RelTag;
      73                 : use pageserver_api::shard::ShardIndex;
      74                 : 
      75                 : use postgres_connection::PgConnectionConfig;
      76                 : use postgres_ffi::to_pg_timestamp;
      77                 : use utils::{
      78                 :     completion,
      79                 :     generation::Generation,
      80                 :     id::TimelineId,
      81                 :     lsn::{AtomicLsn, Lsn, RecordLsn},
      82                 :     seqwait::SeqWait,
      83                 :     simple_rcu::{Rcu, RcuReadGuard},
      84                 : };
      85                 : 
      86                 : use crate::page_cache;
      87                 : use crate::repository::GcResult;
      88                 : use crate::repository::{Key, Value};
      89                 : use crate::task_mgr;
      90                 : use crate::task_mgr::TaskKind;
      91                 : use crate::ZERO_PAGE;
      92                 : 
      93                 : use self::delete::DeleteTimelineFlow;
      94                 : pub(super) use self::eviction_task::EvictionTaskTenantState;
      95                 : use self::eviction_task::EvictionTaskTimelineState;
      96                 : use self::layer_manager::LayerManager;
      97                 : use self::logical_size::LogicalSize;
      98                 : use self::walreceiver::{WalReceiver, WalReceiverConf};
      99                 : 
     100                 : use super::config::TenantConf;
     101                 : use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
     102                 : use super::remote_timeline_client::RemoteTimelineClient;
     103                 : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
     104                 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
     105                 : 
     106 CBC          42 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     107                 : pub(super) enum FlushLoopState {
     108                 :     NotStarted,
     109                 :     Running {
     110                 :         #[cfg(test)]
     111                 :         expect_initdb_optimization: bool,
     112                 :         #[cfg(test)]
     113                 :         initdb_optimization_count: usize,
     114                 :     },
     115                 :     Exited,
     116                 : }
     117                 : 
     118                 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
     119 UBC           0 : #[derive(Debug, Clone, PartialEq, Eq)]
     120                 : pub struct Hole {
     121                 :     key_range: Range<Key>,
     122                 :     coverage_size: usize,
     123                 : }
     124                 : 
     125                 : impl Ord for Hole {
     126 CBC         228 :     fn cmp(&self, other: &Self) -> Ordering {
     127             228 :         other.coverage_size.cmp(&self.coverage_size) // inverse order
     128             228 :     }
     129                 : }
     130                 : 
     131                 : impl PartialOrd for Hole {
     132             228 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     133             228 :         Some(self.cmp(other))
     134             228 :     }
     135                 : }
     136                 : 
     137                 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     138                 : /// Can be removed after all refactors are done.
     139             283 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
     140             283 :     drop(rlock)
     141             283 : }
     142                 : 
     143                 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     144                 : /// Can be removed after all refactors are done.
     145            1690 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
     146            1690 :     drop(rlock)
     147            1690 : }
     148                 : 
     149                 : /// The outward-facing resources required to build a Timeline
     150                 : pub struct TimelineResources {
     151                 :     pub remote_client: Option<RemoteTimelineClient>,
     152                 :     pub deletion_queue_client: DeletionQueueClient,
     153                 : }
     154                 : 
     155                 : pub struct Timeline {
     156                 :     conf: &'static PageServerConf,
     157                 :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     158                 : 
     159                 :     myself: Weak<Self>,
     160                 : 
     161                 :     pub(crate) tenant_shard_id: TenantShardId,
     162                 :     pub timeline_id: TimelineId,
     163                 : 
     164                 :     /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
     165                 :     /// Never changes for the lifetime of this [`Timeline`] object.
     166                 :     ///
     167                 :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     168                 :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     169                 :     pub(crate) generation: Generation,
     170                 : 
     171                 :     /// The detailed sharding information from our parent Tenant.  This enables us to map keys
     172                 :     /// to shards, and is constant through the lifetime of this Timeline.
     173                 :     shard_identity: ShardIdentity,
     174                 : 
     175                 :     pub pg_version: u32,
     176                 : 
     177                 :     /// The tuple has two elements.
     178                 :     /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
     179                 :     /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
     180                 :     ///
     181                 :     /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
     182                 :     /// We describe these rectangles through the `PersistentLayerDesc` struct.
     183                 :     ///
     184                 :     /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
     185                 :     /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
     186                 :     /// `PersistentLayerDesc`'s.
     187                 :     ///
     188                 :     /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
     189                 :     /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
     190                 :     /// runtime, e.g., during page reconstruction.
     191                 :     ///
     192                 :     /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
     193                 :     /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
     194                 :     pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
     195                 : 
     196                 :     /// Set of key ranges which should be covered by image layers to
     197                 :     /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
     198                 :     /// It is used by compaction task when it checks if new image layer should be created.
     199                 :     /// Newly created image layer doesn't help to remove the delta layer, until the
     200                 :     /// newly created image layer falls off the PITR horizon. So on next GC cycle,
     201                 :     /// gc_timeline may still want the new image layer to be created. To avoid redundant
     202                 :     /// image layers creation we should check if image layer exists but beyond PITR horizon.
     203                 :     /// This is why we need remember GC cutoff LSN.
     204                 :     ///
     205                 :     wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
     206                 : 
     207                 :     last_freeze_at: AtomicLsn,
     208                 :     // Atomic would be more appropriate here.
     209                 :     last_freeze_ts: RwLock<Instant>,
     210                 : 
     211                 :     // WAL redo manager
     212                 :     walredo_mgr: Arc<super::WalRedoManager>,
     213                 : 
     214                 :     /// Remote storage client.
     215                 :     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
     216                 :     pub remote_client: Option<Arc<RemoteTimelineClient>>,
     217                 : 
     218                 :     // What page versions do we hold in the repository? If we get a
     219                 :     // request > last_record_lsn, we need to wait until we receive all
     220                 :     // the WAL up to the request. The SeqWait provides functions for
     221                 :     // that. TODO: If we get a request for an old LSN, such that the
     222                 :     // versions have already been garbage collected away, we should
     223                 :     // throw an error, but we don't track that currently.
     224                 :     //
     225                 :     // last_record_lsn.load().last points to the end of last processed WAL record.
     226                 :     //
     227                 :     // We also remember the starting point of the previous record in
     228                 :     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     229                 :     // first WAL record when the node is started up. But here, we just
     230                 :     // keep track of it.
     231                 :     last_record_lsn: SeqWait<RecordLsn, Lsn>,
     232                 : 
     233                 :     // All WAL records have been processed and stored durably on files on
     234                 :     // local disk, up to this LSN. On crash and restart, we need to re-process
     235                 :     // the WAL starting from this point.
     236                 :     //
     237                 :     // Some later WAL records might have been processed and also flushed to disk
     238                 :     // already, so don't be surprised to see some, but there's no guarantee on
     239                 :     // them yet.
     240                 :     disk_consistent_lsn: AtomicLsn,
     241                 : 
     242                 :     // Parent timeline that this timeline was branched from, and the LSN
     243                 :     // of the branch point.
     244                 :     ancestor_timeline: Option<Arc<Timeline>>,
     245                 :     ancestor_lsn: Lsn,
     246                 : 
     247                 :     pub(super) metrics: TimelineMetrics,
     248                 : 
     249                 :     /// Ensures layers aren't frozen by checkpointer between
     250                 :     /// [`Timeline::get_layer_for_write`] and layer reads.
     251                 :     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     252                 :     /// Must always be acquired before the layer map/individual layer lock
     253                 :     /// to avoid deadlock.
     254                 :     write_lock: tokio::sync::Mutex<()>,
     255                 : 
     256                 :     /// Used to avoid multiple `flush_loop` tasks running
     257                 :     pub(super) flush_loop_state: Mutex<FlushLoopState>,
     258                 : 
     259                 :     /// layer_flush_start_tx can be used to wake up the layer-flushing task.
     260                 :     /// The value is a counter, incremented every time a new flush cycle is requested.
     261                 :     /// The flush cycle counter is sent back on the layer_flush_done channel when
     262                 :     /// the flush finishes. You can use that to wait for the flush to finish.
     263                 :     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     264                 :     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
     265                 :     layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
     266                 : 
     267                 :     // Needed to ensure that we can't create a branch at a point that was already garbage collected
     268                 :     pub latest_gc_cutoff_lsn: Rcu<Lsn>,
     269                 : 
     270                 :     // List of child timelines and their branch points. This is needed to avoid
     271                 :     // garbage collecting data that is still needed by the child timelines.
     272                 :     pub gc_info: std::sync::RwLock<GcInfo>,
     273                 : 
     274                 :     // It may change across major versions so for simplicity
     275                 :     // keep it after running initdb for a timeline.
     276                 :     // It is needed in checks when we want to error on some operations
     277                 :     // when they are requested for pre-initdb lsn.
     278                 :     // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
     279                 :     // though let's keep them both for better error visibility.
     280                 :     pub initdb_lsn: Lsn,
     281                 : 
     282                 :     /// When did we last calculate the partitioning?
     283                 :     partitioning: Mutex<(KeyPartitioning, Lsn)>,
     284                 : 
     285                 :     /// Configuration: how often should the partitioning be recalculated.
     286                 :     repartition_threshold: u64,
     287                 : 
     288                 :     /// Current logical size of the "datadir", at the last LSN.
     289                 :     current_logical_size: LogicalSize,
     290                 : 
     291                 :     /// Information about the last processed message by the WAL receiver,
     292                 :     /// or None if WAL receiver has not received anything for this timeline
     293                 :     /// yet.
     294                 :     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
     295                 :     pub walreceiver: Mutex<Option<WalReceiver>>,
     296                 : 
     297                 :     /// Relation size cache
     298                 :     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
     299                 : 
     300                 :     download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
     301                 : 
     302                 :     state: watch::Sender<TimelineState>,
     303                 : 
     304                 :     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     305                 :     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     306                 :     pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
     307                 : 
     308                 :     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
     309                 : 
     310                 :     /// Load or creation time information about the disk_consistent_lsn and when the loading
     311                 :     /// happened. Used for consumption metrics.
     312                 :     pub(crate) loaded_at: (Lsn, SystemTime),
     313                 : 
     314                 :     /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
     315                 :     pub(crate) gate: Gate,
     316                 : 
     317                 :     /// Cancellation token scoped to this timeline: anything doing long-running work relating
     318                 :     /// to the timeline should drop out when this token fires.
     319                 :     pub(crate) cancel: CancellationToken,
     320                 : 
     321                 :     /// Make sure we only have one running compaction at a time in tests.
     322                 :     ///
     323                 :     /// Must only be taken in two places:
     324                 :     /// - [`Timeline::compact`] (this file)
     325                 :     /// - [`delete::delete_local_layer_files`]
     326                 :     ///
     327                 :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     328                 :     compaction_lock: tokio::sync::Mutex<()>,
     329                 : 
     330                 :     /// Make sure we only have one running gc at a time.
     331                 :     ///
     332                 :     /// Must only be taken in two places:
     333                 :     /// - [`Timeline::gc`] (this file)
     334                 :     /// - [`delete::delete_local_layer_files`]
     335                 :     ///
     336                 :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     337                 :     gc_lock: tokio::sync::Mutex<()>,
     338                 : }
     339                 : 
     340                 : pub struct WalReceiverInfo {
     341                 :     pub wal_source_connconf: PgConnectionConfig,
     342                 :     pub last_received_msg_lsn: Lsn,
     343                 :     pub last_received_msg_ts: u128,
     344                 : }
     345                 : 
     346                 : ///
     347                 : /// Information about how much history needs to be retained, needed by
     348                 : /// Garbage Collection.
     349                 : ///
     350                 : pub struct GcInfo {
     351                 :     /// Specific LSNs that are needed.
     352                 :     ///
     353                 :     /// Currently, this includes all points where child branches have
     354                 :     /// been forked off from. In the future, could also include
     355                 :     /// explicit user-defined snapshot points.
     356                 :     pub retain_lsns: Vec<Lsn>,
     357                 : 
     358                 :     /// In addition to 'retain_lsns', keep everything newer than this
     359                 :     /// point.
     360                 :     ///
     361                 :     /// This is calculated by subtracting 'gc_horizon' setting from
     362                 :     /// last-record LSN
     363                 :     ///
     364                 :     /// FIXME: is this inclusive or exclusive?
     365                 :     pub horizon_cutoff: Lsn,
     366                 : 
     367                 :     /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
     368                 :     /// point.
     369                 :     ///
     370                 :     /// This is calculated by finding a number such that a record is needed for PITR
     371                 :     /// if only if its LSN is larger than 'pitr_cutoff'.
     372                 :     pub pitr_cutoff: Lsn,
     373                 : }
     374                 : 
     375                 : /// An error happened in a get() operation.
     376              31 : #[derive(thiserror::Error, Debug)]
     377                 : pub(crate) enum PageReconstructError {
     378                 :     #[error(transparent)]
     379                 :     Other(#[from] anyhow::Error),
     380                 : 
     381                 :     #[error("Ancestor LSN wait error: {0}")]
     382                 :     AncestorLsnTimeout(#[from] WaitLsnError),
     383                 : 
     384                 :     /// The operation was cancelled
     385                 :     #[error("Cancelled")]
     386                 :     Cancelled,
     387                 : 
     388                 :     /// The ancestor of this is being stopped
     389                 :     #[error("ancestor timeline {0} is being stopped")]
     390                 :     AncestorStopping(TimelineId),
     391                 : 
     392                 :     /// An error happened replaying WAL records
     393                 :     #[error(transparent)]
     394                 :     WalRedo(anyhow::Error),
     395                 : }
     396                 : 
     397 UBC           0 : #[derive(thiserror::Error, Debug)]
     398                 : enum FlushLayerError {
     399                 :     /// Timeline cancellation token was cancelled
     400                 :     #[error("timeline shutting down")]
     401                 :     Cancelled,
     402                 : 
     403                 :     #[error(transparent)]
     404                 :     PageReconstructError(#[from] PageReconstructError),
     405                 : 
     406                 :     #[error(transparent)]
     407                 :     Other(#[from] anyhow::Error),
     408                 : }
     409                 : 
     410               0 : #[derive(Clone, Copy)]
     411                 : pub enum LogicalSizeCalculationCause {
     412                 :     Initial,
     413                 :     ConsumptionMetricsSyntheticSize,
     414                 :     EvictionTaskImitation,
     415                 :     TenantSizeHandler,
     416                 : }
     417                 : 
     418                 : pub enum GetLogicalSizePriority {
     419                 :     User,
     420                 :     Background,
     421                 : }
     422                 : 
     423 CBC         711 : #[derive(enumset::EnumSetType)]
     424                 : pub(crate) enum CompactFlags {
     425                 :     ForceRepartition,
     426                 : }
     427                 : 
     428                 : impl std::fmt::Debug for Timeline {
     429 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     430               0 :         write!(f, "Timeline<{}>", self.timeline_id)
     431               0 :     }
     432                 : }
     433                 : 
     434 CBC           8 : #[derive(thiserror::Error, Debug)]
     435                 : pub(crate) enum WaitLsnError {
     436                 :     // Called on a timeline which is shutting down
     437                 :     #[error("Shutdown")]
     438                 :     Shutdown,
     439                 : 
     440                 :     // Called on an timeline not in active state or shutting down
     441                 :     #[error("Bad state (not active)")]
     442                 :     BadState,
     443                 : 
     444                 :     // Timeout expired while waiting for LSN to catch up with goal.
     445                 :     #[error("{0}")]
     446                 :     Timeout(String),
     447                 : }
     448                 : 
     449                 : /// Public interface functions
     450                 : impl Timeline {
     451                 :     /// Get the LSN where this branch was created
     452            3643 :     pub fn get_ancestor_lsn(&self) -> Lsn {
     453            3643 :         self.ancestor_lsn
     454            3643 :     }
     455                 : 
     456                 :     /// Get the ancestor's timeline id
     457            5046 :     pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
     458            5046 :         self.ancestor_timeline
     459            5046 :             .as_ref()
     460            5046 :             .map(|ancestor| ancestor.timeline_id)
     461            5046 :     }
     462                 : 
     463                 :     /// Lock and get timeline's GC cutoff
     464         3646466 :     pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
     465         3646466 :         self.latest_gc_cutoff_lsn.read()
     466         3646466 :     }
     467                 : 
     468                 :     /// Look up given page version.
     469                 :     ///
     470                 :     /// If a remote layer file is needed, it is downloaded as part of this
     471                 :     /// call.
     472                 :     ///
     473                 :     /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
     474                 :     /// abstraction above this needs to store suitable metadata to track what
     475                 :     /// data exists with what keys, in separate metadata entries. If a
     476                 :     /// non-existent key is requested, we may incorrectly return a value from
     477                 :     /// an ancestor branch, for example, or waste a lot of cycles chasing the
     478                 :     /// non-existing key.
     479                 :     ///
     480                 :     /// # Cancel-Safety
     481                 :     ///
     482                 :     /// This method is cancellation-safe.
     483         5939237 :     pub(crate) async fn get(
     484         5939237 :         &self,
     485         5939237 :         key: Key,
     486         5939237 :         lsn: Lsn,
     487         5939237 :         ctx: &RequestContext,
     488         5939237 :     ) -> Result<Bytes, PageReconstructError> {
     489         5939223 :         if !lsn.is_valid() {
     490               1 :             return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
     491         5939222 :         }
     492                 : 
     493                 :         // This check is debug-only because of the cost of hashing, and because it's a double-check: we
     494                 :         // already checked the key against the shard_identity when looking up the Timeline from
     495                 :         // page_service.
     496         5939222 :         debug_assert!(!self.shard_identity.is_key_disposable(&key));
     497                 : 
     498                 :         // XXX: structured stats collection for layer eviction here.
     499 UBC           0 :         trace!(
     500               0 :             "get page request for {}@{} from task kind {:?}",
     501               0 :             key,
     502               0 :             lsn,
     503               0 :             ctx.task_kind()
     504               0 :         );
     505                 : 
     506                 :         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
     507                 :         // The cached image can be returned directly if there is no WAL between the cached image
     508                 :         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
     509                 :         // for redo.
     510 CBC     5939222 :         let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
     511         1342450 :             Some((cached_lsn, cached_img)) => {
     512         1342450 :                 match cached_lsn.cmp(&lsn) {
     513         1265902 :                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
     514                 :                     Ordering::Equal => {
     515           76548 :                         MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
     516           76548 :                         return Ok(cached_img); // exact LSN match, return the image
     517                 :                     }
     518                 :                     Ordering::Greater => {
     519 UBC           0 :                         unreachable!("the returned lsn should never be after the requested lsn")
     520                 :                     }
     521                 :                 }
     522 CBC     1265902 :                 Some((cached_lsn, cached_img))
     523                 :             }
     524         4596772 :             None => None,
     525                 :         };
     526                 : 
     527         5862674 :         let mut reconstruct_state = ValueReconstructState {
     528         5862674 :             records: Vec::new(),
     529         5862674 :             img: cached_page_img,
     530         5862674 :         };
     531         5862674 : 
     532         5862674 :         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
     533         5862674 :         let path = self
     534         5862674 :             .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
     535         2017647 :             .await?;
     536         5861444 :         timer.stop_and_record();
     537         5861444 : 
     538         5861444 :         let start = Instant::now();
     539         5861444 :         let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
     540         5861444 :         let elapsed = start.elapsed();
     541         5861444 :         crate::metrics::RECONSTRUCT_TIME
     542         5861444 :             .for_result(&res)
     543         5861444 :             .observe(elapsed.as_secs_f64());
     544         5861444 : 
     545         5861444 :         if cfg!(feature = "testing") && res.is_err() {
     546                 :             // it can only be walredo issue
     547                 :             use std::fmt::Write;
     548                 : 
     549 UBC           0 :             let mut msg = String::new();
     550               0 : 
     551               0 :             path.into_iter().for_each(|(res, cont_lsn, layer)| {
     552               0 :                 writeln!(
     553               0 :                     msg,
     554               0 :                     "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
     555               0 :                     layer(),
     556               0 :                 )
     557               0 :                 .expect("string grows")
     558               0 :             });
     559                 : 
     560                 :             // this is to rule out or provide evidence that we could in some cases read a duplicate
     561                 :             // walrecord
     562               0 :             tracing::info!("walredo failed, path:\n{msg}");
     563 CBC     5861444 :         }
     564                 : 
     565         5861444 :         res
     566         5939180 :     }
     567                 : 
     568                 :     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     569         6341670 :     pub fn get_last_record_lsn(&self) -> Lsn {
     570         6341670 :         self.last_record_lsn.load().last
     571         6341670 :     }
     572                 : 
     573            2993 :     pub fn get_prev_record_lsn(&self) -> Lsn {
     574            2993 :         self.last_record_lsn.load().prev
     575            2993 :     }
     576                 : 
     577                 :     /// Atomically get both last and prev.
     578             917 :     pub fn get_last_record_rlsn(&self) -> RecordLsn {
     579             917 :         self.last_record_lsn.load()
     580             917 :     }
     581                 : 
     582          570784 :     pub fn get_disk_consistent_lsn(&self) -> Lsn {
     583          570784 :         self.disk_consistent_lsn.load()
     584          570784 :     }
     585                 : 
     586                 :     /// remote_consistent_lsn from the perspective of the tenant's current generation,
     587                 :     /// not validated with control plane yet.
     588                 :     /// See [`Self::get_remote_consistent_lsn_visible`].
     589            2993 :     pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     590            2993 :         if let Some(remote_client) = &self.remote_client {
     591            2993 :             remote_client.remote_consistent_lsn_projected()
     592                 :         } else {
     593 UBC           0 :             None
     594                 :         }
     595 CBC        2993 :     }
     596                 : 
     597                 :     /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
     598                 :     /// i.e. a value of remote_consistent_lsn_projected which has undergone
     599                 :     /// generation validation in the deletion queue.
     600          568864 :     pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
     601          568864 :         if let Some(remote_client) = &self.remote_client {
     602          568864 :             remote_client.remote_consistent_lsn_visible()
     603                 :         } else {
     604 UBC           0 :             None
     605                 :         }
     606 CBC      568864 :     }
     607                 : 
     608                 :     /// The sum of the file size of all historic layers in the layer map.
     609                 :     /// This method makes no distinction between local and remote layers.
     610                 :     /// Hence, the result **does not represent local filesystem usage**.
     611            3443 :     pub async fn layer_size_sum(&self) -> u64 {
     612            3443 :         let guard = self.layers.read().await;
     613            3443 :         let layer_map = guard.layer_map();
     614            3443 :         let mut size = 0;
     615          363345 :         for l in layer_map.iter_historic_layers() {
     616          363345 :             size += l.file_size();
     617          363345 :         }
     618            3443 :         size
     619            3443 :     }
     620                 : 
     621              18 :     pub fn resident_physical_size(&self) -> u64 {
     622              18 :         self.metrics.resident_physical_size_get()
     623              18 :     }
     624                 : 
     625                 :     ///
     626                 :     /// Wait until WAL has been received and processed up to this LSN.
     627                 :     ///
     628                 :     /// You should call this before any of the other get_* or list_* functions. Calling
     629                 :     /// those functions with an LSN that has been processed yet is an error.
     630                 :     ///
     631         1419461 :     pub(crate) async fn wait_lsn(
     632         1419461 :         &self,
     633         1419461 :         lsn: Lsn,
     634         1419461 :         _ctx: &RequestContext, /* Prepare for use by cancellation */
     635         1419461 :     ) -> Result<(), WaitLsnError> {
     636         1419426 :         if self.cancel.is_cancelled() {
     637 UBC           0 :             return Err(WaitLsnError::Shutdown);
     638 CBC     1419426 :         } else if !self.is_active() {
     639 UBC           0 :             return Err(WaitLsnError::BadState);
     640 CBC     1419426 :         }
     641                 : 
     642                 :         // This should never be called from the WAL receiver, because that could lead
     643                 :         // to a deadlock.
     644                 :         debug_assert!(
     645         1419426 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
     646 UBC           0 :             "wait_lsn cannot be called in WAL receiver"
     647                 :         );
     648                 :         debug_assert!(
     649 CBC     1419426 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
     650 UBC           0 :             "wait_lsn cannot be called in WAL receiver"
     651                 :         );
     652                 :         debug_assert!(
     653 CBC     1419426 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
     654 UBC           0 :             "wait_lsn cannot be called in WAL receiver"
     655                 :         );
     656                 : 
     657 CBC     1419426 :         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
     658         1419426 : 
     659         1419426 :         match self
     660         1419426 :             .last_record_lsn
     661         1419426 :             .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
     662          103451 :             .await
     663                 :         {
     664         1419422 :             Ok(()) => Ok(()),
     665               4 :             Err(e) => {
     666               4 :                 use utils::seqwait::SeqWaitError::*;
     667               4 :                 match e {
     668 UBC           0 :                     Shutdown => Err(WaitLsnError::Shutdown),
     669                 :                     Timeout => {
     670                 :                         // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
     671 CBC           4 :                         drop(_timer);
     672               4 :                         let walreceiver_status = self.walreceiver_status();
     673               4 :                         Err(WaitLsnError::Timeout(format!(
     674               4 :                         "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
     675               4 :                         lsn,
     676               4 :                         self.get_last_record_lsn(),
     677               4 :                         self.get_disk_consistent_lsn(),
     678               4 :                         walreceiver_status,
     679               4 :                     )))
     680                 :                     }
     681                 :                 }
     682                 :             }
     683                 :         }
     684         1419426 :     }
     685                 : 
     686            2997 :     pub(crate) fn walreceiver_status(&self) -> String {
     687            2997 :         match &*self.walreceiver.lock().unwrap() {
     688             110 :             None => "stopping or stopped".to_string(),
     689            2887 :             Some(walreceiver) => match walreceiver.status() {
     690            1812 :                 Some(status) => status.to_human_readable_string(),
     691            1075 :                 None => "Not active".to_string(),
     692                 :             },
     693                 :         }
     694            2997 :     }
     695                 : 
     696                 :     /// Check that it is valid to request operations with that lsn.
     697             538 :     pub fn check_lsn_is_in_scope(
     698             538 :         &self,
     699             538 :         lsn: Lsn,
     700             538 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     701             538 :     ) -> anyhow::Result<()> {
     702             538 :         ensure!(
     703             538 :             lsn >= **latest_gc_cutoff_lsn,
     704               9 :             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
     705               9 :             lsn,
     706               9 :             **latest_gc_cutoff_lsn,
     707                 :         );
     708             529 :         Ok(())
     709             538 :     }
     710                 : 
     711                 :     /// Flush to disk all data that was written with the put_* functions
     712            1561 :     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
     713                 :     pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
     714                 :         self.freeze_inmem_layer(false).await;
     715                 :         self.flush_frozen_layers_and_wait().await
     716                 :     }
     717                 : 
     718                 :     /// Outermost timeline compaction operation; downloads needed layers.
     719            1375 :     pub(crate) async fn compact(
     720            1375 :         self: &Arc<Self>,
     721            1375 :         cancel: &CancellationToken,
     722            1375 :         flags: EnumSet<CompactFlags>,
     723            1375 :         ctx: &RequestContext,
     724            1375 :     ) -> Result<(), CompactionError> {
     725            1375 :         // most likely the cancellation token is from background task, but in tests it could be the
     726            1375 :         // request task as well.
     727            1375 : 
     728            1375 :         let prepare = async move {
     729            1375 :             let guard = self.compaction_lock.lock().await;
     730                 : 
     731            1375 :             let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
     732            1375 :                 BackgroundLoopKind::Compaction,
     733            1375 :                 ctx,
     734            1375 :             )
     735 UBC           0 :             .await;
     736                 : 
     737 CBC        1375 :             (guard, permit)
     738            1375 :         };
     739                 : 
     740                 :         // this wait probably never needs any "long time spent" logging, because we already nag if
     741                 :         // compaction task goes over it's period (20s) which is quite often in production.
     742            1375 :         let (_guard, _permit) = tokio::select! {
     743            1375 :             tuple = prepare => { tuple },
     744                 :             _ = self.cancel.cancelled() => return Ok(()),
     745                 :             _ = cancel.cancelled() => return Ok(()),
     746                 :         };
     747                 : 
     748            1375 :         let last_record_lsn = self.get_last_record_lsn();
     749            1375 : 
     750            1375 :         // Last record Lsn could be zero in case the timeline was just created
     751            1375 :         if !last_record_lsn.is_valid() {
     752 UBC           0 :             warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
     753               0 :             return Ok(());
     754 CBC        1375 :         }
     755            1375 : 
     756            1375 :         // High level strategy for compaction / image creation:
     757            1375 :         //
     758            1375 :         // 1. First, calculate the desired "partitioning" of the
     759            1375 :         // currently in-use key space. The goal is to partition the
     760            1375 :         // key space into roughly fixed-size chunks, but also take into
     761            1375 :         // account any existing image layers, and try to align the
     762            1375 :         // chunk boundaries with the existing image layers to avoid
     763            1375 :         // too much churn. Also try to align chunk boundaries with
     764            1375 :         // relation boundaries.  In principle, we don't know about
     765            1375 :         // relation boundaries here, we just deal with key-value
     766            1375 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     767            1375 :         // map relations into key-value pairs. But in practice we know
     768            1375 :         // that 'field6' is the block number, and the fields 1-5
     769            1375 :         // identify a relation. This is just an optimization,
     770            1375 :         // though.
     771            1375 :         //
     772            1375 :         // 2. Once we know the partitioning, for each partition,
     773            1375 :         // decide if it's time to create a new image layer. The
     774            1375 :         // criteria is: there has been too much "churn" since the last
     775            1375 :         // image layer? The "churn" is fuzzy concept, it's a
     776            1375 :         // combination of too many delta files, or too much WAL in
     777            1375 :         // total in the delta file. Or perhaps: if creating an image
     778            1375 :         // file would allow to delete some older files.
     779            1375 :         //
     780            1375 :         // 3. After that, we compact all level0 delta files if there
     781            1375 :         // are too many of them.  While compacting, we also garbage
     782            1375 :         // collect any page versions that are no longer needed because
     783            1375 :         // of the new image layers we created in step 2.
     784            1375 :         //
     785            1375 :         // TODO: This high level strategy hasn't been implemented yet.
     786            1375 :         // Below are functions compact_level0() and create_image_layers()
     787            1375 :         // but they are a bit ad hoc and don't quite work like it's explained
     788            1375 :         // above. Rewrite it.
     789            1375 : 
     790            1375 :         // Is the timeline being deleted?
     791            1375 :         if self.is_stopping() {
     792 UBC           0 :             trace!("Dropping out of compaction on timeline shutdown");
     793               0 :             return Err(CompactionError::ShuttingDown);
     794 CBC        1375 :         }
     795            1375 : 
     796            1375 :         let target_file_size = self.get_checkpoint_distance();
     797            1375 : 
     798            1375 :         // Define partitioning schema if needed
     799            1375 : 
     800            1375 :         // FIXME: the match should only cover repartitioning, not the next steps
     801            1375 :         match self
     802            1375 :             .repartition(
     803            1375 :                 self.get_last_record_lsn(),
     804            1375 :                 self.get_compaction_target_size(),
     805            1375 :                 flags,
     806            1375 :                 ctx,
     807            1375 :             )
     808          139100 :             .await
     809                 :         {
     810            1372 :             Ok((partitioning, lsn)) => {
     811            1372 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     812            1372 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     813            1372 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     814            1372 :                     .build();
     815            1372 : 
     816            1372 :                 // 2. Compact
     817            1372 :                 let timer = self.metrics.compact_time_histo.start_timer();
     818          270040 :                 self.compact_level0(target_file_size, ctx).await?;
     819            1368 :                 timer.stop_and_record();
     820                 : 
     821                 :                 // 3. Create new image layers for partitions that have been modified
     822                 :                 // "enough".
     823            1368 :                 let layers = self
     824            1368 :                     .create_image_layers(&partitioning, lsn, false, &image_ctx)
     825           49448 :                     .await
     826            1364 :                     .map_err(anyhow::Error::from)?;
     827            1364 :                 if let Some(remote_client) = &self.remote_client {
     828            6958 :                     for layer in layers {
     829            5594 :                         remote_client.schedule_layer_file_upload(layer)?;
     830                 :                     }
     831 UBC           0 :                 }
     832                 : 
     833 CBC        1364 :                 if let Some(remote_client) = &self.remote_client {
     834                 :                     // should any new image layer been created, not uploading index_part will
     835                 :                     // result in a mismatch between remote_physical_size and layermap calculated
     836                 :                     // size, which will fail some tests, but should not be an issue otherwise.
     837            1364 :                     remote_client.schedule_index_upload_for_file_changes()?;
     838 UBC           0 :                 }
     839                 :             }
     840 CBC           1 :             Err(err) => {
     841               1 :                 // no partitioning? This is normal, if the timeline was just created
     842               1 :                 // as an empty timeline. Also in unit tests, when we use the timeline
     843               1 :                 // as a simple key-value store, ignoring the datadir layout. Log the
     844               1 :                 // error but continue.
     845               1 :                 //
     846               1 :                 // Suppress error when it's due to cancellation
     847               1 :                 if !self.cancel.is_cancelled() {
     848 LBC         (2) :                     error!("could not compact, repartitioning keyspace failed: {err:?}");
     849 CBC           1 :                 }
     850                 :             }
     851                 :         };
     852                 : 
     853            1365 :         Ok(())
     854            1365 :     }
     855                 : 
     856                 :     /// Mutate the timeline with a [`TimelineWriter`].
     857         1940253 :     pub async fn writer(&self) -> TimelineWriter<'_> {
     858         1940252 :         TimelineWriter {
     859         1940252 :             tl: self,
     860         1940252 :             _write_guard: self.write_lock.lock().await,
     861                 :         }
     862         1940252 :     }
     863                 : 
     864                 :     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
     865                 :     /// the in-memory layer, and initiate flushing it if so.
     866                 :     ///
     867                 :     /// Also flush after a period of time without new data -- it helps
     868                 :     /// safekeepers to regard pageserver as caught up and suspend activity.
     869          565871 :     pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
     870          565871 :         let last_lsn = self.get_last_record_lsn();
     871          565114 :         let open_layer_size = {
     872          565871 :             let guard = self.layers.read().await;
     873          565871 :             let layers = guard.layer_map();
     874          565871 :             let Some(open_layer) = layers.open_layer.as_ref() else {
     875             757 :                 return Ok(());
     876                 :             };
     877          565114 :             open_layer.size().await?
     878                 :         };
     879          565114 :         let last_freeze_at = self.last_freeze_at.load();
     880          565114 :         let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
     881          565114 :         let distance = last_lsn.widening_sub(last_freeze_at);
     882          565114 :         // Checkpointing the open layer can be triggered by layer size or LSN range.
     883          565114 :         // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
     884          565114 :         // we want to stay below that with a big margin.  The LSN distance determines how
     885          565114 :         // much WAL the safekeepers need to store.
     886          565114 :         if distance >= self.get_checkpoint_distance().into()
     887          563672 :             || open_layer_size > self.get_checkpoint_distance()
     888          562012 :             || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
     889                 :         {
     890            3102 :             info!(
     891            3102 :                 "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
     892            3102 :                 distance,
     893            3102 :                 open_layer_size,
     894            3102 :                 last_freeze_ts.elapsed()
     895            3102 :             );
     896                 : 
     897            3102 :             self.freeze_inmem_layer(true).await;
     898            3102 :             self.last_freeze_at.store(last_lsn);
     899            3102 :             *(self.last_freeze_ts.write().unwrap()) = Instant::now();
     900            3102 : 
     901            3102 :             // Wake up the layer flusher
     902            3102 :             self.flush_frozen_layers();
     903          562012 :         }
     904          565114 :         Ok(())
     905          565871 :     }
     906                 : 
     907            1102 :     pub fn activate(
     908            1102 :         self: &Arc<Self>,
     909            1102 :         broker_client: BrokerClientChannel,
     910            1102 :         background_jobs_can_start: Option<&completion::Barrier>,
     911            1102 :         ctx: &RequestContext,
     912            1102 :     ) {
     913            1102 :         self.spawn_initial_logical_size_computation_task(ctx);
     914            1102 :         self.launch_wal_receiver(ctx, broker_client);
     915            1102 :         self.set_state(TimelineState::Active);
     916            1102 :         self.launch_eviction_task(background_jobs_can_start);
     917            1102 :     }
     918                 : 
     919                 :     /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
     920                 :     /// also to remote storage.  This method can easily take multiple seconds for a busy timeline.
     921                 :     ///
     922                 :     /// While we are flushing, we continue to accept read I/O.
     923             243 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
     924                 :     pub(crate) async fn flush_and_shutdown(&self) {
     925                 :         debug_assert_current_span_has_tenant_and_timeline_id();
     926                 : 
     927                 :         // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
     928                 :         // trying to flush
     929 UBC           0 :         tracing::debug!("Waiting for WalReceiverManager...");
     930                 :         task_mgr::shutdown_tasks(
     931                 :             Some(TaskKind::WalReceiverManager),
     932                 :             Some(self.tenant_shard_id),
     933                 :             Some(self.timeline_id),
     934                 :         )
     935                 :         .await;
     936                 : 
     937                 :         // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
     938                 :         self.last_record_lsn.shutdown();
     939                 : 
     940                 :         // now all writers to InMemory layer are gone, do the final flush if requested
     941                 :         match self.freeze_and_flush().await {
     942                 :             Ok(_) => {
     943                 :                 // drain the upload queue
     944                 :                 if let Some(client) = self.remote_client.as_ref() {
     945                 :                     // if we did not wait for completion here, it might be our shutdown process
     946                 :                     // didn't wait for remote uploads to complete at all, as new tasks can forever
     947                 :                     // be spawned.
     948                 :                     //
     949                 :                     // what is problematic is the shutting down of RemoteTimelineClient, because
     950                 :                     // obviously it does not make sense to stop while we wait for it, but what
     951                 :                     // about corner cases like s3 suddenly hanging up?
     952                 :                     if let Err(e) = client.shutdown().await {
     953                 :                         // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
     954                 :                         // we have some extra WAL replay to do next time the timeline starts.
     955               0 :                         warn!("failed to flush to remote storage: {e:#}");
     956                 :                     }
     957                 :                 }
     958                 :             }
     959                 :             Err(e) => {
     960                 :                 // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
     961                 :                 // we have some extra WAL replay to do next time the timeline starts.
     962 CBC          42 :                 warn!("failed to freeze and flush: {e:#}");
     963                 :             }
     964                 :         }
     965                 : 
     966                 :         self.shutdown().await;
     967                 :     }
     968                 : 
     969                 :     /// Shut down immediately, without waiting for any open layers to flush to disk.  This is a subset of
     970                 :     /// the graceful [`Timeline::flush_and_shutdown`] function.
     971             500 :     pub(crate) async fn shutdown(&self) {
     972                 :         // Signal any subscribers to our cancellation token to drop out
     973 UBC           0 :         tracing::debug!("Cancelling CancellationToken");
     974 CBC         500 :         self.cancel.cancel();
     975             500 : 
     976             500 :         // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
     977             500 :         // while doing so.
     978             500 :         self.last_record_lsn.shutdown();
     979             500 : 
     980             500 :         // Shut down the layer flush task before the remote client, as one depends on the other
     981             500 :         task_mgr::shutdown_tasks(
     982             500 :             Some(TaskKind::LayerFlushTask),
     983             500 :             Some(self.tenant_shard_id),
     984             500 :             Some(self.timeline_id),
     985             500 :         )
     986             425 :         .await;
     987                 : 
     988                 :         // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
     989                 :         // case our caller wants to use that for a deletion
     990             500 :         if let Some(remote_client) = self.remote_client.as_ref() {
     991             500 :             match remote_client.stop() {
     992             500 :                 Ok(()) => {}
     993 UBC           0 :                 Err(StopError::QueueUninitialized) => {
     994               0 :                     // Shutting down during initialization is legal
     995               0 :                 }
     996                 :             }
     997               0 :         }
     998                 : 
     999               0 :         tracing::debug!("Waiting for tasks...");
    1000                 : 
    1001 CBC         598 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
    1002                 : 
    1003                 :         // Finally wait until any gate-holders are complete
    1004             500 :         self.gate.close().await;
    1005             500 :     }
    1006                 : 
    1007            1872 :     pub fn set_state(&self, new_state: TimelineState) {
    1008            1872 :         match (self.current_state(), new_state) {
    1009            1872 :             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
    1010             109 :                 info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
    1011                 :             }
    1012 UBC           0 :             (st, TimelineState::Loading) => {
    1013               0 :                 error!("ignoring transition from {st:?} into Loading state");
    1014                 :             }
    1015 CBC           7 :             (TimelineState::Broken { .. }, new_state) => {
    1016               7 :                 error!("Ignoring state update {new_state:?} for broken timeline");
    1017                 :             }
    1018                 :             (TimelineState::Stopping, TimelineState::Active) => {
    1019 UBC           0 :                 error!("Not activating a Stopping timeline");
    1020                 :             }
    1021 CBC        1756 :             (_, new_state) => {
    1022            1756 :                 self.state.send_replace(new_state);
    1023            1756 :             }
    1024                 :         }
    1025            1872 :     }
    1026                 : 
    1027              18 :     pub fn set_broken(&self, reason: String) {
    1028              18 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
    1029              18 :         let broken_state = TimelineState::Broken {
    1030              18 :             reason,
    1031              18 :             backtrace: backtrace_str,
    1032              18 :         };
    1033              18 :         self.set_state(broken_state);
    1034              18 : 
    1035              18 :         // Although the Broken state is not equivalent to shutdown() (shutdown will be called
    1036              18 :         // later when this tenant is detach or the process shuts down), firing the cancellation token
    1037              18 :         // here avoids the need for other tasks to watch for the Broken state explicitly.
    1038              18 :         self.cancel.cancel();
    1039              18 :     }
    1040                 : 
    1041         1959846 :     pub fn current_state(&self) -> TimelineState {
    1042         1959846 :         self.state.borrow().clone()
    1043         1959846 :     }
    1044                 : 
    1045             780 :     pub fn is_broken(&self) -> bool {
    1046             780 :         matches!(&*self.state.borrow(), TimelineState::Broken { .. })
    1047             780 :     }
    1048                 : 
    1049         1431185 :     pub fn is_active(&self) -> bool {
    1050         1431185 :         self.current_state() == TimelineState::Active
    1051         1431185 :     }
    1052                 : 
    1053            2363 :     pub fn is_stopping(&self) -> bool {
    1054            2363 :         self.current_state() == TimelineState::Stopping
    1055            2363 :     }
    1056                 : 
    1057            1102 :     pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
    1058            1102 :         self.state.subscribe()
    1059            1102 :     }
    1060                 : 
    1061         1199007 :     pub async fn wait_to_become_active(
    1062         1199007 :         &self,
    1063         1199007 :         _ctx: &RequestContext, // Prepare for use by cancellation
    1064         1199007 :     ) -> Result<(), TimelineState> {
    1065         1198972 :         let mut receiver = self.state.subscribe();
    1066         1198999 :         loop {
    1067         1198999 :             let current_state = receiver.borrow().clone();
    1068         1198999 :             match current_state {
    1069                 :                 TimelineState::Loading => {
    1070              27 :                     receiver
    1071              27 :                         .changed()
    1072              27 :                         .await
    1073              27 :                         .expect("holding a reference to self");
    1074                 :                 }
    1075                 :                 TimelineState::Active { .. } => {
    1076         1198968 :                     return Ok(());
    1077                 :                 }
    1078                 :                 TimelineState::Broken { .. } | TimelineState::Stopping => {
    1079                 :                     // There's no chance the timeline can transition back into ::Active
    1080               4 :                     return Err(current_state);
    1081                 :                 }
    1082                 :             }
    1083                 :         }
    1084         1198972 :     }
    1085                 : 
    1086              83 :     pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
    1087              83 :         let guard = self.layers.read().await;
    1088              83 :         let layer_map = guard.layer_map();
    1089              83 :         let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
    1090              83 :         if let Some(open_layer) = &layer_map.open_layer {
    1091               8 :             in_memory_layers.push(open_layer.info());
    1092              75 :         }
    1093              83 :         for frozen_layer in &layer_map.frozen_layers {
    1094 UBC           0 :             in_memory_layers.push(frozen_layer.info());
    1095               0 :         }
    1096                 : 
    1097 CBC          83 :         let mut historic_layers = Vec::new();
    1098            2974 :         for historic_layer in layer_map.iter_historic_layers() {
    1099            2974 :             let historic_layer = guard.get_from_desc(&historic_layer);
    1100            2974 :             historic_layers.push(historic_layer.info(reset));
    1101            2974 :         }
    1102                 : 
    1103              83 :         LayerMapInfo {
    1104              83 :             in_memory_layers,
    1105              83 :             historic_layers,
    1106              83 :         }
    1107              83 :     }
    1108                 : 
    1109               2 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    1110                 :     pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1111                 :         let Some(layer) = self.find_layer(layer_file_name).await else {
    1112                 :             return Ok(None);
    1113                 :         };
    1114                 : 
    1115                 :         if self.remote_client.is_none() {
    1116                 :             return Ok(Some(false));
    1117                 :         }
    1118                 : 
    1119                 :         layer.download().await?;
    1120                 : 
    1121                 :         Ok(Some(true))
    1122                 :     }
    1123                 : 
    1124                 :     /// Evict just one layer.
    1125                 :     ///
    1126                 :     /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
    1127            2245 :     pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1128            2245 :         let _gate = self
    1129            2245 :             .gate
    1130            2245 :             .enter()
    1131            2245 :             .map_err(|_| anyhow::anyhow!("Shutting down"))?;
    1132                 : 
    1133            2245 :         let Some(local_layer) = self.find_layer(layer_file_name).await else {
    1134 UBC           0 :             return Ok(None);
    1135                 :         };
    1136                 : 
    1137 CBC        2245 :         let rtc = self
    1138            2245 :             .remote_client
    1139            2245 :             .as_ref()
    1140            2245 :             .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
    1141                 : 
    1142            2245 :         match local_layer.evict_and_wait(rtc).await {
    1143            2245 :             Ok(()) => Ok(Some(true)),
    1144 UBC           0 :             Err(EvictionError::NotFound) => Ok(Some(false)),
    1145               0 :             Err(EvictionError::Downloaded) => Ok(Some(false)),
    1146                 :         }
    1147 CBC        2245 :     }
    1148                 : }
    1149                 : 
    1150                 : /// Number of times we will compute partition within a checkpoint distance.
    1151                 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
    1152                 : 
    1153                 : // Private functions
    1154                 : impl Timeline {
    1155         1131451 :     fn get_checkpoint_distance(&self) -> u64 {
    1156         1131451 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1157         1131451 :         tenant_conf
    1158         1131451 :             .checkpoint_distance
    1159         1131451 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    1160         1131451 :     }
    1161                 : 
    1162          562012 :     fn get_checkpoint_timeout(&self) -> Duration {
    1163          562012 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1164          562012 :         tenant_conf
    1165          562012 :             .checkpoint_timeout
    1166          562012 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    1167          562012 :     }
    1168                 : 
    1169            1419 :     fn get_compaction_target_size(&self) -> u64 {
    1170            1419 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1171            1419 :         tenant_conf
    1172            1419 :             .compaction_target_size
    1173            1419 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    1174            1419 :     }
    1175                 : 
    1176            1372 :     fn get_compaction_threshold(&self) -> usize {
    1177            1372 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1178            1372 :         tenant_conf
    1179            1372 :             .compaction_threshold
    1180            1372 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    1181            1372 :     }
    1182                 : 
    1183           35656 :     fn get_image_creation_threshold(&self) -> usize {
    1184           35656 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1185           35656 :         tenant_conf
    1186           35656 :             .image_creation_threshold
    1187           35656 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    1188           35656 :     }
    1189                 : 
    1190            2484 :     fn get_eviction_policy(&self) -> EvictionPolicy {
    1191            2484 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1192            2484 :         tenant_conf
    1193            2484 :             .eviction_policy
    1194            2484 :             .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
    1195            2484 :     }
    1196                 : 
    1197            1342 :     fn get_evictions_low_residence_duration_metric_threshold(
    1198            1342 :         tenant_conf: &TenantConfOpt,
    1199            1342 :         default_tenant_conf: &TenantConf,
    1200            1342 :     ) -> Duration {
    1201            1342 :         tenant_conf
    1202            1342 :             .evictions_low_residence_duration_metric_threshold
    1203            1342 :             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
    1204            1342 :     }
    1205                 : 
    1206           13131 :     fn get_gc_feedback(&self) -> bool {
    1207           13131 :         let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
    1208           13131 :         tenant_conf
    1209           13131 :             .gc_feedback
    1210           13131 :             .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
    1211           13131 :     }
    1212                 : 
    1213              52 :     pub(super) fn tenant_conf_updated(&self) {
    1214              52 :         // NB: Most tenant conf options are read by background loops, so,
    1215              52 :         // changes will automatically be picked up.
    1216              52 : 
    1217              52 :         // The threshold is embedded in the metric. So, we need to update it.
    1218              52 :         {
    1219              52 :             let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
    1220              52 :                 &self.tenant_conf.read().unwrap().tenant_conf,
    1221              52 :                 &self.conf.default_tenant_conf,
    1222              52 :             );
    1223              52 : 
    1224              52 :             let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
    1225              52 :             let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
    1226              52 : 
    1227              52 :             let timeline_id_str = self.timeline_id.to_string();
    1228              52 :             self.metrics
    1229              52 :                 .evictions_with_low_residence_duration
    1230              52 :                 .write()
    1231              52 :                 .unwrap()
    1232              52 :                 .change_threshold(
    1233              52 :                     &tenant_id_str,
    1234              52 :                     &shard_id_str,
    1235              52 :                     &timeline_id_str,
    1236              52 :                     new_threshold,
    1237              52 :                 );
    1238              52 :         }
    1239              52 :     }
    1240                 : 
    1241                 :     /// Open a Timeline handle.
    1242                 :     ///
    1243                 :     /// Loads the metadata for the timeline into memory, but not the layer map.
    1244                 :     #[allow(clippy::too_many_arguments)]
    1245            1290 :     pub(super) fn new(
    1246            1290 :         conf: &'static PageServerConf,
    1247            1290 :         tenant_conf: Arc<RwLock<AttachedTenantConf>>,
    1248            1290 :         metadata: &TimelineMetadata,
    1249            1290 :         ancestor: Option<Arc<Timeline>>,
    1250            1290 :         timeline_id: TimelineId,
    1251            1290 :         tenant_shard_id: TenantShardId,
    1252            1290 :         generation: Generation,
    1253            1290 :         shard_identity: ShardIdentity,
    1254            1290 :         walredo_mgr: Arc<super::WalRedoManager>,
    1255            1290 :         resources: TimelineResources,
    1256            1290 :         pg_version: u32,
    1257            1290 :         state: TimelineState,
    1258            1290 :         cancel: CancellationToken,
    1259            1290 :     ) -> Arc<Self> {
    1260            1290 :         let disk_consistent_lsn = metadata.disk_consistent_lsn();
    1261            1290 :         let (state, _) = watch::channel(state);
    1262            1290 : 
    1263            1290 :         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
    1264            1290 :         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
    1265            1290 : 
    1266            1290 :         let tenant_conf_guard = tenant_conf.read().unwrap();
    1267            1290 : 
    1268            1290 :         let evictions_low_residence_duration_metric_threshold =
    1269            1290 :             Self::get_evictions_low_residence_duration_metric_threshold(
    1270            1290 :                 &tenant_conf_guard.tenant_conf,
    1271            1290 :                 &conf.default_tenant_conf,
    1272            1290 :             );
    1273            1290 :         drop(tenant_conf_guard);
    1274            1290 : 
    1275            1290 :         Arc::new_cyclic(|myself| {
    1276            1290 :             let mut result = Timeline {
    1277            1290 :                 conf,
    1278            1290 :                 tenant_conf,
    1279            1290 :                 myself: myself.clone(),
    1280            1290 :                 timeline_id,
    1281            1290 :                 tenant_shard_id,
    1282            1290 :                 generation,
    1283            1290 :                 shard_identity,
    1284            1290 :                 pg_version,
    1285            1290 :                 layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
    1286            1290 :                 wanted_image_layers: Mutex::new(None),
    1287            1290 : 
    1288            1290 :                 walredo_mgr,
    1289            1290 :                 walreceiver: Mutex::new(None),
    1290            1290 : 
    1291            1290 :                 remote_client: resources.remote_client.map(Arc::new),
    1292            1290 : 
    1293            1290 :                 // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
    1294            1290 :                 last_record_lsn: SeqWait::new(RecordLsn {
    1295            1290 :                     last: disk_consistent_lsn,
    1296            1290 :                     prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
    1297            1290 :                 }),
    1298            1290 :                 disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
    1299            1290 : 
    1300            1290 :                 last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
    1301            1290 :                 last_freeze_ts: RwLock::new(Instant::now()),
    1302            1290 : 
    1303            1290 :                 loaded_at: (disk_consistent_lsn, SystemTime::now()),
    1304            1290 : 
    1305            1290 :                 ancestor_timeline: ancestor,
    1306            1290 :                 ancestor_lsn: metadata.ancestor_lsn(),
    1307            1290 : 
    1308            1290 :                 metrics: TimelineMetrics::new(
    1309            1290 :                     &tenant_shard_id,
    1310            1290 :                     &timeline_id,
    1311            1290 :                     crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
    1312            1290 :                         "mtime",
    1313            1290 :                         evictions_low_residence_duration_metric_threshold,
    1314            1290 :                     ),
    1315            1290 :                 ),
    1316            1290 : 
    1317            1290 :                 flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
    1318            1290 : 
    1319            1290 :                 layer_flush_start_tx,
    1320            1290 :                 layer_flush_done_tx,
    1321            1290 : 
    1322            1290 :                 write_lock: tokio::sync::Mutex::new(()),
    1323            1290 : 
    1324            1290 :                 gc_info: std::sync::RwLock::new(GcInfo {
    1325            1290 :                     retain_lsns: Vec::new(),
    1326            1290 :                     horizon_cutoff: Lsn(0),
    1327            1290 :                     pitr_cutoff: Lsn(0),
    1328            1290 :                 }),
    1329            1290 : 
    1330            1290 :                 latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
    1331            1290 :                 initdb_lsn: metadata.initdb_lsn(),
    1332            1290 : 
    1333            1290 :                 current_logical_size: if disk_consistent_lsn.is_valid() {
    1334                 :                     // we're creating timeline data with some layer files existing locally,
    1335                 :                     // need to recalculate timeline's logical size based on data in the layers.
    1336             718 :                     LogicalSize::deferred_initial(disk_consistent_lsn)
    1337                 :                 } else {
    1338                 :                     // we're creating timeline data without any layers existing locally,
    1339                 :                     // initial logical size is 0.
    1340             572 :                     LogicalSize::empty_initial()
    1341                 :                 },
    1342            1290 :                 partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
    1343            1290 :                 repartition_threshold: 0,
    1344            1290 : 
    1345            1290 :                 last_received_wal: Mutex::new(None),
    1346            1290 :                 rel_size_cache: RwLock::new(HashMap::new()),
    1347            1290 : 
    1348            1290 :                 download_all_remote_layers_task_info: RwLock::new(None),
    1349            1290 : 
    1350            1290 :                 state,
    1351            1290 : 
    1352            1290 :                 eviction_task_timeline_state: tokio::sync::Mutex::new(
    1353            1290 :                     EvictionTaskTimelineState::default(),
    1354            1290 :                 ),
    1355            1290 :                 delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
    1356            1290 : 
    1357            1290 :                 cancel,
    1358            1290 :                 gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
    1359            1290 : 
    1360            1290 :                 compaction_lock: tokio::sync::Mutex::default(),
    1361            1290 :                 gc_lock: tokio::sync::Mutex::default(),
    1362            1290 :             };
    1363            1290 :             result.repartition_threshold =
    1364            1290 :                 result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
    1365            1290 :             result
    1366            1290 :                 .metrics
    1367            1290 :                 .last_record_gauge
    1368            1290 :                 .set(disk_consistent_lsn.0 as i64);
    1369            1290 :             result
    1370            1290 :         })
    1371            1290 :     }
    1372                 : 
    1373            1837 :     pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
    1374            1837 :         let Ok(guard) = self.gate.enter() else {
    1375 UBC           0 :             info!("cannot start flush loop when the timeline gate has already been closed");
    1376               0 :             return;
    1377                 :         };
    1378 CBC        1837 :         let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
    1379            1837 :         match *flush_loop_state {
    1380            1274 :             FlushLoopState::NotStarted => (),
    1381                 :             FlushLoopState::Running { .. } => {
    1382             563 :                 info!(
    1383             563 :                     "skipping attempt to start flush_loop twice {}/{}",
    1384             563 :                     self.tenant_shard_id, self.timeline_id
    1385             563 :                 );
    1386             563 :                 return;
    1387                 :             }
    1388                 :             FlushLoopState::Exited => {
    1389 UBC           0 :                 warn!(
    1390               0 :                     "ignoring attempt to restart exited flush_loop {}/{}",
    1391               0 :                     self.tenant_shard_id, self.timeline_id
    1392               0 :                 );
    1393               0 :                 return;
    1394                 :             }
    1395                 :         }
    1396                 : 
    1397 CBC        1274 :         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
    1398            1274 :         let self_clone = Arc::clone(self);
    1399            1274 : 
    1400            1274 :         debug!("spawning flush loop");
    1401            1274 :         *flush_loop_state = FlushLoopState::Running {
    1402            1274 :             #[cfg(test)]
    1403            1274 :             expect_initdb_optimization: false,
    1404            1274 :             #[cfg(test)]
    1405            1274 :             initdb_optimization_count: 0,
    1406            1274 :         };
    1407            1274 :         task_mgr::spawn(
    1408            1274 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1409            1274 :             task_mgr::TaskKind::LayerFlushTask,
    1410            1274 :             Some(self.tenant_shard_id),
    1411            1274 :             Some(self.timeline_id),
    1412            1274 :             "layer flush task",
    1413                 :             false,
    1414            1274 :             async move {
    1415            1274 :                 let _guard = guard;
    1416            1274 :                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
    1417           22229 :                 self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
    1418             497 :                 let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
    1419             497 :                 assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
    1420             497 :                 *flush_loop_state  = FlushLoopState::Exited;
    1421             497 :                 Ok(())
    1422             497 :             }
    1423            1274 :             .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    1424                 :         );
    1425            1837 :     }
    1426                 : 
    1427                 :     /// Creates and starts the wal receiver.
    1428                 :     ///
    1429                 :     /// This function is expected to be called at most once per Timeline's lifecycle
    1430                 :     /// when the timeline is activated.
    1431            1102 :     fn launch_wal_receiver(
    1432            1102 :         self: &Arc<Self>,
    1433            1102 :         ctx: &RequestContext,
    1434            1102 :         broker_client: BrokerClientChannel,
    1435            1102 :     ) {
    1436            1102 :         info!(
    1437            1102 :             "launching WAL receiver for timeline {} of tenant {}",
    1438            1102 :             self.timeline_id, self.tenant_shard_id
    1439            1102 :         );
    1440                 : 
    1441            1102 :         let tenant_conf_guard = self.tenant_conf.read().unwrap();
    1442            1102 :         let wal_connect_timeout = tenant_conf_guard
    1443            1102 :             .tenant_conf
    1444            1102 :             .walreceiver_connect_timeout
    1445            1102 :             .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
    1446            1102 :         let lagging_wal_timeout = tenant_conf_guard
    1447            1102 :             .tenant_conf
    1448            1102 :             .lagging_wal_timeout
    1449            1102 :             .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
    1450            1102 :         let max_lsn_wal_lag = tenant_conf_guard
    1451            1102 :             .tenant_conf
    1452            1102 :             .max_lsn_wal_lag
    1453            1102 :             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
    1454            1102 :         drop(tenant_conf_guard);
    1455            1102 : 
    1456            1102 :         let mut guard = self.walreceiver.lock().unwrap();
    1457            1102 :         assert!(
    1458            1102 :             guard.is_none(),
    1459 UBC           0 :             "multiple launches / re-launches of WAL receiver are not supported"
    1460                 :         );
    1461 CBC        1102 :         *guard = Some(WalReceiver::start(
    1462            1102 :             Arc::clone(self),
    1463            1102 :             WalReceiverConf {
    1464            1102 :                 wal_connect_timeout,
    1465            1102 :                 lagging_wal_timeout,
    1466            1102 :                 max_lsn_wal_lag,
    1467            1102 :                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
    1468            1102 :                 availability_zone: self.conf.availability_zone.clone(),
    1469            1102 :                 ingest_batch_size: self.conf.ingest_batch_size,
    1470            1102 :             },
    1471            1102 :             broker_client,
    1472            1102 :             ctx,
    1473            1102 :         ));
    1474            1102 :     }
    1475                 : 
    1476                 :     /// Initialize with an empty layer map. Used when creating a new timeline.
    1477             921 :     pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
    1478             921 :         let mut layers = self.layers.try_write().expect(
    1479             921 :             "in the context where we call this function, no other task has access to the object",
    1480             921 :         );
    1481             921 :         layers.initialize_empty(Lsn(start_lsn.0));
    1482             921 :     }
    1483                 : 
    1484                 :     /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
    1485                 :     /// files.
    1486             357 :     pub(super) async fn load_layer_map(
    1487             357 :         &self,
    1488             357 :         disk_consistent_lsn: Lsn,
    1489             357 :         index_part: Option<IndexPart>,
    1490             357 :     ) -> anyhow::Result<()> {
    1491                 :         use init::{Decision::*, Discovered, DismissedLayer};
    1492                 :         use LayerFileName::*;
    1493                 : 
    1494             357 :         let mut guard = self.layers.write().await;
    1495                 : 
    1496             357 :         let timer = self.metrics.load_layer_map_histo.start_timer();
    1497             357 : 
    1498             357 :         // Scan timeline directory and create ImageFileName and DeltaFilename
    1499             357 :         // structs representing all files on disk
    1500             357 :         let timeline_path = self
    1501             357 :             .conf
    1502             357 :             .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    1503             357 :         let conf = self.conf;
    1504             357 :         let span = tracing::Span::current();
    1505             357 : 
    1506             357 :         // Copy to move into the task we're about to spawn
    1507             357 :         let generation = self.generation;
    1508             357 :         let shard = self.get_shard_index();
    1509             357 :         let this = self.myself.upgrade().expect("&self method holds the arc");
    1510                 : 
    1511             357 :         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
    1512             357 :             move || {
    1513             357 :                 let _g = span.entered();
    1514             357 :                 let discovered = init::scan_timeline_dir(&timeline_path)?;
    1515             357 :                 let mut discovered_layers = Vec::with_capacity(discovered.len());
    1516             357 :                 let mut unrecognized_files = Vec::new();
    1517             357 : 
    1518             357 :                 let mut path = timeline_path;
    1519                 : 
    1520           15490 :                 for discovered in discovered {
    1521           15133 :                     let (name, kind) = match discovered {
    1522           14717 :                         Discovered::Layer(file_name, file_size) => {
    1523           14717 :                             discovered_layers.push((file_name, file_size));
    1524           14717 :                             continue;
    1525                 :                         }
    1526                 :                         Discovered::Metadata | Discovered::IgnoredBackup => {
    1527             357 :                             continue;
    1528                 :                         }
    1529 UBC           0 :                         Discovered::Unknown(file_name) => {
    1530               0 :                             // we will later error if there are any
    1531               0 :                             unrecognized_files.push(file_name);
    1532               0 :                             continue;
    1533                 :                         }
    1534 CBC          51 :                         Discovered::Ephemeral(name) => (name, "old ephemeral file"),
    1535               6 :                         Discovered::Temporary(name) => (name, "temporary timeline file"),
    1536               2 :                         Discovered::TemporaryDownload(name) => (name, "temporary download"),
    1537                 :                     };
    1538              59 :                     path.push(Utf8Path::new(&name));
    1539              59 :                     init::cleanup(&path, kind)?;
    1540              59 :                     path.pop();
    1541                 :                 }
    1542                 : 
    1543             357 :                 if !unrecognized_files.is_empty() {
    1544                 :                     // assume that if there are any there are many many.
    1545 UBC           0 :                     let n = unrecognized_files.len();
    1546               0 :                     let first = &unrecognized_files[..n.min(10)];
    1547               0 :                     anyhow::bail!(
    1548               0 :                         "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
    1549               0 :                     );
    1550 CBC         357 :                 }
    1551             357 : 
    1552             357 :                 let decided = init::reconcile(
    1553             357 :                     discovered_layers,
    1554             357 :                     index_part.as_ref(),
    1555             357 :                     disk_consistent_lsn,
    1556             357 :                     generation,
    1557             357 :                     shard,
    1558             357 :                 );
    1559             357 : 
    1560             357 :                 let mut loaded_layers = Vec::new();
    1561             357 :                 let mut needs_cleanup = Vec::new();
    1562             357 :                 let mut total_physical_size = 0;
    1563                 : 
    1564           58794 :                 for (name, decision) in decided {
    1565           58437 :                     let decision = match decision {
    1566           13559 :                         Ok(UseRemote { local, remote }) => {
    1567           13559 :                             // Remote is authoritative, but we may still choose to retain
    1568           13559 :                             // the local file if the contents appear to match
    1569           13559 :                             if local.file_size() == remote.file_size() {
    1570                 :                                 // Use the local file, but take the remote metadata so that we pick up
    1571                 :                                 // the correct generation.
    1572           13558 :                                 UseLocal(remote)
    1573                 :                             } else {
    1574               1 :                                 path.push(name.file_name());
    1575               1 :                                 init::cleanup_local_file_for_remote(&path, &local, &remote)?;
    1576               1 :                                 path.pop();
    1577               1 :                                 UseRemote { local, remote }
    1578                 :                             }
    1579                 :                         }
    1580           44242 :                         Ok(decision) => decision,
    1581               3 :                         Err(DismissedLayer::Future { local }) => {
    1582               3 :                             if local.is_some() {
    1583               2 :                                 path.push(name.file_name());
    1584               2 :                                 init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
    1585               2 :                                 path.pop();
    1586               1 :                             }
    1587               3 :                             needs_cleanup.push(name);
    1588               3 :                             continue;
    1589                 :                         }
    1590             633 :                         Err(DismissedLayer::LocalOnly(local)) => {
    1591             633 :                             path.push(name.file_name());
    1592             633 :                             init::cleanup_local_only_file(&path, &name, &local)?;
    1593             633 :                             path.pop();
    1594             633 :                             // this file never existed remotely, we will have to do rework
    1595             633 :                             continue;
    1596                 :                         }
    1597                 :                     };
    1598                 : 
    1599           57801 :                     match &name {
    1600           24440 :                         Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
    1601           33361 :                         Image(i) => assert!(i.lsn <= disk_consistent_lsn),
    1602                 :                     }
    1603                 : 
    1604           57801 :                     tracing::debug!(layer=%name, ?decision, "applied");
    1605                 : 
    1606           57801 :                     let layer = match decision {
    1607           14081 :                         UseLocal(m) => {
    1608           14081 :                             total_physical_size += m.file_size();
    1609           14081 :                             Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
    1610                 :                         }
    1611           43719 :                         Evicted(remote) | UseRemote { remote, .. } => {
    1612           43720 :                             Layer::for_evicted(conf, &this, name, remote)
    1613                 :                         }
    1614                 :                     };
    1615                 : 
    1616           57801 :                     loaded_layers.push(layer);
    1617                 :                 }
    1618             357 :                 Ok((loaded_layers, needs_cleanup, total_physical_size))
    1619             357 :             }
    1620             357 :         })
    1621             351 :         .await
    1622             357 :         .map_err(anyhow::Error::new)
    1623             357 :         .and_then(|x| x)?;
    1624                 : 
    1625             357 :         let num_layers = loaded_layers.len();
    1626             357 : 
    1627             357 :         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
    1628                 : 
    1629             357 :         if let Some(rtc) = self.remote_client.as_ref() {
    1630             357 :             rtc.schedule_layer_file_deletion(&needs_cleanup)?;
    1631             357 :             rtc.schedule_index_upload_for_file_changes()?;
    1632                 :             // This barrier orders above DELETEs before any later operations.
    1633                 :             // This is critical because code executing after the barrier might
    1634                 :             // create again objects with the same key that we just scheduled for deletion.
    1635                 :             // For example, if we just scheduled deletion of an image layer "from the future",
    1636                 :             // later compaction might run again and re-create the same image layer.
    1637                 :             // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
    1638                 :             // "same" here means same key range and LSN.
    1639                 :             //
    1640                 :             // Without a barrier between above DELETEs and the re-creation's PUTs,
    1641                 :             // the upload queue may execute the PUT first, then the DELETE.
    1642                 :             // In our example, we will end up with an IndexPart referencing a non-existent object.
    1643                 :             //
    1644                 :             // 1. a future image layer is created and uploaded
    1645                 :             // 2. ps restart
    1646                 :             // 3. the future layer from (1) is deleted during load layer map
    1647                 :             // 4. image layer is re-created and uploaded
    1648                 :             // 5. deletion queue would like to delete (1) but actually deletes (4)
    1649                 :             // 6. delete by name works as expected, but it now deletes the wrong (later) version
    1650                 :             //
    1651                 :             // See https://github.com/neondatabase/neon/issues/5878
    1652                 :             //
    1653                 :             // NB: generation numbers naturally protect against this because they disambiguate
    1654                 :             //     (1) and (4)
    1655             357 :             rtc.schedule_barrier()?;
    1656                 :             // Tenant::create_timeline will wait for these uploads to happen before returning, or
    1657                 :             // on retry.
    1658 UBC           0 :         }
    1659                 : 
    1660 CBC         357 :         info!(
    1661             357 :             "loaded layer map with {} layers at {}, total physical size: {}",
    1662             357 :             num_layers, disk_consistent_lsn, total_physical_size
    1663             357 :         );
    1664                 : 
    1665             357 :         timer.stop_and_record();
    1666             357 :         Ok(())
    1667             357 :     }
    1668                 : 
    1669                 :     /// Retrieve current logical size of the timeline.
    1670                 :     ///
    1671                 :     /// The size could be lagging behind the actual number, in case
    1672                 :     /// the initial size calculation has not been run (gets triggered on the first size access).
    1673                 :     ///
    1674                 :     /// return size and boolean flag that shows if the size is exact
    1675          568881 :     pub(crate) fn get_current_logical_size(
    1676          568881 :         self: &Arc<Self>,
    1677          568881 :         priority: GetLogicalSizePriority,
    1678          568881 :         ctx: &RequestContext,
    1679          568881 :     ) -> logical_size::CurrentLogicalSize {
    1680          568881 :         let current_size = self.current_logical_size.current_size();
    1681          568881 :         debug!("Current size: {current_size:?}");
    1682                 : 
    1683          568881 :         match (current_size.accuracy(), priority) {
    1684          568015 :             (logical_size::Accuracy::Exact, _) => (), // nothing to do
    1685 UBC           0 :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
    1686               0 :                 // background task will eventually deliver an exact value, we're in no rush
    1687               0 :             }
    1688                 :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
    1689                 :                 // background task is not ready, but user is asking for it now;
    1690                 :                 // => make the background task skip the line
    1691                 :                 // (The alternative would be to calculate the size here, but,
    1692                 :                 //  it can actually take a long time if the user has a lot of rels.
    1693                 :                 //  And we'll inevitable need it again; So, let the background task do the work.)
    1694 CBC         866 :                 match self
    1695             866 :                     .current_logical_size
    1696             866 :                     .cancel_wait_for_background_loop_concurrency_limit_semaphore
    1697             866 :                     .get()
    1698                 :                 {
    1699             853 :                     Some(cancel) => cancel.cancel(),
    1700                 :                     None => {
    1701              13 :                         let state = self.current_state();
    1702 UBC           0 :                         if matches!(
    1703 CBC          13 :                             state,
    1704                 :                             TimelineState::Broken { .. } | TimelineState::Stopping
    1705              13 :                         ) {
    1706              13 : 
    1707              13 :                             // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
    1708              13 :                             // Don't make noise.
    1709              13 :                         } else {
    1710 UBC           0 :                             warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
    1711                 :                         }
    1712                 :                     }
    1713                 :                 };
    1714                 :             }
    1715                 :         }
    1716                 : 
    1717 CBC      568881 :         if let CurrentLogicalSize::Approximate(_) = &current_size {
    1718             866 :             if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
    1719             439 :                 let first = self
    1720             439 :                     .current_logical_size
    1721             439 :                     .did_return_approximate_to_walreceiver
    1722             439 :                     .compare_exchange(
    1723             439 :                         false,
    1724             439 :                         true,
    1725             439 :                         AtomicOrdering::Relaxed,
    1726             439 :                         AtomicOrdering::Relaxed,
    1727             439 :                     )
    1728             439 :                     .is_ok();
    1729             439 :                 if first {
    1730              82 :                     crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
    1731             357 :                 }
    1732             427 :             }
    1733          568015 :         }
    1734                 : 
    1735          568881 :         current_size
    1736          568881 :     }
    1737                 : 
    1738            1102 :     fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
    1739            1102 :         let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
    1740                 :             // nothing to do for freshly created timelines;
    1741             526 :             assert_eq!(
    1742             526 :                 self.current_logical_size.current_size().accuracy(),
    1743             526 :                 logical_size::Accuracy::Exact,
    1744             526 :             );
    1745             526 :             self.current_logical_size.initialized.add_permits(1);
    1746             526 :             return;
    1747                 :         };
    1748                 : 
    1749             576 :         let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
    1750             576 :         let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
    1751             576 :         self.current_logical_size
    1752             576 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
    1753             576 :             .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
    1754             576 : 
    1755             576 :         let self_clone = Arc::clone(self);
    1756             576 :         let background_ctx = ctx.detached_child(
    1757             576 :             TaskKind::InitialLogicalSizeCalculation,
    1758             576 :             DownloadBehavior::Download,
    1759             576 :         );
    1760             576 :         task_mgr::spawn(
    1761             576 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1762             576 :             task_mgr::TaskKind::InitialLogicalSizeCalculation,
    1763             576 :             Some(self.tenant_shard_id),
    1764             576 :             Some(self.timeline_id),
    1765             576 :             "initial size calculation",
    1766                 :             false,
    1767                 :             // NB: don't log errors here, task_mgr will do that.
    1768             576 :             async move {
    1769             576 :                 let cancel = task_mgr::shutdown_token();
    1770             576 :                 self_clone
    1771             576 :                     .initial_logical_size_calculation_task(
    1772             576 :                         initial_part_end,
    1773             576 :                         cancel_wait_for_background_loop_concurrency_limit_semaphore,
    1774             576 :                         cancel,
    1775             576 :                         background_ctx,
    1776             576 :                     )
    1777           66696 :                     .await;
    1778             570 :                 Ok(())
    1779             570 :             }
    1780             576 :             .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, timeline_id=%self.timeline_id)),
    1781                 :         );
    1782            1102 :     }
    1783                 : 
    1784             576 :     async fn initial_logical_size_calculation_task(
    1785             576 :         self: Arc<Self>,
    1786             576 :         initial_part_end: Lsn,
    1787             576 :         skip_concurrency_limiter: CancellationToken,
    1788             576 :         cancel: CancellationToken,
    1789             576 :         background_ctx: RequestContext,
    1790             576 :     ) {
    1791             570 :         scopeguard::defer! {
    1792             570 :             // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
    1793             570 :             self.current_logical_size.initialized.add_permits(1);
    1794             570 :         }
    1795                 : 
    1796                 :         enum BackgroundCalculationError {
    1797                 :             Cancelled,
    1798                 :             Other(anyhow::Error),
    1799                 :         }
    1800                 : 
    1801             576 :         let try_once = |attempt: usize| {
    1802             576 :             let background_ctx = &background_ctx;
    1803             576 :             let self_ref = &self;
    1804             576 :             let skip_concurrency_limiter = &skip_concurrency_limiter;
    1805             576 :             async move {
    1806             576 :                 let cancel = task_mgr::shutdown_token();
    1807             576 :                 let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
    1808             576 :                     BackgroundLoopKind::InitialLogicalSizeCalculation,
    1809             576 :                     background_ctx,
    1810             576 :                 );
    1811                 : 
    1812                 :                 use crate::metrics::initial_logical_size::StartCircumstances;
    1813             576 :                 let (_maybe_permit, circumstances) = tokio::select! {
    1814             516 :                     permit = wait_for_permit => {
    1815                 :                         (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
    1816                 :                     }
    1817                 :                     _ = self_ref.cancel.cancelled() => {
    1818                 :                         return Err(BackgroundCalculationError::Cancelled);
    1819                 :                     }
    1820                 :                     _ = cancel.cancelled() => {
    1821                 :                         return Err(BackgroundCalculationError::Cancelled);
    1822                 :                     },
    1823                 :                     () = skip_concurrency_limiter.cancelled() => {
    1824                 :                         // Some action that is part of a end user interaction requested logical size
    1825                 :                         // => break out of the rate limit
    1826                 :                         // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
    1827                 :                         // but then again what happens if they cancel; also, we should just be using
    1828                 :                         // one runtime across the entire process, so, let's leave this for now.
    1829                 :                         (None, StartCircumstances::SkippedConcurrencyLimiter)
    1830                 :                     }
    1831                 :                 };
    1832                 : 
    1833             576 :                 let metrics_guard = if attempt == 1 {
    1834             576 :                     crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
    1835                 :                 } else {
    1836 UBC           0 :                     crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
    1837                 :                 };
    1838                 : 
    1839 CBC         576 :                 match self_ref
    1840             576 :                     .logical_size_calculation_task(
    1841             576 :                         initial_part_end,
    1842             576 :                         LogicalSizeCalculationCause::Initial,
    1843             576 :                         background_ctx,
    1844             576 :                     )
    1845           66693 :                     .await
    1846                 :                 {
    1847             549 :                     Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
    1848                 :                     Err(CalculateLogicalSizeError::Cancelled) => {
    1849              15 :                         Err(BackgroundCalculationError::Cancelled)
    1850                 :                     }
    1851               2 :                     Err(CalculateLogicalSizeError::Other(err)) => {
    1852               1 :                         if let Some(PageReconstructError::AncestorStopping(_)) =
    1853               2 :                             err.root_cause().downcast_ref()
    1854                 :                         {
    1855 UBC           0 :                             Err(BackgroundCalculationError::Cancelled)
    1856                 :                         } else {
    1857 CBC           2 :                             Err(BackgroundCalculationError::Other(err))
    1858                 :                         }
    1859                 :                     }
    1860                 :                 }
    1861             566 :             }
    1862             576 :         };
    1863                 : 
    1864             576 :         let retrying = async {
    1865             576 :             let mut attempt = 0;
    1866             576 :             loop {
    1867             576 :                 attempt += 1;
    1868             576 : 
    1869           66693 :                 match try_once(attempt).await {
    1870             549 :                     Ok(res) => return ControlFlow::Continue(res),
    1871              15 :                     Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
    1872               2 :                     Err(BackgroundCalculationError::Other(e)) => {
    1873               2 :                         warn!(attempt, "initial size calculation failed: {e:?}");
    1874                 :                         // exponential back-off doesn't make sense at these long intervals;
    1875                 :                         // use fixed retry interval with generous jitter instead
    1876               2 :                         let sleep_duration = Duration::from_secs(
    1877               2 :                             u64::try_from(
    1878               2 :                                 // 1hour base
    1879               2 :                                 (60_i64 * 60_i64)
    1880               2 :                                     // 10min jitter
    1881               2 :                                     + rand::thread_rng().gen_range(-10 * 60..10 * 60),
    1882               2 :                             )
    1883               2 :                             .expect("10min < 1hour"),
    1884               2 :                         );
    1885               2 :                         tokio::time::sleep(sleep_duration).await;
    1886                 :                     }
    1887                 :                 }
    1888                 :             }
    1889             564 :         };
    1890                 : 
    1891             576 :         let (calculated_size, metrics_guard) = tokio::select! {
    1892             564 :             res = retrying  => {
    1893                 :                 match res {
    1894                 :                     ControlFlow::Continue(calculated_size) => calculated_size,
    1895                 :                     ControlFlow::Break(()) => return,
    1896                 :                 }
    1897                 :             }
    1898                 :             _ = cancel.cancelled() => {
    1899                 :                 return;
    1900                 :             }
    1901                 :         };
    1902                 : 
    1903                 :         // we cannot query current_logical_size.current_size() to know the current
    1904                 :         // *negative* value, only truncated to u64.
    1905             549 :         let added = self
    1906             549 :             .current_logical_size
    1907             549 :             .size_added_after_initial
    1908             549 :             .load(AtomicOrdering::Relaxed);
    1909             549 : 
    1910             549 :         let sum = calculated_size.saturating_add_signed(added);
    1911             549 : 
    1912             549 :         // set the gauge value before it can be set in `update_current_logical_size`.
    1913             549 :         self.metrics.current_logical_size_gauge.set(sum);
    1914             549 : 
    1915             549 :         self.current_logical_size
    1916             549 :             .initial_logical_size
    1917             549 :             .set((calculated_size, metrics_guard.calculation_result_saved()))
    1918             549 :             .ok()
    1919             549 :             .expect("only this task sets it");
    1920             570 :     }
    1921                 : 
    1922              39 :     pub fn spawn_ondemand_logical_size_calculation(
    1923              39 :         self: &Arc<Self>,
    1924              39 :         lsn: Lsn,
    1925              39 :         cause: LogicalSizeCalculationCause,
    1926              39 :         ctx: RequestContext,
    1927              39 :     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
    1928              39 :         let (sender, receiver) = oneshot::channel();
    1929              39 :         let self_clone = Arc::clone(self);
    1930              39 :         // XXX if our caller loses interest, i.e., ctx is cancelled,
    1931              39 :         // we should stop the size calculation work and return an error.
    1932              39 :         // That would require restructuring this function's API to
    1933              39 :         // return the result directly, instead of a Receiver for the result.
    1934              39 :         let ctx = ctx.detached_child(
    1935              39 :             TaskKind::OndemandLogicalSizeCalculation,
    1936              39 :             DownloadBehavior::Download,
    1937              39 :         );
    1938              39 :         task_mgr::spawn(
    1939              39 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1940              39 :             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
    1941              39 :             Some(self.tenant_shard_id),
    1942              39 :             Some(self.timeline_id),
    1943              39 :             "ondemand logical size calculation",
    1944              39 :             false,
    1945              39 :             async move {
    1946              39 :                 let res = self_clone
    1947              39 :                     .logical_size_calculation_task(lsn, cause, &ctx)
    1948            2243 :                     .await;
    1949              39 :                 let _ = sender.send(res).ok();
    1950              39 :                 Ok(()) // Receiver is responsible for handling errors
    1951              39 :             }
    1952              39 :             .in_current_span(),
    1953              39 :         );
    1954              39 :         receiver
    1955              39 :     }
    1956                 : 
    1957                 :     /// # Cancel-Safety
    1958                 :     ///
    1959                 :     /// This method is cancellation-safe.
    1960 UBC           0 :     #[instrument(skip_all)]
    1961                 :     async fn logical_size_calculation_task(
    1962                 :         self: &Arc<Self>,
    1963                 :         lsn: Lsn,
    1964                 :         cause: LogicalSizeCalculationCause,
    1965                 :         ctx: &RequestContext,
    1966                 :     ) -> Result<u64, CalculateLogicalSizeError> {
    1967                 :         span::debug_assert_current_span_has_tenant_and_timeline_id();
    1968                 : 
    1969                 :         let _guard = self.gate.enter();
    1970                 : 
    1971                 :         let self_calculation = Arc::clone(self);
    1972                 : 
    1973 CBC         615 :         let mut calculation = pin!(async {
    1974             615 :             let ctx = ctx.attached_child();
    1975             615 :             self_calculation
    1976             615 :                 .calculate_logical_size(lsn, cause, &ctx)
    1977           68939 :                 .await
    1978             605 :         });
    1979                 : 
    1980           69544 :         tokio::select! {
    1981             603 :             res = &mut calculation => { res }
    1982                 :             _ = self.cancel.cancelled() => {
    1983 UBC           0 :                 debug!("cancelling logical size calculation for timeline shutdown");
    1984                 :                 calculation.await
    1985                 :             }
    1986                 :             _ = task_mgr::shutdown_watcher() => {
    1987               0 :                 debug!("cancelling logical size calculation for task shutdown");
    1988                 :                 calculation.await
    1989                 :             }
    1990                 :         }
    1991                 :     }
    1992                 : 
    1993                 :     /// Calculate the logical size of the database at the latest LSN.
    1994                 :     ///
    1995                 :     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    1996                 :     /// especially if we need to download remote layers.
    1997                 :     ///
    1998                 :     /// # Cancel-Safety
    1999                 :     ///
    2000                 :     /// This method is cancellation-safe.
    2001 CBC         623 :     pub async fn calculate_logical_size(
    2002             623 :         &self,
    2003             623 :         up_to_lsn: Lsn,
    2004             623 :         cause: LogicalSizeCalculationCause,
    2005             623 :         ctx: &RequestContext,
    2006             623 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2007             623 :         info!(
    2008             623 :             "Calculating logical size for timeline {} at {}",
    2009             623 :             self.timeline_id, up_to_lsn
    2010             623 :         );
    2011                 :         // These failpoints are used by python tests to ensure that we don't delete
    2012                 :         // the timeline while the logical size computation is ongoing.
    2013                 :         // The first failpoint is used to make this function pause.
    2014                 :         // Then the python test initiates timeline delete operation in a thread.
    2015                 :         // It waits for a few seconds, then arms the second failpoint and disables
    2016                 :         // the first failpoint. The second failpoint prints an error if the timeline
    2017                 :         // delete code has deleted the on-disk state while we're still running here.
    2018                 :         // It shouldn't do that. If it does it anyway, the error will be caught
    2019                 :         // by the test suite, highlighting the problem.
    2020 UBC           0 :         fail::fail_point!("timeline-calculate-logical-size-pause");
    2021 CBC         623 :         fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
    2022               2 :             if !self
    2023               2 :                 .conf
    2024               2 :                 .metadata_path(&self.tenant_shard_id, &self.timeline_id)
    2025               2 :                 .exists()
    2026                 :             {
    2027 UBC           0 :                 error!("timeline-calculate-logical-size-pre metadata file does not exist")
    2028 CBC           2 :             }
    2029                 :             // need to return something
    2030               2 :             Ok(0)
    2031             623 :         });
    2032                 :         // See if we've already done the work for initial size calculation.
    2033                 :         // This is a short-cut for timelines that are mostly unused.
    2034             621 :         if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
    2035               5 :             return Ok(size);
    2036             616 :         }
    2037             616 :         let storage_time_metrics = match cause {
    2038                 :             LogicalSizeCalculationCause::Initial
    2039                 :             | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
    2040             601 :             | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
    2041                 :             LogicalSizeCalculationCause::EvictionTaskImitation => {
    2042              15 :                 &self.metrics.imitate_logical_size_histo
    2043                 :             }
    2044                 :         };
    2045             616 :         let timer = storage_time_metrics.start_timer();
    2046             616 :         let logical_size = self
    2047             616 :             .get_current_logical_size_non_incremental(up_to_lsn, ctx)
    2048           70856 :             .await?;
    2049 UBC           0 :         debug!("calculated logical size: {logical_size}");
    2050 CBC         587 :         timer.stop_and_record();
    2051             587 :         Ok(logical_size)
    2052             613 :     }
    2053                 : 
    2054                 :     /// Update current logical size, adding `delta' to the old value.
    2055          427894 :     fn update_current_logical_size(&self, delta: i64) {
    2056          427894 :         let logical_size = &self.current_logical_size;
    2057          427894 :         logical_size.increment_size(delta);
    2058          427894 : 
    2059          427894 :         // Also set the value in the prometheus gauge. Note that
    2060          427894 :         // there is a race condition here: if this is is called by two
    2061          427894 :         // threads concurrently, the prometheus gauge might be set to
    2062          427894 :         // one value while current_logical_size is set to the
    2063          427894 :         // other.
    2064          427894 :         match logical_size.current_size() {
    2065          427745 :             CurrentLogicalSize::Exact(ref new_current_size) => self
    2066          427745 :                 .metrics
    2067          427745 :                 .current_logical_size_gauge
    2068          427745 :                 .set(new_current_size.into()),
    2069             149 :             CurrentLogicalSize::Approximate(_) => {
    2070             149 :                 // don't update the gauge yet, this allows us not to update the gauge back and
    2071             149 :                 // forth between the initial size calculation task.
    2072             149 :             }
    2073                 :         }
    2074          427894 :     }
    2075                 : 
    2076            2247 :     async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
    2077            2247 :         let guard = self.layers.read().await;
    2078          799949 :         for historic_layer in guard.layer_map().iter_historic_layers() {
    2079          799949 :             let historic_layer_name = historic_layer.filename().file_name();
    2080          799949 :             if layer_file_name == historic_layer_name {
    2081            2247 :                 return Some(guard.get_from_desc(&historic_layer));
    2082          797702 :             }
    2083                 :         }
    2084                 : 
    2085 UBC           0 :         None
    2086 CBC        2247 :     }
    2087                 : 
    2088                 :     /// The timeline heatmap is a hint to secondary locations from the primary location,
    2089                 :     /// indicating which layers are currently on-disk on the primary.
    2090                 :     ///
    2091                 :     /// None is returned if the Timeline is in a state where uploading a heatmap
    2092                 :     /// doesn't make sense, such as shutting down or initializing.  The caller
    2093                 :     /// should treat this as a cue to simply skip doing any heatmap uploading
    2094                 :     /// for this timeline.
    2095               6 :     pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
    2096               6 :         let eviction_info = self.get_local_layers_for_disk_usage_eviction().await;
    2097                 : 
    2098               6 :         let remote_client = match &self.remote_client {
    2099               6 :             Some(c) => c,
    2100 UBC           0 :             None => return None,
    2101                 :         };
    2102                 : 
    2103 CBC           6 :         let layer_file_names = eviction_info
    2104               6 :             .resident_layers
    2105               6 :             .iter()
    2106            2453 :             .map(|l| l.layer.layer_desc().filename())
    2107               6 :             .collect::<Vec<_>>();
    2108                 : 
    2109               6 :         let decorated = match remote_client.get_layers_metadata(layer_file_names) {
    2110               6 :             Ok(d) => d,
    2111                 :             Err(_) => {
    2112                 :                 // Getting metadata only fails on Timeline in bad state.
    2113 UBC           0 :                 return None;
    2114                 :             }
    2115                 :         };
    2116                 : 
    2117 CBC           6 :         let heatmap_layers = std::iter::zip(
    2118               6 :             eviction_info.resident_layers.into_iter(),
    2119               6 :             decorated.into_iter(),
    2120               6 :         )
    2121            2453 :         .filter_map(|(layer, remote_info)| {
    2122            2453 :             remote_info.map(|remote_info| {
    2123            2453 :                 HeatMapLayer::new(
    2124            2453 :                     layer.layer.layer_desc().filename(),
    2125            2453 :                     IndexLayerMetadata::from(remote_info),
    2126            2453 :                     layer.last_activity_ts,
    2127            2453 :                 )
    2128            2453 :             })
    2129            2453 :         });
    2130               6 : 
    2131               6 :         Some(HeatMapTimeline::new(
    2132               6 :             self.timeline_id,
    2133               6 :             heatmap_layers.collect(),
    2134               6 :         ))
    2135               6 :     }
    2136                 : }
    2137                 : 
    2138                 : type TraversalId = String;
    2139                 : 
    2140                 : trait TraversalLayerExt {
    2141                 :     fn traversal_id(&self) -> TraversalId;
    2142                 : }
    2143                 : 
    2144                 : impl TraversalLayerExt for Layer {
    2145             895 :     fn traversal_id(&self) -> TraversalId {
    2146             895 :         self.local_path().to_string()
    2147             895 :     }
    2148                 : }
    2149                 : 
    2150                 : impl TraversalLayerExt for Arc<InMemoryLayer> {
    2151             322 :     fn traversal_id(&self) -> TraversalId {
    2152             322 :         format!("timeline {} in-memory {self}", self.get_timeline_id())
    2153             322 :     }
    2154                 : }
    2155                 : 
    2156                 : impl Timeline {
    2157                 :     ///
    2158                 :     /// Get a handle to a Layer for reading.
    2159                 :     ///
    2160                 :     /// The returned Layer might be from an ancestor timeline, if the
    2161                 :     /// segment hasn't been updated on this timeline yet.
    2162                 :     ///
    2163                 :     /// This function takes the current timeline's locked LayerMap as an argument,
    2164                 :     /// so callers can avoid potential race conditions.
    2165                 :     ///
    2166                 :     /// # Cancel-Safety
    2167                 :     ///
    2168                 :     /// This method is cancellation-safe.
    2169         5862688 :     async fn get_reconstruct_data(
    2170         5862688 :         &self,
    2171         5862688 :         key: Key,
    2172         5862688 :         request_lsn: Lsn,
    2173         5862688 :         reconstruct_state: &mut ValueReconstructState,
    2174         5862688 :         ctx: &RequestContext,
    2175         5862688 :     ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
    2176         5862674 :         // Start from the current timeline.
    2177         5862674 :         let mut timeline_owned;
    2178         5862674 :         let mut timeline = self;
    2179         5862674 : 
    2180         5862674 :         let mut read_count = scopeguard::guard(0, |cnt| {
    2181         5862637 :             crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
    2182         5862674 :         });
    2183         5862674 : 
    2184         5862674 :         // For debugging purposes, collect the path of layers that we traversed
    2185         5862674 :         // through. It's included in the error message if we fail to find the key.
    2186         5862674 :         let mut traversal_path = Vec::<TraversalPathItem>::new();
    2187                 : 
    2188         5862674 :         let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
    2189         1265902 :             *cached_lsn
    2190                 :         } else {
    2191         4596772 :             Lsn(0)
    2192                 :         };
    2193                 : 
    2194                 :         // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
    2195                 :         // to check that each iteration make some progress, to break infinite
    2196                 :         // looping if something goes wrong.
    2197         5862674 :         let mut prev_lsn = Lsn(u64::MAX);
    2198         5862674 : 
    2199         5862674 :         let mut result = ValueReconstructResult::Continue;
    2200         5862674 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2201                 : 
    2202        27327554 :         'outer: loop {
    2203        27327554 :             if self.cancel.is_cancelled() {
    2204              24 :                 return Err(PageReconstructError::Cancelled);
    2205        27327530 :             }
    2206        27327530 : 
    2207        27327530 :             // The function should have updated 'state'
    2208        27327530 :             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
    2209        27327530 :             match result {
    2210         4626402 :                 ValueReconstructResult::Complete => return Ok(traversal_path),
    2211                 :                 ValueReconstructResult::Continue => {
    2212                 :                     // If we reached an earlier cached page image, we're done.
    2213        22701125 :                     if cont_lsn == cached_lsn + 1 {
    2214         1235042 :                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
    2215         1235042 :                         return Ok(traversal_path);
    2216        21466083 :                     }
    2217        21466083 :                     if prev_lsn <= cont_lsn {
    2218                 :                         // Didn't make any progress in last iteration. Error out to avoid
    2219                 :                         // getting stuck in the loop.
    2220            1146 :                         return Err(layer_traversal_error(format!(
    2221            1146 :                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
    2222            1146 :                             key,
    2223            1146 :                             Lsn(cont_lsn.0 - 1),
    2224            1146 :                             request_lsn,
    2225            1146 :                             timeline.ancestor_lsn
    2226            1146 :                         ), traversal_path));
    2227        21464937 :                     }
    2228        21464937 :                     prev_lsn = cont_lsn;
    2229                 :                 }
    2230                 :                 ValueReconstructResult::Missing => {
    2231                 :                     return Err(layer_traversal_error(
    2232               3 :                         if cfg!(test) {
    2233               2 :                             format!(
    2234               2 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
    2235               2 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
    2236               2 :                             )
    2237                 :                         } else {
    2238               1 :                             format!(
    2239               1 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
    2240               1 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
    2241               1 :                             )
    2242                 :                         },
    2243               3 :                         traversal_path,
    2244                 :                     ));
    2245                 :                 }
    2246                 :             }
    2247                 : 
    2248                 :             // Recurse into ancestor if needed
    2249        21464937 :             if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
    2250 UBC           0 :                 trace!(
    2251               0 :                     "going into ancestor {}, cont_lsn is {}",
    2252               0 :                     timeline.ancestor_lsn,
    2253               0 :                     cont_lsn
    2254               0 :                 );
    2255 CBC     1197870 :                 let ancestor = match timeline.get_ancestor_timeline() {
    2256         1197870 :                     Ok(timeline) => timeline,
    2257 UBC           0 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2258                 :                 };
    2259                 : 
    2260                 :                 // It's possible that the ancestor timeline isn't active yet, or
    2261                 :                 // is active but hasn't yet caught up to the branch point. Wait
    2262                 :                 // for it.
    2263                 :                 //
    2264                 :                 // This cannot happen while the pageserver is running normally,
    2265                 :                 // because you cannot create a branch from a point that isn't
    2266                 :                 // present in the pageserver yet. However, we don't wait for the
    2267                 :                 // branch point to be uploaded to cloud storage before creating
    2268                 :                 // a branch. I.e., the branch LSN need not be remote consistent
    2269                 :                 // for the branching operation to succeed.
    2270                 :                 //
    2271                 :                 // Hence, if we try to load a tenant in such a state where
    2272                 :                 // 1. the existence of the branch was persisted (in IndexPart and/or locally)
    2273                 :                 // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
    2274                 :                 // then we will need to wait for the ancestor timeline to
    2275                 :                 // re-stream WAL up to branch_lsn before we access it.
    2276                 :                 //
    2277                 :                 // How can a tenant get in such a state?
    2278                 :                 // - ungraceful pageserver process exit
    2279                 :                 // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
    2280                 :                 //
    2281                 :                 // NB: this could be avoided by requiring
    2282                 :                 //   branch_lsn >= remote_consistent_lsn
    2283                 :                 // during branch creation.
    2284 CBC     1197870 :                 match ancestor.wait_to_become_active(ctx).await {
    2285         1197866 :                     Ok(()) => {}
    2286                 :                     Err(TimelineState::Stopping) => {
    2287               3 :                         return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
    2288                 :                     }
    2289               1 :                     Err(state) => {
    2290               1 :                         return Err(PageReconstructError::Other(anyhow::anyhow!(
    2291               1 :                             "Timeline {} will not become active. Current state: {:?}",
    2292               1 :                             ancestor.timeline_id,
    2293               1 :                             &state,
    2294               1 :                         )));
    2295                 :                     }
    2296                 :                 }
    2297         1197866 :                 ancestor
    2298         1197866 :                     .wait_lsn(timeline.ancestor_lsn, ctx)
    2299               5 :                     .await
    2300         1197866 :                     .map_err(|e| match e {
    2301 UBC           0 :                         e @ WaitLsnError::Timeout(_) => PageReconstructError::AncestorLsnTimeout(e),
    2302               0 :                         WaitLsnError::Shutdown => PageReconstructError::Cancelled,
    2303               0 :                         e @ WaitLsnError::BadState => {
    2304               0 :                             PageReconstructError::Other(anyhow::anyhow!(e))
    2305                 :                         }
    2306 CBC     1197866 :                     })?;
    2307                 : 
    2308         1197866 :                 timeline_owned = ancestor;
    2309         1197866 :                 timeline = &*timeline_owned;
    2310         1197866 :                 prev_lsn = Lsn(u64::MAX);
    2311         1197866 :                 continue 'outer;
    2312        20267066 :             }
    2313                 : 
    2314        20267066 :             let guard = timeline.layers.read().await;
    2315        20267036 :             let layers = guard.layer_map();
    2316                 : 
    2317                 :             // Check the open and frozen in-memory layers first, in order from newest
    2318                 :             // to oldest.
    2319        20267036 :             if let Some(open_layer) = &layers.open_layer {
    2320        16065430 :                 let start_lsn = open_layer.get_lsn_range().start;
    2321        16065430 :                 if cont_lsn > start_lsn {
    2322                 :                     //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
    2323                 :                     // Get all the data needed to reconstruct the page version from this layer.
    2324                 :                     // But if we have an older cached page image, no need to go past that.
    2325         4411185 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2326         4411185 :                     result = match open_layer
    2327         4411185 :                         .get_value_reconstruct_data(
    2328         4411185 :                             key,
    2329         4411185 :                             lsn_floor..cont_lsn,
    2330         4411185 :                             reconstruct_state,
    2331         4411185 :                             ctx,
    2332         4411185 :                         )
    2333          577321 :                         .await
    2334                 :                     {
    2335         4411185 :                         Ok(result) => result,
    2336 UBC           0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2337                 :                     };
    2338 CBC     4411185 :                     cont_lsn = lsn_floor;
    2339         4411185 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2340         4411185 :                     traversal_path.push((
    2341         4411185 :                         result,
    2342         4411185 :                         cont_lsn,
    2343         4411185 :                         Box::new({
    2344         4411185 :                             let open_layer = Arc::clone(open_layer);
    2345         4411185 :                             move || open_layer.traversal_id()
    2346         4411185 :                         }),
    2347         4411185 :                     ));
    2348         4411185 :                     continue 'outer;
    2349        11654245 :                 }
    2350         4201606 :             }
    2351        15855851 :             for frozen_layer in layers.frozen_layers.iter().rev() {
    2352          823242 :                 let start_lsn = frozen_layer.get_lsn_range().start;
    2353          823242 :                 if cont_lsn > start_lsn {
    2354                 :                     //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
    2355          257271 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2356          257271 :                     result = match frozen_layer
    2357          257271 :                         .get_value_reconstruct_data(
    2358          257271 :                             key,
    2359          257271 :                             lsn_floor..cont_lsn,
    2360          257271 :                             reconstruct_state,
    2361          257271 :                             ctx,
    2362          257271 :                         )
    2363           17207 :                         .await
    2364                 :                     {
    2365          257271 :                         Ok(result) => result,
    2366 UBC           0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2367                 :                     };
    2368 CBC      257271 :                     cont_lsn = lsn_floor;
    2369          257271 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2370          257271 :                     traversal_path.push((
    2371          257271 :                         result,
    2372          257271 :                         cont_lsn,
    2373          257271 :                         Box::new({
    2374          257271 :                             let frozen_layer = Arc::clone(frozen_layer);
    2375          257271 :                             move || frozen_layer.traversal_id()
    2376          257271 :                         }),
    2377          257271 :                     ));
    2378          257271 :                     continue 'outer;
    2379          565971 :                 }
    2380                 :             }
    2381                 : 
    2382        15598580 :             if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
    2383        15454851 :                 let layer = guard.get_from_desc(&layer);
    2384        15454851 :                 // Get all the data needed to reconstruct the page version from this layer.
    2385        15454851 :                 // But if we have an older cached page image, no need to go past that.
    2386        15454851 :                 let lsn_floor = max(cached_lsn + 1, lsn_floor);
    2387        15454851 :                 result = match layer
    2388        15454851 :                     .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
    2389          891600 :                     .await
    2390                 :                 {
    2391        15454829 :                     Ok(result) => result,
    2392              10 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2393                 :                 };
    2394        15454829 :                 cont_lsn = lsn_floor;
    2395        15454829 :                 *read_count += 1;
    2396        15454829 :                 traversal_path.push((
    2397        15454829 :                     result,
    2398        15454829 :                     cont_lsn,
    2399        15454829 :                     Box::new({
    2400        15454829 :                         let layer = layer.to_owned();
    2401        15454829 :                         move || layer.traversal_id()
    2402        15454829 :                     }),
    2403        15454829 :                 ));
    2404        15454829 :                 continue 'outer;
    2405          143729 :             } else if timeline.ancestor_timeline.is_some() {
    2406                 :                 // Nothing on this timeline. Traverse to parent
    2407          143728 :                 result = ValueReconstructResult::Continue;
    2408          143728 :                 cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2409          143728 :                 continue 'outer;
    2410                 :             } else {
    2411                 :                 // Nothing found
    2412               1 :                 result = ValueReconstructResult::Missing;
    2413               1 :                 continue 'outer;
    2414                 :             }
    2415                 :         }
    2416         5862631 :     }
    2417                 : 
    2418                 :     /// # Cancel-safety
    2419                 :     ///
    2420                 :     /// This method is cancellation-safe.
    2421         5939236 :     async fn lookup_cached_page(
    2422         5939236 :         &self,
    2423         5939236 :         key: &Key,
    2424         5939236 :         lsn: Lsn,
    2425         5939236 :         ctx: &RequestContext,
    2426         5939236 :     ) -> Option<(Lsn, Bytes)> {
    2427         5939222 :         let cache = page_cache::get();
    2428                 : 
    2429                 :         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
    2430                 :         // We should look at the key to determine if it's a cacheable object
    2431         5939222 :         let (lsn, read_guard) = cache
    2432         5939222 :             .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
    2433         4596772 :             .await?;
    2434         1342450 :         let img = Bytes::from(read_guard.to_vec());
    2435         1342450 :         Some((lsn, img))
    2436         5939222 :     }
    2437                 : 
    2438         1197905 :     fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
    2439         1197905 :         let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
    2440 UBC           0 :             format!(
    2441               0 :                 "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
    2442               0 :                 self.timeline_id,
    2443               0 :                 self.get_ancestor_timeline_id(),
    2444               0 :             )
    2445 CBC     1197905 :         })?;
    2446         1197905 :         Ok(Arc::clone(ancestor))
    2447         1197905 :     }
    2448                 : 
    2449         3584257 :     pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
    2450         3584257 :         &self.shard_identity
    2451         3584257 :     }
    2452                 : 
    2453                 :     ///
    2454                 :     /// Get a handle to the latest layer for appending.
    2455                 :     ///
    2456         1781990 :     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    2457         1781989 :         let mut guard = self.layers.write().await;
    2458         1781989 :         let layer = guard
    2459         1781989 :             .get_layer_for_write(
    2460         1781989 :                 lsn,
    2461         1781989 :                 self.get_last_record_lsn(),
    2462         1781989 :                 self.conf,
    2463         1781989 :                 self.timeline_id,
    2464         1781989 :                 self.tenant_shard_id,
    2465         1781989 :             )
    2466 UBC           0 :             .await?;
    2467 CBC     1781989 :         Ok(layer)
    2468         1781989 :     }
    2469                 : 
    2470          607051 :     async fn put_value(
    2471          607051 :         &self,
    2472          607051 :         key: Key,
    2473          607051 :         lsn: Lsn,
    2474          607051 :         val: &Value,
    2475          607051 :         ctx: &RequestContext,
    2476          607051 :     ) -> anyhow::Result<()> {
    2477                 :         //info!("PUT: key {} at {}", key, lsn);
    2478          607051 :         let layer = self.get_layer_for_write(lsn).await?;
    2479          607051 :         layer.put_value(key, lsn, val, ctx).await?;
    2480          607051 :         Ok(())
    2481          607051 :     }
    2482                 : 
    2483         1168370 :     async fn put_values(
    2484         1168370 :         &self,
    2485         1168370 :         values: &HashMap<Key, Vec<(Lsn, Value)>>,
    2486         1168370 :         ctx: &RequestContext,
    2487         1168370 :     ) -> anyhow::Result<()> {
    2488                 :         // Pick the first LSN in the batch to get the layer to write to.
    2489         1168369 :         for lsns in values.values() {
    2490         1168369 :             if let Some((lsn, _)) = lsns.first() {
    2491         1168369 :                 let layer = self.get_layer_for_write(*lsn).await?;
    2492         1168369 :                 layer.put_values(values, ctx).await?;
    2493         1168369 :                 break;
    2494 UBC           0 :             }
    2495                 :         }
    2496 CBC     1168369 :         Ok(())
    2497         1168369 :     }
    2498                 : 
    2499            6569 :     async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    2500            6569 :         if let Some((_, lsn)) = tombstones.first() {
    2501            6569 :             let layer = self.get_layer_for_write(*lsn).await?;
    2502            6569 :             layer.put_tombstones(tombstones).await?;
    2503 UBC           0 :         }
    2504 CBC        6569 :         Ok(())
    2505            6569 :     }
    2506                 : 
    2507        49362591 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    2508        49362591 :         assert!(new_lsn.is_aligned());
    2509                 : 
    2510        49362591 :         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
    2511        49362591 :         self.last_record_lsn.advance(new_lsn);
    2512        49362591 :     }
    2513                 : 
    2514            4663 :     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
    2515                 :         // Freeze the current open in-memory layer. It will be written to disk on next
    2516                 :         // iteration.
    2517            4663 :         let _write_guard = if write_lock_held {
    2518            3102 :             None
    2519                 :         } else {
    2520            1561 :             Some(self.write_lock.lock().await)
    2521                 :         };
    2522            4663 :         let mut guard = self.layers.write().await;
    2523            4663 :         guard
    2524            4663 :             .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
    2525              11 :             .await;
    2526            4663 :     }
    2527                 : 
    2528                 :     /// Layer flusher task's main loop.
    2529            1274 :     async fn flush_loop(
    2530            1274 :         self: &Arc<Self>,
    2531            1274 :         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
    2532            1274 :         ctx: &RequestContext,
    2533            1274 :     ) {
    2534            1274 :         info!("started flush loop");
    2535                 :         loop {
    2536            5745 :             tokio::select! {
    2537           10454 :                 _ = self.cancel.cancelled() => {
    2538           10454 :                     info!("shutting down layer flush task");
    2539           10454 :                     break;
    2540           10454 :                 },
    2541           10454 :                 _ = task_mgr::shutdown_watcher() => {
    2542           10454 :                     info!("shutting down layer flush task");
    2543           10454 :                     break;
    2544           10454 :                 },
    2545           10454 :                 _ = layer_flush_start_rx.changed() => {}
    2546           10454 :             }
    2547                 : 
    2548 UBC           0 :             trace!("waking up");
    2549 CBC        4478 :             let timer = self.metrics.flush_time_histo.start_timer();
    2550            4478 :             let flush_counter = *layer_flush_start_rx.borrow();
    2551            8914 :             let result = loop {
    2552            8914 :                 if self.cancel.is_cancelled() {
    2553 UBC           0 :                     info!("dropping out of flush loop for timeline shutdown");
    2554                 :                     // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
    2555                 :                     // anyone waiting on that will respect self.cancel as well: they will stop
    2556                 :                     // waiting at the same time we as drop out of this loop.
    2557               0 :                     return;
    2558 CBC        8914 :                 }
    2559                 : 
    2560            8914 :                 let layer_to_flush = {
    2561            8914 :                     let guard = self.layers.read().await;
    2562            8914 :                     guard.layer_map().frozen_layers.front().cloned()
    2563                 :                     // drop 'layers' lock to allow concurrent reads and writes
    2564                 :                 };
    2565            8914 :                 let Some(layer_to_flush) = layer_to_flush else {
    2566            4471 :                     break Ok(());
    2567                 :                 };
    2568           17245 :                 match self.flush_frozen_layer(layer_to_flush, ctx).await {
    2569            4436 :                     Ok(()) => {}
    2570                 :                     Err(FlushLayerError::Cancelled) => {
    2571               1 :                         info!("dropping out of flush loop for timeline shutdown");
    2572               1 :                         return;
    2573                 :                     }
    2574 UBC           0 :                     err @ Err(
    2575                 :                         FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
    2576                 :                     ) => {
    2577               0 :                         error!("could not flush frozen layer: {err:?}");
    2578               0 :                         break err;
    2579                 :                     }
    2580                 :                 }
    2581                 :             };
    2582                 :             // Notify any listeners that we're done
    2583 CBC        4471 :             let _ = self
    2584            4471 :                 .layer_flush_done_tx
    2585            4471 :                 .send_replace((flush_counter, result));
    2586            4471 : 
    2587            4471 :             timer.stop_and_record();
    2588                 :         }
    2589             497 :     }
    2590                 : 
    2591            1561 :     async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
    2592            1561 :         let mut rx = self.layer_flush_done_tx.subscribe();
    2593            1561 : 
    2594            1561 :         // Increment the flush cycle counter and wake up the flush task.
    2595            1561 :         // Remember the new value, so that when we listen for the flush
    2596            1561 :         // to finish, we know when the flush that we initiated has
    2597            1561 :         // finished, instead of some other flush that was started earlier.
    2598            1561 :         let mut my_flush_request = 0;
    2599            1561 : 
    2600            1561 :         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
    2601            1561 :         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
    2602              42 :             anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
    2603            1519 :         }
    2604            1519 : 
    2605            1519 :         self.layer_flush_start_tx.send_modify(|counter| {
    2606            1519 :             my_flush_request = *counter + 1;
    2607            1519 :             *counter = my_flush_request;
    2608            1519 :         });
    2609                 : 
    2610            3037 :         loop {
    2611            3037 :             {
    2612            3037 :                 let (last_result_counter, last_result) = &*rx.borrow();
    2613            3037 :                 if *last_result_counter >= my_flush_request {
    2614            1518 :                     if let Err(_err) = last_result {
    2615                 :                         // We already logged the original error in
    2616                 :                         // flush_loop. We cannot propagate it to the caller
    2617                 :                         // here, because it might not be Cloneable
    2618 UBC           0 :                         anyhow::bail!(
    2619               0 :                             "Could not flush frozen layer. Request id: {}",
    2620               0 :                             my_flush_request
    2621               0 :                         );
    2622                 :                     } else {
    2623 CBC        1518 :                         return Ok(());
    2624                 :                     }
    2625            1519 :                 }
    2626                 :             }
    2627 UBC           0 :             trace!("waiting for flush to complete");
    2628 CBC        1519 :             tokio::select! {
    2629            1518 :                 rx_e = rx.changed() => {
    2630                 :                     rx_e?;
    2631                 :                 },
    2632                 :                 // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
    2633                 :                 // the notification from [`flush_loop`] that it completed.
    2634                 :                 _ = self.cancel.cancelled() => {
    2635               1 :                     tracing::info!("Cancelled layer flush due on timeline shutdown");
    2636                 :                     return Ok(())
    2637                 :                 }
    2638                 :             };
    2639 UBC           0 :             trace!("done")
    2640                 :         }
    2641 CBC        1561 :     }
    2642                 : 
    2643            3102 :     fn flush_frozen_layers(&self) {
    2644            3102 :         self.layer_flush_start_tx.send_modify(|val| *val += 1);
    2645            3102 :     }
    2646                 : 
    2647                 :     /// Flush one frozen in-memory layer to disk, as a new delta layer.
    2648            4397 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id, layer=%frozen_layer))]
    2649                 :     async fn flush_frozen_layer(
    2650                 :         self: &Arc<Self>,
    2651                 :         frozen_layer: Arc<InMemoryLayer>,
    2652                 :         ctx: &RequestContext,
    2653                 :     ) -> Result<(), FlushLayerError> {
    2654                 :         // As a special case, when we have just imported an image into the repository,
    2655                 :         // instead of writing out a L0 delta layer, we directly write out image layer
    2656                 :         // files instead. This is possible as long as *all* the data imported into the
    2657                 :         // repository have the same LSN.
    2658                 :         let lsn_range = frozen_layer.get_lsn_range();
    2659                 :         let (layers_to_upload, delta_layer_to_add) =
    2660                 :             if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
    2661                 :                 #[cfg(test)]
    2662                 :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2663                 :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2664                 :                         panic!("flush loop not running")
    2665                 :                     }
    2666                 :                     FlushLoopState::Running {
    2667                 :                         initdb_optimization_count,
    2668                 :                         ..
    2669                 :                     } => {
    2670                 :                         *initdb_optimization_count += 1;
    2671                 :                     }
    2672                 :                 }
    2673                 :                 // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
    2674                 :                 // require downloading anything during initial import.
    2675                 :                 let (partitioning, _lsn) = self
    2676                 :                     .repartition(
    2677                 :                         self.initdb_lsn,
    2678                 :                         self.get_compaction_target_size(),
    2679                 :                         EnumSet::empty(),
    2680                 :                         ctx,
    2681                 :                     )
    2682                 :                     .await?;
    2683                 : 
    2684                 :                 if self.cancel.is_cancelled() {
    2685                 :                     return Err(FlushLayerError::Cancelled);
    2686                 :                 }
    2687                 : 
    2688                 :                 // For image layers, we add them immediately into the layer map.
    2689                 :                 (
    2690                 :                     self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
    2691                 :                         .await?,
    2692                 :                     None,
    2693                 :                 )
    2694                 :             } else {
    2695                 :                 #[cfg(test)]
    2696                 :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2697                 :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2698                 :                         panic!("flush loop not running")
    2699                 :                     }
    2700                 :                     FlushLoopState::Running {
    2701                 :                         expect_initdb_optimization,
    2702                 :                         ..
    2703                 :                     } => {
    2704                 :                         assert!(!*expect_initdb_optimization, "expected initdb optimization");
    2705                 :                     }
    2706                 :                 }
    2707                 :                 // Normal case, write out a L0 delta layer file.
    2708                 :                 // `create_delta_layer` will not modify the layer map.
    2709                 :                 // We will remove frozen layer and add delta layer in one atomic operation later.
    2710                 :                 let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
    2711                 :                 (
    2712                 :                     // FIXME: even though we have a single image and single delta layer assumption
    2713                 :                     // we push them to vec
    2714                 :                     vec![layer.clone()],
    2715                 :                     Some(layer),
    2716                 :                 )
    2717                 :             };
    2718                 : 
    2719            4441 :         pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
    2720                 : 
    2721                 :         if self.cancel.is_cancelled() {
    2722                 :             return Err(FlushLayerError::Cancelled);
    2723                 :         }
    2724                 : 
    2725                 :         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
    2726                 :         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
    2727                 : 
    2728                 :         // The new on-disk layers are now in the layer map. We can remove the
    2729                 :         // in-memory layer from the map now. The flushed layer is stored in
    2730                 :         // the mapping in `create_delta_layer`.
    2731                 :         let metadata = {
    2732                 :             let mut guard = self.layers.write().await;
    2733                 : 
    2734                 :             if self.cancel.is_cancelled() {
    2735                 :                 return Err(FlushLayerError::Cancelled);
    2736                 :             }
    2737                 : 
    2738                 :             guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
    2739                 : 
    2740                 :             if disk_consistent_lsn != old_disk_consistent_lsn {
    2741                 :                 assert!(disk_consistent_lsn > old_disk_consistent_lsn);
    2742                 :                 self.disk_consistent_lsn.store(disk_consistent_lsn);
    2743                 : 
    2744                 :                 // Schedule remote uploads that will reflect our new disk_consistent_lsn
    2745                 :                 Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
    2746                 :             } else {
    2747                 :                 None
    2748                 :             }
    2749                 :             // release lock on 'layers'
    2750                 :         };
    2751                 : 
    2752                 :         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
    2753                 :         // a compaction can delete the file and then it won't be available for uploads any more.
    2754                 :         // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
    2755                 :         // race situation.
    2756                 :         // See https://github.com/neondatabase/neon/issues/4526
    2757            4440 :         pausable_failpoint!("flush-frozen-pausable");
    2758                 : 
    2759                 :         // This failpoint is used by another test case `test_pageserver_recovery`.
    2760 UBC           0 :         fail_point!("flush-frozen-exit");
    2761                 : 
    2762                 :         // Update the metadata file, with new 'disk_consistent_lsn'
    2763                 :         //
    2764                 :         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
    2765                 :         // *all* the layers, to avoid fsyncing the file multiple times.
    2766                 : 
    2767                 :         // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
    2768                 :         if let Some(metadata) = metadata {
    2769                 :             save_metadata(
    2770                 :                 self.conf,
    2771                 :                 &self.tenant_shard_id,
    2772                 :                 &self.timeline_id,
    2773                 :                 &metadata,
    2774                 :             )
    2775                 :             .await
    2776                 :             .context("save_metadata")?;
    2777                 :         }
    2778                 :         Ok(())
    2779                 :     }
    2780                 : 
    2781                 :     /// Update metadata file
    2782 CBC        4464 :     fn schedule_uploads(
    2783            4464 :         &self,
    2784            4464 :         disk_consistent_lsn: Lsn,
    2785            4464 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    2786            4464 :     ) -> anyhow::Result<TimelineMetadata> {
    2787            4464 :         // We can only save a valid 'prev_record_lsn' value on disk if we
    2788            4464 :         // flushed *all* in-memory changes to disk. We only track
    2789            4464 :         // 'prev_record_lsn' in memory for the latest processed record, so we
    2790            4464 :         // don't remember what the correct value that corresponds to some old
    2791            4464 :         // LSN is. But if we flush everything, then the value corresponding
    2792            4464 :         // current 'last_record_lsn' is correct and we can store it on disk.
    2793            4464 :         let RecordLsn {
    2794            4464 :             last: last_record_lsn,
    2795            4464 :             prev: prev_record_lsn,
    2796            4464 :         } = self.last_record_lsn.load();
    2797            4464 :         let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
    2798            1283 :             Some(prev_record_lsn)
    2799                 :         } else {
    2800            3181 :             None
    2801                 :         };
    2802                 : 
    2803            4464 :         let ancestor_timeline_id = self
    2804            4464 :             .ancestor_timeline
    2805            4464 :             .as_ref()
    2806            4464 :             .map(|ancestor| ancestor.timeline_id);
    2807            4464 : 
    2808            4464 :         let metadata = TimelineMetadata::new(
    2809            4464 :             disk_consistent_lsn,
    2810            4464 :             ondisk_prev_record_lsn,
    2811            4464 :             ancestor_timeline_id,
    2812            4464 :             self.ancestor_lsn,
    2813            4464 :             *self.latest_gc_cutoff_lsn.read(),
    2814            4464 :             self.initdb_lsn,
    2815            4464 :             self.pg_version,
    2816            4464 :         );
    2817            4464 : 
    2818            4464 :         fail_point!("checkpoint-before-saving-metadata", |x| bail!(
    2819 UBC           0 :             "{}",
    2820               0 :             x.unwrap()
    2821 CBC        4464 :         ));
    2822                 : 
    2823            4464 :         if let Some(remote_client) = &self.remote_client {
    2824            8904 :             for layer in layers_to_upload {
    2825            4440 :                 remote_client.schedule_layer_file_upload(layer)?;
    2826                 :             }
    2827            4464 :             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
    2828 UBC           0 :         }
    2829                 : 
    2830 CBC        4464 :         Ok(metadata)
    2831            4464 :     }
    2832                 : 
    2833              24 :     async fn update_metadata_file(
    2834              24 :         &self,
    2835              24 :         disk_consistent_lsn: Lsn,
    2836              24 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    2837              24 :     ) -> anyhow::Result<()> {
    2838              24 :         let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
    2839                 : 
    2840              24 :         save_metadata(
    2841              24 :             self.conf,
    2842              24 :             &self.tenant_shard_id,
    2843              24 :             &self.timeline_id,
    2844              24 :             &metadata,
    2845              24 :         )
    2846 UBC           0 :         .await
    2847 CBC          24 :         .context("save_metadata")?;
    2848                 : 
    2849              24 :         Ok(())
    2850              24 :     }
    2851                 : 
    2852                 :     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
    2853                 :     // in layer map immediately. The caller is responsible to put it into the layer map.
    2854            4399 :     async fn create_delta_layer(
    2855            4399 :         self: &Arc<Self>,
    2856            4399 :         frozen_layer: &Arc<InMemoryLayer>,
    2857            4399 :         ctx: &RequestContext,
    2858            4399 :     ) -> anyhow::Result<ResidentLayer> {
    2859            4399 :         let span = tracing::info_span!("blocking");
    2860            4399 :         let new_delta: ResidentLayer = tokio::task::spawn_blocking({
    2861            4399 :             let self_clone = Arc::clone(self);
    2862            4399 :             let frozen_layer = Arc::clone(frozen_layer);
    2863            4399 :             let ctx = ctx.attached_child();
    2864            4399 :             move || {
    2865            4399 :                 // Write it out
    2866            4399 :                 // Keep this inside `spawn_blocking` and `Handle::current`
    2867            4399 :                 // as long as the write path is still sync and the read impl
    2868            4399 :                 // is still not fully async. Otherwise executor threads would
    2869            4399 :                 // be blocked.
    2870            4399 :                 let _g = span.entered();
    2871            4399 :                 let new_delta =
    2872            4399 :                     Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
    2873            4399 :                 let new_delta_path = new_delta.local_path().to_owned();
    2874            4399 : 
    2875            4399 :                 // Sync it to disk.
    2876            4399 :                 //
    2877            4399 :                 // We must also fsync the timeline dir to ensure the directory entries for
    2878            4399 :                 // new layer files are durable.
    2879            4399 :                 //
    2880            4399 :                 // NB: timeline dir must be synced _after_ the file contents are durable.
    2881            4399 :                 // So, two separate fsyncs are required, they mustn't be batched.
    2882            4399 :                 //
    2883            4399 :                 // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
    2884            4399 :                 // files to flush, the fsync overhead can be reduces as follows:
    2885            4399 :                 // 1. write them all to temporary file names
    2886            4399 :                 // 2. fsync them
    2887            4399 :                 // 3. rename to the final name
    2888            4399 :                 // 4. fsync the parent directory.
    2889            4399 :                 // Note that (1),(2),(3) today happen inside write_to_disk().
    2890            4399 :                 //
    2891            4399 :                 // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    2892            4399 :                 par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
    2893            4399 :                 par_fsync::par_fsync(&[self_clone
    2894            4399 :                     .conf
    2895            4399 :                     .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
    2896            4399 :                 .context("fsync of timeline dir")?;
    2897                 : 
    2898            4397 :                 anyhow::Ok(new_delta)
    2899            4399 :             }
    2900            4399 :         })
    2901            4397 :         .await
    2902            4397 :         .context("spawn_blocking")
    2903            4397 :         .and_then(|x| x)?;
    2904                 : 
    2905            4397 :         Ok(new_delta)
    2906            4397 :     }
    2907                 : 
    2908            1419 :     async fn repartition(
    2909            1419 :         &self,
    2910            1419 :         lsn: Lsn,
    2911            1419 :         partition_size: u64,
    2912            1419 :         flags: EnumSet<CompactFlags>,
    2913            1419 :         ctx: &RequestContext,
    2914            1419 :     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
    2915            1419 :         {
    2916            1419 :             let partitioning_guard = self.partitioning.lock().unwrap();
    2917            1419 :             let distance = lsn.0 - partitioning_guard.1 .0;
    2918            1419 :             if partitioning_guard.1 != Lsn(0)
    2919             851 :                 && distance <= self.repartition_threshold
    2920             703 :                 && !flags.contains(CompactFlags::ForceRepartition)
    2921                 :             {
    2922 UBC           0 :                 debug!(
    2923               0 :                     distance,
    2924               0 :                     threshold = self.repartition_threshold,
    2925               0 :                     "no repartitioning needed"
    2926               0 :                 );
    2927 CBC         701 :                 return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
    2928             718 :             }
    2929                 :         }
    2930          139421 :         let keyspace = self.collect_keyspace(lsn, ctx).await?;
    2931             715 :         let partitioning = keyspace.partition(partition_size);
    2932             715 : 
    2933             715 :         let mut partitioning_guard = self.partitioning.lock().unwrap();
    2934             715 :         if lsn > partitioning_guard.1 {
    2935             715 :             *partitioning_guard = (partitioning, lsn);
    2936             715 :         } else {
    2937 UBC           0 :             warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
    2938                 :         }
    2939 CBC         715 :         Ok((partitioning_guard.0.clone(), partitioning_guard.1))
    2940            1417 :     }
    2941                 : 
    2942                 :     // Is it time to create a new image layer for the given partition?
    2943           35656 :     async fn time_for_new_image_layer(
    2944           35656 :         &self,
    2945           35656 :         partition: &KeySpace,
    2946           35656 :         lsn: Lsn,
    2947           35656 :     ) -> anyhow::Result<bool> {
    2948           35656 :         let threshold = self.get_image_creation_threshold();
    2949                 : 
    2950           35656 :         let guard = self.layers.read().await;
    2951           35656 :         let layers = guard.layer_map();
    2952           35656 : 
    2953           35656 :         let mut max_deltas = 0;
    2954           35656 :         {
    2955           35656 :             let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
    2956           35656 :             if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
    2957            4970 :                 let img_range =
    2958            4970 :                     partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
    2959            4970 :                 if wanted.overlaps(&img_range) {
    2960                 :                     //
    2961                 :                     // gc_timeline only pays attention to image layers that are older than the GC cutoff,
    2962                 :                     // but create_image_layers creates image layers at last-record-lsn.
    2963                 :                     // So it's possible that gc_timeline wants a new image layer to be created for a key range,
    2964                 :                     // but the range is already covered by image layers at more recent LSNs. Before we
    2965                 :                     // create a new image layer, check if the range is already covered at more recent LSNs.
    2966 UBC           0 :                     if !layers
    2967               0 :                         .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
    2968                 :                     {
    2969               0 :                         debug!(
    2970               0 :                             "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
    2971               0 :                             img_range.start, img_range.end, cutoff_lsn, lsn
    2972               0 :                         );
    2973               0 :                         return Ok(true);
    2974               0 :                     }
    2975 CBC        4970 :                 }
    2976           30686 :             }
    2977                 :         }
    2978                 : 
    2979         2021409 :         for part_range in &partition.ranges {
    2980         1991352 :             let image_coverage = layers.image_coverage(part_range, lsn)?;
    2981         3777583 :             for (img_range, last_img) in image_coverage {
    2982         1791830 :                 let img_lsn = if let Some(last_img) = last_img {
    2983          348730 :                     last_img.get_lsn_range().end
    2984                 :                 } else {
    2985         1443100 :                     Lsn(0)
    2986                 :                 };
    2987                 :                 // Let's consider an example:
    2988                 :                 //
    2989                 :                 // delta layer with LSN range 71-81
    2990                 :                 // delta layer with LSN range 81-91
    2991                 :                 // delta layer with LSN range 91-101
    2992                 :                 // image layer at LSN 100
    2993                 :                 //
    2994                 :                 // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
    2995                 :                 // there's no need to create a new one. We check this case explicitly, to avoid passing
    2996                 :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    2997                 :                 // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
    2998                 :                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
    2999         1791830 :                 if img_lsn < lsn {
    3000         1770176 :                     let num_deltas =
    3001         1770176 :                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
    3002                 : 
    3003         1770176 :                     max_deltas = max_deltas.max(num_deltas);
    3004         1770176 :                     if num_deltas >= threshold {
    3005 UBC           0 :                         debug!(
    3006               0 :                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
    3007               0 :                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
    3008               0 :                         );
    3009 CBC        5599 :                         return Ok(true);
    3010         1764577 :                     }
    3011           21654 :                 }
    3012                 :             }
    3013                 :         }
    3014                 : 
    3015 UBC           0 :         debug!(
    3016               0 :             max_deltas,
    3017               0 :             "none of the partitioned ranges had >= {threshold} deltas"
    3018               0 :         );
    3019 CBC       30057 :         Ok(false)
    3020           35656 :     }
    3021                 : 
    3022 UBC           0 :     #[tracing::instrument(skip_all, fields(%lsn, %force))]
    3023                 :     async fn create_image_layers(
    3024                 :         self: &Arc<Timeline>,
    3025                 :         partitioning: &KeyPartitioning,
    3026                 :         lsn: Lsn,
    3027                 :         force: bool,
    3028                 :         ctx: &RequestContext,
    3029                 :     ) -> Result<Vec<ResidentLayer>, PageReconstructError> {
    3030                 :         let timer = self.metrics.create_images_time_histo.start_timer();
    3031                 :         let mut image_layers = Vec::new();
    3032                 : 
    3033                 :         // We need to avoid holes between generated image layers.
    3034                 :         // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
    3035                 :         // image layer with hole between them. In this case such layer can not be utilized by GC.
    3036                 :         //
    3037                 :         // How such hole between partitions can appear?
    3038                 :         // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
    3039                 :         // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
    3040                 :         // If there is delta layer <100000000..300000000> then it never be garbage collected because
    3041                 :         // image layers  <100000000..100000099> and <200000000..200000199> are not completely covering it.
    3042                 :         let mut start = Key::MIN;
    3043                 : 
    3044                 :         for partition in partitioning.parts.iter() {
    3045                 :             let img_range = start..partition.ranges.last().unwrap().end;
    3046                 :             start = img_range.end;
    3047                 :             if force || self.time_for_new_image_layer(partition, lsn).await? {
    3048                 :                 let mut image_layer_writer = ImageLayerWriter::new(
    3049                 :                     self.conf,
    3050                 :                     self.timeline_id,
    3051                 :                     self.tenant_shard_id,
    3052                 :                     &img_range,
    3053                 :                     lsn,
    3054                 :                 )
    3055                 :                 .await?;
    3056                 : 
    3057               0 :                 fail_point!("image-layer-writer-fail-before-finish", |_| {
    3058               0 :                     Err(PageReconstructError::Other(anyhow::anyhow!(
    3059               0 :                         "failpoint image-layer-writer-fail-before-finish"
    3060               0 :                     )))
    3061               0 :                 });
    3062                 :                 for range in &partition.ranges {
    3063                 :                     let mut key = range.start;
    3064                 :                     while key < range.end {
    3065                 :                         if self.shard_identity.is_key_disposable(&key) {
    3066               0 :                             debug!(
    3067               0 :                                 "Dropping key {} during compaction (it belongs on shard {:?})",
    3068               0 :                                 key,
    3069               0 :                                 self.shard_identity.get_shard_number(&key)
    3070               0 :                             );
    3071                 :                             key = key.next();
    3072                 :                             continue;
    3073                 :                         }
    3074                 :                         let img = match self.get(key, lsn, ctx).await {
    3075                 :                             Ok(img) => img,
    3076                 :                             Err(err) => {
    3077                 :                                 // If we fail to reconstruct a VM or FSM page, we can zero the
    3078                 :                                 // page without losing any actual user data. That seems better
    3079                 :                                 // than failing repeatedly and getting stuck.
    3080                 :                                 //
    3081                 :                                 // We had a bug at one point, where we truncated the FSM and VM
    3082                 :                                 // in the pageserver, but the Postgres didn't know about that
    3083                 :                                 // and continued to generate incremental WAL records for pages
    3084                 :                                 // that didn't exist in the pageserver. Trying to replay those
    3085                 :                                 // WAL records failed to find the previous image of the page.
    3086                 :                                 // This special case allows us to recover from that situation.
    3087                 :                                 // See https://github.com/neondatabase/neon/issues/2601.
    3088                 :                                 //
    3089                 :                                 // Unfortunately we cannot do this for the main fork, or for
    3090                 :                                 // any metadata keys, keys, as that would lead to actual data
    3091                 :                                 // loss.
    3092                 :                                 if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
    3093               0 :                                     warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
    3094                 :                                     ZERO_PAGE.clone()
    3095                 :                                 } else {
    3096                 :                                     return Err(err);
    3097                 :                                 }
    3098                 :                             }
    3099                 :                         };
    3100                 : 
    3101                 :                         image_layer_writer.put_image(key, &img).await?;
    3102                 :                         key = key.next();
    3103                 :                     }
    3104                 :                 }
    3105                 :                 let image_layer = image_layer_writer.finish(self).await?;
    3106                 :                 image_layers.push(image_layer);
    3107                 :             }
    3108                 :         }
    3109                 :         // All layers that the GC wanted us to create have now been created.
    3110                 :         //
    3111                 :         // It's possible that another GC cycle happened while we were compacting, and added
    3112                 :         // something new to wanted_image_layers, and we now clear that before processing it.
    3113                 :         // That's OK, because the next GC iteration will put it back in.
    3114                 :         *self.wanted_image_layers.lock().unwrap() = None;
    3115                 : 
    3116                 :         // Sync the new layer to disk before adding it to the layer map, to make sure
    3117                 :         // we don't garbage collect something based on the new layer, before it has
    3118                 :         // reached the disk.
    3119                 :         //
    3120                 :         // We must also fsync the timeline dir to ensure the directory entries for
    3121                 :         // new layer files are durable
    3122                 :         //
    3123                 :         // Compaction creates multiple image layers. It would be better to create them all
    3124                 :         // and fsync them all in parallel.
    3125                 :         let all_paths = image_layers
    3126                 :             .iter()
    3127 CBC        5638 :             .map(|layer| layer.local_path().to_owned())
    3128                 :             .collect::<Vec<_>>();
    3129                 : 
    3130                 :         par_fsync::par_fsync_async(&all_paths)
    3131                 :             .await
    3132                 :             .context("fsync of newly created layer files")?;
    3133                 : 
    3134                 :         par_fsync::par_fsync_async(&[self
    3135                 :             .conf
    3136                 :             .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
    3137                 :         .await
    3138                 :         .context("fsync of timeline dir")?;
    3139                 : 
    3140                 :         let mut guard = self.layers.write().await;
    3141                 : 
    3142                 :         // FIXME: we could add the images to be uploaded *before* returning from here, but right
    3143                 :         // now they are being scheduled outside of write lock
    3144                 :         guard.track_new_image_layers(&image_layers, &self.metrics);
    3145                 :         drop_wlock(guard);
    3146                 :         timer.stop_and_record();
    3147                 : 
    3148                 :         Ok(image_layers)
    3149                 :     }
    3150                 : 
    3151                 :     /// Wait until the background initial logical size calculation is complete, or
    3152                 :     /// this Timeline is shut down.  Calling this function will cause the initial
    3153                 :     /// logical size calculation to skip waiting for the background jobs barrier.
    3154             231 :     pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
    3155             231 :         if let Some(await_bg_cancel) = self
    3156             231 :             .current_logical_size
    3157             231 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore
    3158             231 :             .get()
    3159             225 :         {
    3160             225 :             await_bg_cancel.cancel();
    3161             225 :         } else {
    3162                 :             // We should not wait if we were not able to explicitly instruct
    3163                 :             // the logical size cancellation to skip the concurrency limit semaphore.
    3164                 :             // TODO: this is an unexpected case.  We should restructure so that it
    3165                 :             // can't happen.
    3166               6 :             tracing::info!(
    3167               6 :                 "await_initial_logical_size: can't get semaphore cancel token, skipping"
    3168               6 :             );
    3169                 :         }
    3170                 : 
    3171             231 :         tokio::select!(
    3172             454 :             _ = self.current_logical_size.initialized.acquire() => {},
    3173             454 :             _ = self.cancel.cancelled() => {}
    3174             454 :         )
    3175             223 :     }
    3176                 : }
    3177                 : 
    3178            1086 : #[derive(Default)]
    3179                 : struct CompactLevel0Phase1Result {
    3180                 :     new_layers: Vec<ResidentLayer>,
    3181                 :     deltas_to_compact: Vec<Layer>,
    3182                 : }
    3183                 : 
    3184                 : /// Top-level failure to compact.
    3185 UBC           0 : #[derive(Debug, thiserror::Error)]
    3186                 : pub(crate) enum CompactionError {
    3187                 :     #[error("The timeline or pageserver is shutting down")]
    3188                 :     ShuttingDown,
    3189                 :     /// Compaction cannot be done right now; page reconstruction and so on.
    3190                 :     #[error(transparent)]
    3191                 :     Other(#[from] anyhow::Error),
    3192                 : }
    3193                 : 
    3194                 : #[serde_as]
    3195 CBC        3906 : #[derive(serde::Serialize)]
    3196                 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
    3197                 : 
    3198            9604 : #[derive(Default)]
    3199                 : enum DurationRecorder {
    3200                 :     #[default]
    3201                 :     NotStarted,
    3202                 :     Recorded(RecordedDuration, tokio::time::Instant),
    3203                 : }
    3204                 : 
    3205                 : impl DurationRecorder {
    3206            2783 :     pub fn till_now(&self) -> DurationRecorder {
    3207            2783 :         match self {
    3208                 :             DurationRecorder::NotStarted => {
    3209 UBC           0 :                 panic!("must only call on recorded measurements")
    3210                 :             }
    3211 CBC        2783 :             DurationRecorder::Recorded(_, ended) => {
    3212            2783 :                 let now = tokio::time::Instant::now();
    3213            2783 :                 DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
    3214            2783 :             }
    3215            2783 :         }
    3216            2783 :     }
    3217            1953 :     pub fn into_recorded(self) -> Option<RecordedDuration> {
    3218            1953 :         match self {
    3219 UBC           0 :             DurationRecorder::NotStarted => None,
    3220 CBC        1953 :             DurationRecorder::Recorded(recorded, _) => Some(recorded),
    3221                 :         }
    3222            1953 :     }
    3223                 : }
    3224                 : 
    3225            1372 : #[derive(Default)]
    3226                 : struct CompactLevel0Phase1StatsBuilder {
    3227                 :     version: Option<u64>,
    3228                 :     tenant_id: Option<TenantShardId>,
    3229                 :     timeline_id: Option<TimelineId>,
    3230                 :     read_lock_acquisition_micros: DurationRecorder,
    3231                 :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    3232                 :     read_lock_held_key_sort_micros: DurationRecorder,
    3233                 :     read_lock_held_prerequisites_micros: DurationRecorder,
    3234                 :     read_lock_held_compute_holes_micros: DurationRecorder,
    3235                 :     read_lock_drop_micros: DurationRecorder,
    3236                 :     write_layer_files_micros: DurationRecorder,
    3237                 :     level0_deltas_count: Option<usize>,
    3238                 :     new_deltas_count: Option<usize>,
    3239                 :     new_deltas_size: Option<u64>,
    3240                 : }
    3241                 : 
    3242             279 : #[derive(serde::Serialize)]
    3243                 : struct CompactLevel0Phase1Stats {
    3244                 :     version: u64,
    3245                 :     tenant_id: TenantShardId,
    3246                 :     timeline_id: TimelineId,
    3247                 :     read_lock_acquisition_micros: RecordedDuration,
    3248                 :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    3249                 :     read_lock_held_key_sort_micros: RecordedDuration,
    3250                 :     read_lock_held_prerequisites_micros: RecordedDuration,
    3251                 :     read_lock_held_compute_holes_micros: RecordedDuration,
    3252                 :     read_lock_drop_micros: RecordedDuration,
    3253                 :     write_layer_files_micros: RecordedDuration,
    3254                 :     level0_deltas_count: usize,
    3255                 :     new_deltas_count: usize,
    3256                 :     new_deltas_size: u64,
    3257                 : }
    3258                 : 
    3259                 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    3260                 :     type Error = anyhow::Error;
    3261                 : 
    3262             279 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    3263             279 :         Ok(Self {
    3264             279 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    3265             279 :             tenant_id: value
    3266             279 :                 .tenant_id
    3267             279 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    3268             279 :             timeline_id: value
    3269             279 :                 .timeline_id
    3270             279 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    3271             279 :             read_lock_acquisition_micros: value
    3272             279 :                 .read_lock_acquisition_micros
    3273             279 :                 .into_recorded()
    3274             279 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    3275             279 :             read_lock_held_spawn_blocking_startup_micros: value
    3276             279 :                 .read_lock_held_spawn_blocking_startup_micros
    3277             279 :                 .into_recorded()
    3278             279 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    3279             279 :             read_lock_held_key_sort_micros: value
    3280             279 :                 .read_lock_held_key_sort_micros
    3281             279 :                 .into_recorded()
    3282             279 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    3283             279 :             read_lock_held_prerequisites_micros: value
    3284             279 :                 .read_lock_held_prerequisites_micros
    3285             279 :                 .into_recorded()
    3286             279 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    3287             279 :             read_lock_held_compute_holes_micros: value
    3288             279 :                 .read_lock_held_compute_holes_micros
    3289             279 :                 .into_recorded()
    3290             279 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    3291             279 :             read_lock_drop_micros: value
    3292             279 :                 .read_lock_drop_micros
    3293             279 :                 .into_recorded()
    3294             279 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    3295             279 :             write_layer_files_micros: value
    3296             279 :                 .write_layer_files_micros
    3297             279 :                 .into_recorded()
    3298             279 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    3299             279 :             level0_deltas_count: value
    3300             279 :                 .level0_deltas_count
    3301             279 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    3302             279 :             new_deltas_count: value
    3303             279 :                 .new_deltas_count
    3304             279 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    3305             279 :             new_deltas_size: value
    3306             279 :                 .new_deltas_size
    3307             279 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    3308                 :         })
    3309             279 :     }
    3310                 : }
    3311                 : 
    3312                 : impl Timeline {
    3313                 :     /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
    3314            1372 :     async fn compact_level0_phase1(
    3315            1372 :         self: &Arc<Self>,
    3316            1372 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
    3317            1372 :         mut stats: CompactLevel0Phase1StatsBuilder,
    3318            1372 :         target_file_size: u64,
    3319            1372 :         ctx: &RequestContext,
    3320            1372 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    3321            1372 :         stats.read_lock_held_spawn_blocking_startup_micros =
    3322            1372 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    3323            1372 :         let layers = guard.layer_map();
    3324            1372 :         let level0_deltas = layers.get_level0_deltas()?;
    3325            1372 :         let mut level0_deltas = level0_deltas
    3326            1372 :             .into_iter()
    3327            6135 :             .map(|x| guard.get_from_desc(&x))
    3328            1372 :             .collect_vec();
    3329            1372 :         stats.level0_deltas_count = Some(level0_deltas.len());
    3330            1372 :         // Only compact if enough layers have accumulated.
    3331            1372 :         let threshold = self.get_compaction_threshold();
    3332            1372 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    3333 UBC           0 :             debug!(
    3334               0 :                 level0_deltas = level0_deltas.len(),
    3335               0 :                 threshold, "too few deltas to compact"
    3336               0 :             );
    3337 CBC        1086 :             return Ok(CompactLevel0Phase1Result::default());
    3338             286 :         }
    3339             286 : 
    3340             286 :         // This failpoint is used together with `test_duplicate_layers` integration test.
    3341             286 :         // It returns the compaction result exactly the same layers as input to compaction.
    3342             286 :         // We want to ensure that this will not cause any problem when updating the layer map
    3343             286 :         // after the compaction is finished.
    3344             286 :         //
    3345             286 :         // Currently, there are two rare edge cases that will cause duplicated layers being
    3346             286 :         // inserted.
    3347             286 :         // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
    3348             286 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
    3349             286 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
    3350             286 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
    3351             286 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
    3352             286 :         //    layer replace instead of the normal remove / upload process.
    3353             286 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
    3354             286 :         //    size length. Compaction will likely create the same set of n files afterwards.
    3355             286 :         //
    3356             286 :         // This failpoint is a superset of both of the cases.
    3357             286 :         if cfg!(feature = "testing") {
    3358             286 :             let active = (|| {
    3359             286 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
    3360             286 :                 false
    3361             286 :             })();
    3362             286 : 
    3363             286 :             if active {
    3364               3 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
    3365              43 :                 for delta in &level0_deltas {
    3366                 :                     // we are just faking these layers as being produced again for this failpoint
    3367              40 :                     new_layers.push(
    3368              40 :                         delta
    3369              40 :                             .download_and_keep_resident()
    3370 UBC           0 :                             .await
    3371 CBC          40 :                             .context("download layer for failpoint")?,
    3372                 :                     );
    3373                 :                 }
    3374               3 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
    3375               3 :                 return Ok(CompactLevel0Phase1Result {
    3376               3 :                     new_layers,
    3377               3 :                     deltas_to_compact: level0_deltas,
    3378               3 :                 });
    3379             283 :             }
    3380 UBC           0 :         }
    3381                 : 
    3382                 :         // Gather the files to compact in this iteration.
    3383                 :         //
    3384                 :         // Start with the oldest Level 0 delta file, and collect any other
    3385                 :         // level 0 files that form a contiguous sequence, such that the end
    3386                 :         // LSN of previous file matches the start LSN of the next file.
    3387                 :         //
    3388                 :         // Note that if the files don't form such a sequence, we might
    3389                 :         // "compact" just a single file. That's a bit pointless, but it allows
    3390                 :         // us to get rid of the level 0 file, and compact the other files on
    3391                 :         // the next iteration. This could probably made smarter, but such
    3392                 :         // "gaps" in the sequence of level 0 files should only happen in case
    3393                 :         // of a crash, partial download from cloud storage, or something like
    3394                 :         // that, so it's not a big deal in practice.
    3395 CBC        7212 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    3396             283 :         let mut level0_deltas_iter = level0_deltas.iter();
    3397             283 : 
    3398             283 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    3399             283 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    3400             283 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    3401             283 : 
    3402             283 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
    3403            3670 :         for l in level0_deltas_iter {
    3404            3387 :             let lsn_range = &l.layer_desc().lsn_range;
    3405            3387 : 
    3406            3387 :             if lsn_range.start != prev_lsn_end {
    3407 UBC           0 :                 break;
    3408 CBC        3387 :             }
    3409            3387 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
    3410            3387 :             prev_lsn_end = lsn_range.end;
    3411                 :         }
    3412             283 :         let lsn_range = Range {
    3413             283 :             start: deltas_to_compact
    3414             283 :                 .first()
    3415             283 :                 .unwrap()
    3416             283 :                 .layer_desc()
    3417             283 :                 .lsn_range
    3418             283 :                 .start,
    3419             283 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    3420             283 :         };
    3421                 : 
    3422             283 :         info!(
    3423             283 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    3424             283 :             lsn_range.start,
    3425             283 :             lsn_range.end,
    3426             283 :             deltas_to_compact.len(),
    3427             283 :             level0_deltas.len()
    3428             283 :         );
    3429                 : 
    3430            3670 :         for l in deltas_to_compact.iter() {
    3431            3670 :             info!("compact includes {l}");
    3432                 :         }
    3433                 : 
    3434                 :         // We don't need the original list of layers anymore. Drop it so that
    3435                 :         // we don't accidentally use it later in the function.
    3436             283 :         drop(level0_deltas);
    3437             283 : 
    3438             283 :         stats.read_lock_held_prerequisites_micros = stats
    3439             283 :             .read_lock_held_spawn_blocking_startup_micros
    3440             283 :             .till_now();
    3441             283 : 
    3442             283 :         // Determine N largest holes where N is number of compacted layers.
    3443             283 :         let max_holes = deltas_to_compact.len();
    3444             283 :         let last_record_lsn = self.get_last_record_lsn();
    3445             283 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    3446             283 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
    3447             283 : 
    3448             283 :         // min-heap (reserve space for one more element added before eviction)
    3449             283 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    3450             283 :         let mut prev: Option<Key> = None;
    3451             283 : 
    3452             283 :         let mut all_keys = Vec::new();
    3453                 : 
    3454            3670 :         for l in deltas_to_compact.iter() {
    3455            3670 :             all_keys.extend(l.load_keys(ctx).await?);
    3456                 :         }
    3457                 : 
    3458                 :         // FIXME: should spawn_blocking the rest of this function
    3459                 : 
    3460                 :         // The current stdlib sorting implementation is designed in a way where it is
    3461                 :         // particularly fast where the slice is made up of sorted sub-ranges.
    3462       161096304 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3463             283 : 
    3464             283 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    3465                 : 
    3466        16792830 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    3467        16792830 :             if let Some(prev_key) = prev {
    3468                 :                 // just first fast filter
    3469        16792547 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
    3470          193991 :                     let key_range = prev_key..next_key;
    3471                 :                     // Measuring hole by just subtraction of i128 representation of key range boundaries
    3472                 :                     // has not so much sense, because largest holes will corresponds field1/field2 changes.
    3473                 :                     // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
    3474                 :                     // That is why it is better to measure size of hole as number of covering image layers.
    3475          193991 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
    3476          193991 :                     if coverage_size >= min_hole_coverage_size {
    3477             192 :                         heap.push(Hole {
    3478             192 :                             key_range,
    3479             192 :                             coverage_size,
    3480             192 :                         });
    3481             192 :                         if heap.len() > max_holes {
    3482             107 :                             heap.pop(); // remove smallest hole
    3483             107 :                         }
    3484          193799 :                     }
    3485        16598556 :                 }
    3486             283 :             }
    3487        16792830 :             prev = Some(next_key.next());
    3488                 :         }
    3489             283 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    3490             283 :         drop_rlock(guard);
    3491             283 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    3492             283 :         let mut holes = heap.into_vec();
    3493             283 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
    3494             283 :         let mut next_hole = 0; // index of next hole in holes vector
    3495             283 : 
    3496             283 :         // This iterator walks through all key-value pairs from all the layers
    3497             283 :         // we're compacting, in key, LSN order.
    3498             283 :         let all_values_iter = all_keys.iter();
    3499             283 : 
    3500             283 :         // This iterator walks through all keys and is needed to calculate size used by each key
    3501             283 :         let mut all_keys_iter = all_keys
    3502             283 :             .iter()
    3503        16394012 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    3504        16393729 :             .coalesce(|mut prev, cur| {
    3505        16393729 :                 // Coalesce keys that belong to the same key pair.
    3506        16393729 :                 // This ensures that compaction doesn't put them
    3507        16393729 :                 // into different layer files.
    3508        16393729 :                 // Still limit this by the target file size,
    3509        16393729 :                 // so that we keep the size of the files in
    3510        16393729 :                 // check.
    3511        16393729 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    3512        14636511 :                     prev.2 += cur.2;
    3513        14636511 :                     Ok(prev)
    3514                 :                 } else {
    3515         1757218 :                     Err((prev, cur))
    3516                 :                 }
    3517        16393729 :             });
    3518             283 : 
    3519             283 :         // Merge the contents of all the input delta layers into a new set
    3520             283 :         // of delta layers, based on the current partitioning.
    3521             283 :         //
    3522             283 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    3523             283 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    3524             283 :         // would be too large. In that case, we also split on the LSN dimension.
    3525             283 :         //
    3526             283 :         // LSN
    3527             283 :         //  ^
    3528             283 :         //  |
    3529             283 :         //  | +-----------+            +--+--+--+--+
    3530             283 :         //  | |           |            |  |  |  |  |
    3531             283 :         //  | +-----------+            |  |  |  |  |
    3532             283 :         //  | |           |            |  |  |  |  |
    3533             283 :         //  | +-----------+     ==>    |  |  |  |  |
    3534             283 :         //  | |           |            |  |  |  |  |
    3535             283 :         //  | +-----------+            |  |  |  |  |
    3536             283 :         //  | |           |            |  |  |  |  |
    3537             283 :         //  | +-----------+            +--+--+--+--+
    3538             283 :         //  |
    3539             283 :         //  +--------------> key
    3540             283 :         //
    3541             283 :         //
    3542             283 :         // If one key (X) has a lot of page versions:
    3543             283 :         //
    3544             283 :         // LSN
    3545             283 :         //  ^
    3546             283 :         //  |                                 (X)
    3547             283 :         //  | +-----------+            +--+--+--+--+
    3548             283 :         //  | |           |            |  |  |  |  |
    3549             283 :         //  | +-----------+            |  |  +--+  |
    3550             283 :         //  | |           |            |  |  |  |  |
    3551             283 :         //  | +-----------+     ==>    |  |  |  |  |
    3552             283 :         //  | |           |            |  |  +--+  |
    3553             283 :         //  | +-----------+            |  |  |  |  |
    3554             283 :         //  | |           |            |  |  |  |  |
    3555             283 :         //  | +-----------+            +--+--+--+--+
    3556             283 :         //  |
    3557             283 :         //  +--------------> key
    3558             283 :         // TODO: this actually divides the layers into fixed-size chunks, not
    3559             283 :         // based on the partitioning.
    3560             283 :         //
    3561             283 :         // TODO: we should also opportunistically materialize and
    3562             283 :         // garbage collect what we can.
    3563             283 :         let mut new_layers = Vec::new();
    3564             283 :         let mut prev_key: Option<Key> = None;
    3565             283 :         let mut writer: Option<DeltaLayerWriter> = None;
    3566             283 :         let mut key_values_total_size = 0u64;
    3567             283 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    3568             283 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    3569                 : 
    3570                 :         for &DeltaEntry {
    3571        16392715 :             key, lsn, ref val, ..
    3572        16392994 :         } in all_values_iter
    3573                 :         {
    3574        16392715 :             let value = val.load(ctx).await?;
    3575        16392713 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    3576        16392713 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    3577        16392713 :             if !same_key || lsn == dup_end_lsn {
    3578         1757493 :                 let mut next_key_size = 0u64;
    3579         1757493 :                 let is_dup_layer = dup_end_lsn.is_valid();
    3580         1757493 :                 dup_start_lsn = Lsn::INVALID;
    3581         1757493 :                 if !same_key {
    3582         1757464 :                     dup_end_lsn = Lsn::INVALID;
    3583         1757464 :                 }
    3584                 :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    3585         1757497 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    3586         1757497 :                     next_key_size = next_size;
    3587         1757497 :                     if key != next_key {
    3588         1757185 :                         if dup_end_lsn.is_valid() {
    3589              12 :                             // We are writting segment with duplicates:
    3590              12 :                             // place all remaining values of this key in separate segment
    3591              12 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    3592              12 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    3593         1757173 :                         }
    3594         1757185 :                         break;
    3595             312 :                     }
    3596             312 :                     key_values_total_size += next_size;
    3597             312 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    3598             312 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    3599             312 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    3600                 :                         // Split key between multiple layers: such layer can contain only single key
    3601              29 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    3602              17 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    3603                 :                         } else {
    3604              12 :                             lsn // start with the first LSN for this key
    3605                 :                         };
    3606              29 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    3607              29 :                         break;
    3608             283 :                     }
    3609                 :                 }
    3610                 :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    3611         1757493 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    3612 UBC           0 :                     dup_start_lsn = dup_end_lsn;
    3613               0 :                     dup_end_lsn = lsn_range.end;
    3614 CBC     1757493 :                 }
    3615         1757493 :                 if writer.is_some() {
    3616         1757210 :                     let written_size = writer.as_mut().unwrap().size();
    3617         1757210 :                     let contains_hole =
    3618         1757210 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    3619                 :                     // check if key cause layer overflow or contains hole...
    3620         1757210 :                     if is_dup_layer
    3621         1757169 :                         || dup_end_lsn.is_valid()
    3622         1757159 :                         || written_size + key_values_total_size > target_file_size
    3623         1747160 :                         || contains_hole
    3624                 :                     {
    3625                 :                         // ... if so, flush previous layer and prepare to write new one
    3626           10133 :                         new_layers.push(
    3627           10133 :                             writer
    3628           10133 :                                 .take()
    3629           10133 :                                 .unwrap()
    3630           10133 :                                 .finish(prev_key.unwrap().next(), self)
    3631 UBC           0 :                                 .await?,
    3632                 :                         );
    3633 CBC       10133 :                         writer = None;
    3634           10133 : 
    3635           10133 :                         if contains_hole {
    3636              85 :                             // skip hole
    3637              85 :                             next_hole += 1;
    3638           10048 :                         }
    3639         1747077 :                     }
    3640             283 :                 }
    3641                 :                 // Remember size of key value because at next iteration we will access next item
    3642         1757493 :                 key_values_total_size = next_key_size;
    3643        14635220 :             }
    3644        16392713 :             if writer.is_none() {
    3645           10416 :                 // Create writer if not initiaized yet
    3646           10416 :                 writer = Some(
    3647                 :                     DeltaLayerWriter::new(
    3648           10416 :                         self.conf,
    3649           10416 :                         self.timeline_id,
    3650           10416 :                         self.tenant_shard_id,
    3651           10416 :                         key,
    3652           10416 :                         if dup_end_lsn.is_valid() {
    3653                 :                             // this is a layer containing slice of values of the same key
    3654 UBC           0 :                             debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    3655 CBC          41 :                             dup_start_lsn..dup_end_lsn
    3656                 :                         } else {
    3657 UBC           0 :                             debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3658 CBC       10375 :                             lsn_range.clone()
    3659                 :                         },
    3660                 :                     )
    3661 UBC           0 :                     .await?,
    3662                 :                 );
    3663 CBC    16382297 :             }
    3664                 : 
    3665        16392713 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3666 UBC           0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    3667               0 :                     "failpoint delta-layer-writer-fail-before-finish"
    3668               0 :                 )))
    3669 CBC    16392713 :             });
    3670                 : 
    3671        16392713 :             if !self.shard_identity.is_key_disposable(&key) {
    3672        16392713 :                 writer.as_mut().unwrap().put_value(key, lsn, value).await?;
    3673                 :             } else {
    3674 UBC           0 :                 debug!(
    3675               0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    3676               0 :                     key,
    3677               0 :                     self.shard_identity.get_shard_number(&key)
    3678               0 :                 );
    3679                 :             }
    3680                 : 
    3681 CBC    16392711 :             if !new_layers.is_empty() {
    3682        13626799 :                 fail_point!("after-timeline-compacted-first-L1");
    3683        13626799 :             }
    3684                 : 
    3685        16392711 :             prev_key = Some(key);
    3686                 :         }
    3687             279 :         if let Some(writer) = writer {
    3688             279 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
    3689 UBC           0 :         }
    3690                 : 
    3691                 :         // Sync layers
    3692 CBC         279 :         if !new_layers.is_empty() {
    3693                 :             // Print a warning if the created layer is larger than double the target size
    3694                 :             // Add two pages for potential overhead. This should in theory be already
    3695                 :             // accounted for in the target calculation, but for very small targets,
    3696                 :             // we still might easily hit the limit otherwise.
    3697             279 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    3698           10385 :             for layer in new_layers.iter() {
    3699           10385 :                 if layer.layer_desc().file_size > warn_limit {
    3700 UBC           0 :                     warn!(
    3701               0 :                         %layer,
    3702               0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    3703               0 :                     );
    3704 CBC       10385 :                 }
    3705                 :             }
    3706                 : 
    3707                 :             // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3708             279 :             let layer_paths: Vec<Utf8PathBuf> = new_layers
    3709             279 :                 .iter()
    3710           10385 :                 .map(|l| l.local_path().to_owned())
    3711             279 :                 .collect();
    3712             279 : 
    3713             279 :             // Fsync all the layer files and directory using multiple threads to
    3714             279 :             // minimize latency.
    3715             279 :             par_fsync::par_fsync_async(&layer_paths)
    3716             483 :                 .await
    3717             279 :                 .context("fsync all new layers")?;
    3718                 : 
    3719             279 :             let timeline_dir = self
    3720             279 :                 .conf
    3721             279 :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    3722             279 : 
    3723             279 :             par_fsync::par_fsync_async(&[timeline_dir])
    3724             306 :                 .await
    3725             279 :                 .context("fsync of timeline dir")?;
    3726 UBC           0 :         }
    3727                 : 
    3728 CBC         279 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    3729             279 :         stats.new_deltas_count = Some(new_layers.len());
    3730           10385 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    3731             279 : 
    3732             279 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    3733             279 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    3734                 :         {
    3735             279 :             Ok(stats_json) => {
    3736             279 :                 info!(
    3737             279 :                     stats_json = stats_json.as_str(),
    3738             279 :                     "compact_level0_phase1 stats available"
    3739             279 :                 )
    3740                 :             }
    3741 UBC           0 :             Err(e) => {
    3742               0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    3743                 :             }
    3744                 :         }
    3745                 : 
    3746 CBC         279 :         Ok(CompactLevel0Phase1Result {
    3747             279 :             new_layers,
    3748             279 :             deltas_to_compact: deltas_to_compact
    3749             279 :                 .into_iter()
    3750            3603 :                 .map(|x| x.drop_eviction_guard())
    3751             279 :                 .collect::<Vec<_>>(),
    3752             279 :         })
    3753            1368 :     }
    3754                 : 
    3755                 :     ///
    3756                 :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
    3757                 :     /// as Level 1 files.
    3758                 :     ///
    3759            1372 :     async fn compact_level0(
    3760            1372 :         self: &Arc<Self>,
    3761            1372 :         target_file_size: u64,
    3762            1372 :         ctx: &RequestContext,
    3763            1372 :     ) -> Result<(), CompactionError> {
    3764                 :         let CompactLevel0Phase1Result {
    3765            1368 :             new_layers,
    3766            1368 :             deltas_to_compact,
    3767                 :         } = {
    3768            1372 :             let phase1_span = info_span!("compact_level0_phase1");
    3769            1372 :             let ctx = ctx.attached_child();
    3770            1372 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    3771            1372 :                 version: Some(2),
    3772            1372 :                 tenant_id: Some(self.tenant_shard_id),
    3773            1372 :                 timeline_id: Some(self.timeline_id),
    3774            1372 :                 ..Default::default()
    3775            1372 :             };
    3776            1372 : 
    3777            1372 :             let begin = tokio::time::Instant::now();
    3778            1372 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
    3779            1372 :             let now = tokio::time::Instant::now();
    3780            1372 :             stats.read_lock_acquisition_micros =
    3781            1372 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    3782            1372 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
    3783            1372 :                 .instrument(phase1_span)
    3784          269983 :                 .await?
    3785                 :         };
    3786                 : 
    3787            1368 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    3788                 :             // nothing to do
    3789            1086 :             return Ok(());
    3790             282 :         }
    3791                 : 
    3792             282 :         let mut guard = self.layers.write().await;
    3793                 : 
    3794             282 :         let mut duplicated_layers = HashSet::new();
    3795             282 : 
    3796             282 :         let mut insert_layers = Vec::with_capacity(new_layers.len());
    3797                 : 
    3798           10707 :         for l in &new_layers {
    3799           10425 :             if guard.contains(l.as_ref()) {
    3800                 :                 // expected in tests
    3801              40 :                 tracing::error!(layer=%l, "duplicated L1 layer");
    3802                 : 
    3803                 :                 // good ways to cause a duplicate: we repeatedly error after taking the writelock
    3804                 :                 // `guard`  on self.layers. as of writing this, there are no error returns except
    3805                 :                 // for compact_level0_phase1 creating an L0, which does not happen in practice
    3806                 :                 // because we have not implemented L0 => L0 compaction.
    3807              40 :                 duplicated_layers.insert(l.layer_desc().key());
    3808           10385 :             } else if LayerMap::is_l0(l.layer_desc()) {
    3809 UBC           0 :                 return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
    3810 CBC       10385 :             } else {
    3811           10385 :                 insert_layers.push(l.clone());
    3812           10385 :             }
    3813                 :         }
    3814                 : 
    3815             282 :         let remove_layers = {
    3816             282 :             let mut deltas_to_compact = deltas_to_compact;
    3817             282 :             // only remove those inputs which were not outputs
    3818            3643 :             deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
    3819             282 :             deltas_to_compact
    3820             282 :         };
    3821             282 : 
    3822             282 :         // deletion will happen later, the layer file manager calls garbage_collect_on_drop
    3823             282 :         guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
    3824                 : 
    3825             282 :         if let Some(remote_client) = self.remote_client.as_ref() {
    3826             282 :             remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
    3827 UBC           0 :         }
    3828                 : 
    3829 CBC         282 :         drop_wlock(guard);
    3830             282 : 
    3831             282 :         Ok(())
    3832            1368 :     }
    3833                 : 
    3834                 :     /// Update information about which layer files need to be retained on
    3835                 :     /// garbage collection. This is separate from actually performing the GC,
    3836                 :     /// and is updated more frequently, so that compaction can remove obsolete
    3837                 :     /// page versions more aggressively.
    3838                 :     ///
    3839                 :     /// TODO: that's wishful thinking, compaction doesn't actually do that
    3840                 :     /// currently.
    3841                 :     ///
    3842                 :     /// The caller specifies how much history is needed with the 3 arguments:
    3843                 :     ///
    3844                 :     /// retain_lsns: keep a version of each page at these LSNs
    3845                 :     /// cutoff_horizon: also keep everything newer than this LSN
    3846                 :     /// pitr: the time duration required to keep data for PITR
    3847                 :     ///
    3848                 :     /// The 'retain_lsns' list is currently used to prevent removing files that
    3849                 :     /// are needed by child timelines. In the future, the user might be able to
    3850                 :     /// name additional points in time to retain. The caller is responsible for
    3851                 :     /// collecting that information.
    3852                 :     ///
    3853                 :     /// The 'cutoff_horizon' point is used to retain recent versions that might still be
    3854                 :     /// needed by read-only nodes. (As of this writing, the caller just passes
    3855                 :     /// the latest LSN subtracted by a constant, and doesn't do anything smart
    3856                 :     /// to figure out what read-only nodes might actually need.)
    3857                 :     ///
    3858                 :     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
    3859                 :     /// whether a record is needed for PITR.
    3860                 :     ///
    3861                 :     /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
    3862                 :     /// field, so that the three values passed as argument are stored
    3863                 :     /// atomically. But the caller is responsible for ensuring that no new
    3864                 :     /// branches are created that would need to be included in 'retain_lsns',
    3865                 :     /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
    3866                 :     /// that.
    3867                 :     ///
    3868 UBC           0 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
    3869                 :     pub(super) async fn update_gc_info(
    3870                 :         &self,
    3871                 :         retain_lsns: Vec<Lsn>,
    3872                 :         cutoff_horizon: Lsn,
    3873                 :         pitr: Duration,
    3874                 :         cancel: &CancellationToken,
    3875                 :         ctx: &RequestContext,
    3876                 :     ) -> anyhow::Result<()> {
    3877                 :         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
    3878                 :         //
    3879                 :         // Some unit tests depend on garbage-collection working even when
    3880                 :         // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
    3881                 :         // work, so avoid calling it altogether if time-based retention is not
    3882                 :         // configured. It would be pointless anyway.
    3883                 :         let pitr_cutoff = if pitr != Duration::ZERO {
    3884                 :             let now = SystemTime::now();
    3885                 :             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
    3886                 :                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
    3887                 : 
    3888                 :                 match self
    3889                 :                     .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
    3890                 :                     .await?
    3891                 :                 {
    3892                 :                     LsnForTimestamp::Present(lsn) => lsn,
    3893                 :                     LsnForTimestamp::Future(lsn) => {
    3894                 :                         // The timestamp is in the future. That sounds impossible,
    3895                 :                         // but what it really means is that there hasn't been
    3896                 :                         // any commits since the cutoff timestamp.
    3897               0 :                         debug!("future({})", lsn);
    3898                 :                         cutoff_horizon
    3899                 :                     }
    3900                 :                     LsnForTimestamp::Past(lsn) => {
    3901               0 :                         debug!("past({})", lsn);
    3902                 :                         // conservative, safe default is to remove nothing, when we
    3903                 :                         // have no commit timestamp data available
    3904                 :                         *self.get_latest_gc_cutoff_lsn()
    3905                 :                     }
    3906                 :                     LsnForTimestamp::NoData(lsn) => {
    3907               0 :                         debug!("nodata({})", lsn);
    3908                 :                         // conservative, safe default is to remove nothing, when we
    3909                 :                         // have no commit timestamp data available
    3910                 :                         *self.get_latest_gc_cutoff_lsn()
    3911                 :                     }
    3912                 :                 }
    3913                 :             } else {
    3914                 :                 // If we don't have enough data to convert to LSN,
    3915                 :                 // play safe and don't remove any layers.
    3916                 :                 *self.get_latest_gc_cutoff_lsn()
    3917                 :             }
    3918                 :         } else {
    3919                 :             // No time-based retention was configured. Set time-based cutoff to
    3920                 :             // same as LSN based.
    3921                 :             cutoff_horizon
    3922                 :         };
    3923                 : 
    3924                 :         // Grab the lock and update the values
    3925                 :         *self.gc_info.write().unwrap() = GcInfo {
    3926                 :             retain_lsns,
    3927                 :             horizon_cutoff: cutoff_horizon,
    3928                 :             pitr_cutoff,
    3929                 :         };
    3930                 : 
    3931                 :         Ok(())
    3932                 :     }
    3933                 : 
    3934                 :     /// Garbage collect layer files on a timeline that are no longer needed.
    3935                 :     ///
    3936                 :     /// Currently, we don't make any attempt at removing unneeded page versions
    3937                 :     /// within a layer file. We can only remove the whole file if it's fully
    3938                 :     /// obsolete.
    3939 CBC         646 :     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
    3940             646 :         // this is most likely the background tasks, but it might be the spawned task from
    3941             646 :         // immediate_gc
    3942             646 :         let cancel = crate::task_mgr::shutdown_token();
    3943             646 :         let _g = tokio::select! {
    3944             646 :             guard = self.gc_lock.lock() => guard,
    3945                 :             _ = self.cancel.cancelled() => return Ok(GcResult::default()),
    3946                 :             _ = cancel.cancelled() => return Ok(GcResult::default()),
    3947                 :         };
    3948             646 :         let timer = self.metrics.garbage_collect_histo.start_timer();
    3949                 : 
    3950 UBC           0 :         fail_point!("before-timeline-gc");
    3951                 : 
    3952                 :         // Is the timeline being deleted?
    3953 CBC         646 :         if self.is_stopping() {
    3954 UBC           0 :             anyhow::bail!("timeline is Stopping");
    3955 CBC         646 :         }
    3956             646 : 
    3957             646 :         let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
    3958             646 :             let gc_info = self.gc_info.read().unwrap();
    3959             646 : 
    3960             646 :             let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
    3961             646 :             let pitr_cutoff = gc_info.pitr_cutoff;
    3962             646 :             let retain_lsns = gc_info.retain_lsns.clone();
    3963             646 :             (horizon_cutoff, pitr_cutoff, retain_lsns)
    3964             646 :         };
    3965             646 : 
    3966             646 :         let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
    3967                 : 
    3968             646 :         let res = self
    3969             646 :             .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
    3970             646 :             .instrument(
    3971             646 :                 info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
    3972                 :             )
    3973             178 :             .await?;
    3974                 : 
    3975                 :         // only record successes
    3976             641 :         timer.stop_and_record();
    3977             641 : 
    3978             641 :         Ok(res)
    3979             641 :     }
    3980                 : 
    3981             646 :     async fn gc_timeline(
    3982             646 :         &self,
    3983             646 :         horizon_cutoff: Lsn,
    3984             646 :         pitr_cutoff: Lsn,
    3985             646 :         retain_lsns: Vec<Lsn>,
    3986             646 :         new_gc_cutoff: Lsn,
    3987             646 :     ) -> anyhow::Result<GcResult> {
    3988             646 :         let now = SystemTime::now();
    3989             646 :         let mut result: GcResult = GcResult::default();
    3990             646 : 
    3991             646 :         // Nothing to GC. Return early.
    3992             646 :         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    3993             646 :         if latest_gc_cutoff >= new_gc_cutoff {
    3994             104 :             info!(
    3995             104 :                 "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
    3996             104 :             );
    3997             104 :             return Ok(result);
    3998             542 :         }
    3999                 : 
    4000                 :         // We need to ensure that no one tries to read page versions or create
    4001                 :         // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
    4002                 :         // for details. This will block until the old value is no longer in use.
    4003                 :         //
    4004                 :         // The GC cutoff should only ever move forwards.
    4005             542 :         let waitlist = {
    4006             542 :             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
    4007             542 :             ensure!(
    4008             542 :                 *write_guard <= new_gc_cutoff,
    4009 UBC           0 :                 "Cannot move GC cutoff LSN backwards (was {}, new {})",
    4010               0 :                 *write_guard,
    4011                 :                 new_gc_cutoff
    4012                 :             );
    4013 CBC         542 :             write_guard.store_and_unlock(new_gc_cutoff)
    4014             542 :         };
    4015             542 :         waitlist.wait().await;
    4016                 : 
    4017             542 :         info!("GC starting");
    4018                 : 
    4019 UBC           0 :         debug!("retain_lsns: {:?}", retain_lsns);
    4020                 : 
    4021 CBC         542 :         let mut layers_to_remove = Vec::new();
    4022             542 :         let mut wanted_image_layers = KeySpaceRandomAccum::default();
    4023                 : 
    4024                 :         // Scan all layers in the timeline (remote or on-disk).
    4025                 :         //
    4026                 :         // Garbage collect the layer if all conditions are satisfied:
    4027                 :         // 1. it is older than cutoff LSN;
    4028                 :         // 2. it is older than PITR interval;
    4029                 :         // 3. it doesn't need to be retained for 'retain_lsns';
    4030                 :         // 4. newer on-disk image layers cover the layer's whole key range
    4031                 :         //
    4032                 :         // TODO holding a write lock is too agressive and avoidable
    4033             542 :         let mut guard = self.layers.write().await;
    4034             542 :         let layers = guard.layer_map();
    4035           16763 :         'outer: for l in layers.iter_historic_layers() {
    4036           16763 :             result.layers_total += 1;
    4037           16763 : 
    4038           16763 :             // 1. Is it newer than GC horizon cutoff point?
    4039           16763 :             if l.get_lsn_range().end > horizon_cutoff {
    4040 UBC           0 :                 debug!(
    4041               0 :                     "keeping {} because it's newer than horizon_cutoff {}",
    4042               0 :                     l.filename(),
    4043               0 :                     horizon_cutoff,
    4044               0 :                 );
    4045 CBC        1770 :                 result.layers_needed_by_cutoff += 1;
    4046            1770 :                 continue 'outer;
    4047           14993 :             }
    4048           14993 : 
    4049           14993 :             // 2. It is newer than PiTR cutoff point?
    4050           14993 :             if l.get_lsn_range().end > pitr_cutoff {
    4051 UBC           0 :                 debug!(
    4052               0 :                     "keeping {} because it's newer than pitr_cutoff {}",
    4053               0 :                     l.filename(),
    4054               0 :                     pitr_cutoff,
    4055               0 :                 );
    4056 CBC          47 :                 result.layers_needed_by_pitr += 1;
    4057              47 :                 continue 'outer;
    4058           14946 :             }
    4059                 : 
    4060                 :             // 3. Is it needed by a child branch?
    4061                 :             // NOTE With that we would keep data that
    4062                 :             // might be referenced by child branches forever.
    4063                 :             // We can track this in child timeline GC and delete parent layers when
    4064                 :             // they are no longer needed. This might be complicated with long inheritance chains.
    4065                 :             //
    4066                 :             // TODO Vec is not a great choice for `retain_lsns`
    4067           15090 :             for retain_lsn in &retain_lsns {
    4068                 :                 // start_lsn is inclusive
    4069             557 :                 if &l.get_lsn_range().start <= retain_lsn {
    4070 UBC           0 :                     debug!(
    4071               0 :                         "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
    4072               0 :                         l.filename(),
    4073               0 :                         retain_lsn,
    4074               0 :                         l.is_incremental(),
    4075               0 :                     );
    4076 CBC         413 :                     result.layers_needed_by_branches += 1;
    4077             413 :                     continue 'outer;
    4078             144 :                 }
    4079                 :             }
    4080                 : 
    4081                 :             // 4. Is there a later on-disk layer for this relation?
    4082                 :             //
    4083                 :             // The end-LSN is exclusive, while disk_consistent_lsn is
    4084                 :             // inclusive. For example, if disk_consistent_lsn is 100, it is
    4085                 :             // OK for a delta layer to have end LSN 101, but if the end LSN
    4086                 :             // is 102, then it might not have been fully flushed to disk
    4087                 :             // before crash.
    4088                 :             //
    4089                 :             // For example, imagine that the following layers exist:
    4090                 :             //
    4091                 :             // 1000      - image (A)
    4092                 :             // 1000-2000 - delta (B)
    4093                 :             // 2000      - image (C)
    4094                 :             // 2000-3000 - delta (D)
    4095                 :             // 3000      - image (E)
    4096                 :             //
    4097                 :             // If GC horizon is at 2500, we can remove layers A and B, but
    4098                 :             // we cannot remove C, even though it's older than 2500, because
    4099                 :             // the delta layer 2000-3000 depends on it.
    4100           14533 :             if !layers
    4101           14533 :                 .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
    4102                 :             {
    4103 UBC           0 :                 debug!("keeping {} because it is the latest layer", l.filename());
    4104                 :                 // Collect delta key ranges that need image layers to allow garbage
    4105                 :                 // collecting the layers.
    4106                 :                 // It is not so obvious whether we need to propagate information only about
    4107                 :                 // delta layers. Image layers can form "stairs" preventing old image from been deleted.
    4108                 :                 // But image layers are in any case less sparse than delta layers. Also we need some
    4109                 :                 // protection from replacing recent image layers with new one after each GC iteration.
    4110 CBC       13131 :                 if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
    4111 UBC           0 :                     wanted_image_layers.add_range(l.get_key_range());
    4112 CBC       13131 :                 }
    4113           13131 :                 result.layers_not_updated += 1;
    4114           13131 :                 continue 'outer;
    4115            1402 :             }
    4116                 : 
    4117                 :             // We didn't find any reason to keep this file, so remove it.
    4118 UBC           0 :             debug!(
    4119               0 :                 "garbage collecting {} is_dropped: xx is_incremental: {}",
    4120               0 :                 l.filename(),
    4121               0 :                 l.is_incremental(),
    4122               0 :             );
    4123 CBC        1402 :             layers_to_remove.push(l);
    4124                 :         }
    4125             542 :         self.wanted_image_layers
    4126             542 :             .lock()
    4127             542 :             .unwrap()
    4128             542 :             .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
    4129             542 : 
    4130             542 :         if !layers_to_remove.is_empty() {
    4131                 :             // Persist the new GC cutoff value in the metadata file, before
    4132                 :             // we actually remove anything.
    4133                 :             //
    4134                 :             // This does not in fact have any effect as we no longer consider local metadata unless
    4135                 :             // running without remote storage.
    4136                 :             //
    4137                 :             // This unconditionally schedules also an index_part.json update, even though, we will
    4138                 :             // be doing one a bit later with the unlinked gc'd layers.
    4139                 :             //
    4140                 :             // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
    4141              24 :             self.update_metadata_file(self.disk_consistent_lsn.load(), None)
    4142 UBC           0 :                 .await?;
    4143                 : 
    4144 CBC          24 :             let gc_layers = layers_to_remove
    4145              24 :                 .iter()
    4146            1402 :                 .map(|x| guard.get_from_desc(x))
    4147              24 :                 .collect::<Vec<Layer>>();
    4148              24 : 
    4149              24 :             result.layers_removed = gc_layers.len() as u64;
    4150                 : 
    4151              24 :             if let Some(remote_client) = self.remote_client.as_ref() {
    4152              24 :                 remote_client.schedule_gc_update(&gc_layers)?;
    4153 UBC           0 :             }
    4154                 : 
    4155 CBC          24 :             guard.finish_gc_timeline(&gc_layers);
    4156              24 : 
    4157              24 :             if result.layers_removed != 0 {
    4158              24 :                 fail_point!("after-timeline-gc-removed-layers");
    4159              24 :             }
    4160                 : 
    4161                 :             #[cfg(feature = "testing")]
    4162              24 :             {
    4163              24 :                 result.doomed_layers = gc_layers;
    4164              24 :             }
    4165             518 :         }
    4166                 : 
    4167             537 :         info!(
    4168             537 :             "GC completed removing {} layers, cutoff {}",
    4169             537 :             result.layers_removed, new_gc_cutoff
    4170             537 :         );
    4171                 : 
    4172             537 :         result.elapsed = now.elapsed()?;
    4173             537 :         Ok(result)
    4174             641 :     }
    4175                 : 
    4176                 :     /// Reconstruct a value, using the given base image and WAL records in 'data'.
    4177         5861458 :     async fn reconstruct_value(
    4178         5861458 :         &self,
    4179         5861458 :         key: Key,
    4180         5861458 :         request_lsn: Lsn,
    4181         5861458 :         mut data: ValueReconstructState,
    4182         5861458 :     ) -> Result<Bytes, PageReconstructError> {
    4183         5861444 :         // Perform WAL redo if needed
    4184         5861444 :         data.records.reverse();
    4185         5861444 : 
    4186         5861444 :         // If we have a page image, and no WAL, we're all set
    4187         5861444 :         if data.records.is_empty() {
    4188         3801226 :             if let Some((img_lsn, img)) = &data.img {
    4189 UBC           0 :                 trace!(
    4190               0 :                     "found page image for key {} at {}, no WAL redo required, req LSN {}",
    4191               0 :                     key,
    4192               0 :                     img_lsn,
    4193               0 :                     request_lsn,
    4194               0 :                 );
    4195 CBC     3801226 :                 Ok(img.clone())
    4196                 :             } else {
    4197 UBC           0 :                 Err(PageReconstructError::from(anyhow!(
    4198               0 :                     "base image for {key} at {request_lsn} not found"
    4199               0 :                 )))
    4200                 :             }
    4201                 :         } else {
    4202                 :             // We need to do WAL redo.
    4203                 :             //
    4204                 :             // If we don't have a base image, then the oldest WAL record better initialize
    4205                 :             // the page
    4206 CBC     2060218 :             if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
    4207 UBC           0 :                 Err(PageReconstructError::from(anyhow!(
    4208               0 :                     "Base image for {} at {} not found, but got {} WAL records",
    4209               0 :                     key,
    4210               0 :                     request_lsn,
    4211               0 :                     data.records.len()
    4212               0 :                 )))
    4213                 :             } else {
    4214 CBC     2060218 :                 if data.img.is_some() {
    4215 UBC           0 :                     trace!(
    4216               0 :                         "found {} WAL records and a base image for {} at {}, performing WAL redo",
    4217               0 :                         data.records.len(),
    4218               0 :                         key,
    4219               0 :                         request_lsn
    4220               0 :                     );
    4221                 :                 } else {
    4222               0 :                     trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
    4223                 :                 };
    4224                 : 
    4225 CBC     2060218 :                 let last_rec_lsn = data.records.last().unwrap().0;
    4226                 : 
    4227         2060218 :                 let img = match self
    4228         2060218 :                     .walredo_mgr
    4229         2060218 :                     .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
    4230 UBC           0 :                     .await
    4231 CBC     2060218 :                     .context("Failed to reconstruct a page image:")
    4232                 :                 {
    4233         2060218 :                     Ok(img) => img,
    4234 UBC           0 :                     Err(e) => return Err(PageReconstructError::WalRedo(e)),
    4235                 :                 };
    4236                 : 
    4237 CBC     2060218 :                 if img.len() == page_cache::PAGE_SZ {
    4238         2056879 :                     let cache = page_cache::get();
    4239         2056879 :                     if let Err(e) = cache
    4240         2056879 :                         .memorize_materialized_page(
    4241         2056879 :                             self.tenant_shard_id,
    4242         2056879 :                             self.timeline_id,
    4243         2056879 :                             key,
    4244         2056879 :                             last_rec_lsn,
    4245         2056879 :                             &img,
    4246         2056879 :                         )
    4247            9032 :                         .await
    4248         2056879 :                         .context("Materialized page memoization failed")
    4249                 :                     {
    4250 UBC           0 :                         return Err(PageReconstructError::from(e));
    4251 CBC     2056879 :                     }
    4252            3339 :                 }
    4253                 : 
    4254         2060218 :                 Ok(img)
    4255                 :             }
    4256                 :         }
    4257         5861444 :     }
    4258                 : 
    4259               2 :     pub(crate) async fn spawn_download_all_remote_layers(
    4260               2 :         self: Arc<Self>,
    4261               2 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4262               2 :     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
    4263               2 :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4264               2 : 
    4265               2 :         // this is not really needed anymore; it has tests which really check the return value from
    4266               2 :         // http api. it would be better not to maintain this anymore.
    4267               2 : 
    4268               2 :         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
    4269               2 :         if let Some(st) = &*status_guard {
    4270               1 :             match &st.state {
    4271                 :                 DownloadRemoteLayersTaskState::Running => {
    4272 UBC           0 :                     return Err(st.clone());
    4273                 :                 }
    4274                 :                 DownloadRemoteLayersTaskState::ShutDown
    4275 CBC           1 :                 | DownloadRemoteLayersTaskState::Completed => {
    4276               1 :                     *status_guard = None;
    4277               1 :                 }
    4278                 :             }
    4279               1 :         }
    4280                 : 
    4281               2 :         let self_clone = Arc::clone(&self);
    4282               2 :         let task_id = task_mgr::spawn(
    4283               2 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    4284               2 :             task_mgr::TaskKind::DownloadAllRemoteLayers,
    4285               2 :             Some(self.tenant_shard_id),
    4286               2 :             Some(self.timeline_id),
    4287               2 :             "download all remote layers task",
    4288                 :             false,
    4289               2 :             async move {
    4290              10 :                 self_clone.download_all_remote_layers(request).await;
    4291               2 :                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
    4292               2 :                  match &mut *status_guard {
    4293                 :                     None => {
    4294 UBC           0 :                         warn!("tasks status is supposed to be Some(), since we are running");
    4295                 :                     }
    4296 CBC           2 :                     Some(st) => {
    4297               2 :                         let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
    4298               2 :                         if st.task_id != exp_task_id {
    4299 UBC           0 :                             warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
    4300 CBC           2 :                         } else {
    4301               2 :                             st.state = DownloadRemoteLayersTaskState::Completed;
    4302               2 :                         }
    4303                 :                     }
    4304                 :                 };
    4305               2 :                 Ok(())
    4306               2 :             }
    4307               2 :             .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    4308                 :         );
    4309                 : 
    4310               2 :         let initial_info = DownloadRemoteLayersTaskInfo {
    4311               2 :             task_id: format!("{task_id}"),
    4312               2 :             state: DownloadRemoteLayersTaskState::Running,
    4313               2 :             total_layer_count: 0,
    4314               2 :             successful_download_count: 0,
    4315               2 :             failed_download_count: 0,
    4316               2 :         };
    4317               2 :         *status_guard = Some(initial_info.clone());
    4318               2 : 
    4319               2 :         Ok(initial_info)
    4320               2 :     }
    4321                 : 
    4322               2 :     async fn download_all_remote_layers(
    4323               2 :         self: &Arc<Self>,
    4324               2 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4325               2 :     ) {
    4326                 :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4327                 : 
    4328               2 :         let remaining = {
    4329               2 :             let guard = self.layers.read().await;
    4330               2 :             guard
    4331               2 :                 .layer_map()
    4332               2 :                 .iter_historic_layers()
    4333              10 :                 .map(|desc| guard.get_from_desc(&desc))
    4334               2 :                 .collect::<Vec<_>>()
    4335               2 :         };
    4336               2 :         let total_layer_count = remaining.len();
    4337               2 : 
    4338               2 :         macro_rules! lock_status {
    4339               2 :             ($st:ident) => {
    4340               2 :                 let mut st = self.download_all_remote_layers_task_info.write().unwrap();
    4341               2 :                 let st = st
    4342               2 :                     .as_mut()
    4343               2 :                     .expect("this function is only called after the task has been spawned");
    4344               2 :                 assert_eq!(
    4345               2 :                     st.task_id,
    4346               2 :                     format!(
    4347               2 :                         "{}",
    4348               2 :                         task_mgr::current_task_id().expect("we run inside a task_mgr task")
    4349               2 :                     )
    4350               2 :                 );
    4351               2 :                 let $st = st;
    4352               2 :             };
    4353               2 :         }
    4354               2 : 
    4355               2 :         {
    4356               2 :             lock_status!(st);
    4357               2 :             st.total_layer_count = total_layer_count as u64;
    4358               2 :         }
    4359               2 : 
    4360               2 :         let mut remaining = remaining.into_iter();
    4361               2 :         let mut have_remaining = true;
    4362               2 :         let mut js = tokio::task::JoinSet::new();
    4363               2 : 
    4364               2 :         let cancel = task_mgr::shutdown_token();
    4365               2 : 
    4366               2 :         let limit = request.max_concurrent_downloads;
    4367                 : 
    4368                 :         loop {
    4369              12 :             while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
    4370              12 :                 let Some(next) = remaining.next() else {
    4371               2 :                     have_remaining = false;
    4372               2 :                     break;
    4373                 :                 };
    4374                 : 
    4375              10 :                 let span = tracing::info_span!("download", layer = %next);
    4376                 : 
    4377              10 :                 js.spawn(
    4378              10 :                     async move {
    4379              26 :                         let res = next.download().await;
    4380              10 :                         (next, res)
    4381              10 :                     }
    4382              10 :                     .instrument(span),
    4383              10 :                 );
    4384                 :             }
    4385                 : 
    4386              12 :             while let Some(res) = js.join_next().await {
    4387              10 :                 match res {
    4388                 :                     Ok((_, Ok(_))) => {
    4389               5 :                         lock_status!(st);
    4390               5 :                         st.successful_download_count += 1;
    4391                 :                     }
    4392               5 :                     Ok((layer, Err(e))) => {
    4393               5 :                         tracing::error!(%layer, "download failed: {e:#}");
    4394               5 :                         lock_status!(st);
    4395               5 :                         st.failed_download_count += 1;
    4396                 :                     }
    4397 UBC           0 :                     Err(je) if je.is_cancelled() => unreachable!("not used here"),
    4398               0 :                     Err(je) if je.is_panic() => {
    4399               0 :                         lock_status!(st);
    4400               0 :                         st.failed_download_count += 1;
    4401                 :                     }
    4402               0 :                     Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
    4403                 :                 }
    4404                 :             }
    4405                 : 
    4406 CBC           2 :             if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
    4407               2 :                 break;
    4408 UBC           0 :             }
    4409                 :         }
    4410                 : 
    4411                 :         {
    4412 CBC           2 :             lock_status!(st);
    4413               2 :             st.state = DownloadRemoteLayersTaskState::Completed;
    4414               2 :         }
    4415               2 :     }
    4416                 : 
    4417              57 :     pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
    4418              57 :         self.download_all_remote_layers_task_info
    4419              57 :             .read()
    4420              57 :             .unwrap()
    4421              57 :             .clone()
    4422              57 :     }
    4423                 : }
    4424                 : 
    4425                 : pub(crate) struct DiskUsageEvictionInfo {
    4426                 :     /// Timeline's largest layer (remote or resident)
    4427                 :     pub max_layer_size: Option<u64>,
    4428                 :     /// Timeline's resident layers
    4429                 :     pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
    4430                 : }
    4431                 : 
    4432                 : pub(crate) struct LocalLayerInfoForDiskUsageEviction {
    4433                 :     pub layer: Layer,
    4434                 :     pub last_activity_ts: SystemTime,
    4435                 : }
    4436                 : 
    4437                 : impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
    4438 UBC           0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    4439               0 :         // format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
    4440               0 :         // having to allocate a string to this is bad, but it will rarely be formatted
    4441               0 :         let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
    4442               0 :         let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
    4443               0 :         struct DisplayIsDebug<'a, T>(&'a T);
    4444               0 :         impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
    4445               0 :             fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    4446               0 :                 write!(f, "{}", self.0)
    4447               0 :             }
    4448               0 :         }
    4449               0 :         f.debug_struct("LocalLayerInfoForDiskUsageEviction")
    4450               0 :             .field("layer", &DisplayIsDebug(&self.layer))
    4451               0 :             .field("last_activity", &ts)
    4452               0 :             .finish()
    4453               0 :     }
    4454                 : }
    4455                 : 
    4456                 : impl LocalLayerInfoForDiskUsageEviction {
    4457 CBC         440 :     pub fn file_size(&self) -> u64 {
    4458             440 :         self.layer.layer_desc().file_size
    4459             440 :     }
    4460                 : }
    4461                 : 
    4462                 : impl Timeline {
    4463                 :     /// Returns non-remote layers for eviction.
    4464              29 :     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
    4465              29 :         let guard = self.layers.read().await;
    4466              29 :         let layers = guard.layer_map();
    4467              29 : 
    4468              29 :         let mut max_layer_size: Option<u64> = None;
    4469              29 :         let mut resident_layers = Vec::new();
    4470                 : 
    4471            2894 :         for l in layers.iter_historic_layers() {
    4472            2894 :             let file_size = l.file_size();
    4473            2894 :             max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
    4474            2894 : 
    4475            2894 :             let l = guard.get_from_desc(&l);
    4476                 : 
    4477            2894 :             let l = match l.keep_resident().await {
    4478            2893 :                 Ok(Some(l)) => l,
    4479               1 :                 Ok(None) => continue,
    4480 UBC           0 :                 Err(e) => {
    4481                 :                     // these should not happen, but we cannot make them statically impossible right
    4482                 :                     // now.
    4483               0 :                     tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}");
    4484               0 :                     continue;
    4485                 :                 }
    4486                 :             };
    4487                 : 
    4488 CBC        2893 :             let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
    4489 UBC           0 :                 // We only use this fallback if there's an implementation error.
    4490               0 :                 // `latest_activity` already does rate-limited warn!() log.
    4491               0 :                 debug!(layer=%l, "last_activity returns None, using SystemTime::now");
    4492               0 :                 SystemTime::now()
    4493 CBC        2893 :             });
    4494            2893 : 
    4495            2893 :             resident_layers.push(LocalLayerInfoForDiskUsageEviction {
    4496            2893 :                 layer: l.drop_eviction_guard(),
    4497            2893 :                 last_activity_ts,
    4498            2893 :             });
    4499                 :         }
    4500                 : 
    4501              29 :         DiskUsageEvictionInfo {
    4502              29 :             max_layer_size,
    4503              29 :             resident_layers,
    4504              29 :         }
    4505              29 :     }
    4506                 : 
    4507           20805 :     pub(crate) fn get_shard_index(&self) -> ShardIndex {
    4508           20805 :         ShardIndex {
    4509           20805 :             shard_number: self.tenant_shard_id.shard_number,
    4510           20805 :             shard_count: self.tenant_shard_id.shard_count,
    4511           20805 :         }
    4512           20805 :     }
    4513                 : }
    4514                 : 
    4515                 : type TraversalPathItem = (
    4516                 :     ValueReconstructResult,
    4517                 :     Lsn,
    4518                 :     Box<dyn Send + FnOnce() -> TraversalId>,
    4519                 : );
    4520                 : 
    4521                 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
    4522                 : /// to an error, as anyhow context information.
    4523            1149 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
    4524            1149 :     // We want the original 'msg' to be the outermost context. The outermost context
    4525            1149 :     // is the most high-level information, which also gets propagated to the client.
    4526            1149 :     let mut msg_iter = path
    4527            1149 :         .into_iter()
    4528            1217 :         .map(|(r, c, l)| {
    4529            1217 :             format!(
    4530            1217 :                 "layer traversal: result {:?}, cont_lsn {}, layer: {}",
    4531            1217 :                 r,
    4532            1217 :                 c,
    4533            1217 :                 l(),
    4534            1217 :             )
    4535            1217 :         })
    4536            1149 :         .chain(std::iter::once(msg));
    4537            1149 :     // Construct initial message from the first traversed layer
    4538            1149 :     let err = anyhow!(msg_iter.next().unwrap());
    4539            1149 : 
    4540            1149 :     // Append all subsequent traversals, and the error message 'msg', as contexts.
    4541            1217 :     let msg = msg_iter.fold(err, |err, msg| err.context(msg));
    4542            1149 :     PageReconstructError::from(msg)
    4543            1149 : }
    4544                 : 
    4545                 : /// Various functions to mutate the timeline.
    4546                 : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
    4547                 : // This is probably considered a bad practice in Rust and should be fixed eventually,
    4548                 : // but will cause large code changes.
    4549                 : pub struct TimelineWriter<'a> {
    4550                 :     tl: &'a Timeline,
    4551                 :     _write_guard: tokio::sync::MutexGuard<'a, ()>,
    4552                 : }
    4553                 : 
    4554                 : impl Deref for TimelineWriter<'_> {
    4555                 :     type Target = Timeline;
    4556                 : 
    4557 UBC           0 :     fn deref(&self) -> &Self::Target {
    4558               0 :         self.tl
    4559               0 :     }
    4560                 : }
    4561                 : 
    4562                 : impl<'a> TimelineWriter<'a> {
    4563                 :     /// Put a new page version that can be constructed from a WAL record
    4564                 :     ///
    4565                 :     /// This will implicitly extend the relation, if the page is beyond the
    4566                 :     /// current end-of-file.
    4567 CBC      607051 :     pub async fn put(
    4568          607051 :         &self,
    4569          607051 :         key: Key,
    4570          607051 :         lsn: Lsn,
    4571          607051 :         value: &Value,
    4572          607051 :         ctx: &RequestContext,
    4573          607051 :     ) -> anyhow::Result<()> {
    4574          607051 :         self.tl.put_value(key, lsn, value, ctx).await
    4575          607051 :     }
    4576                 : 
    4577         1168370 :     pub(crate) async fn put_batch(
    4578         1168370 :         &self,
    4579         1168370 :         batch: &HashMap<Key, Vec<(Lsn, Value)>>,
    4580         1168370 :         ctx: &RequestContext,
    4581         1168370 :     ) -> anyhow::Result<()> {
    4582         1168369 :         self.tl.put_values(batch, ctx).await
    4583         1168369 :     }
    4584                 : 
    4585            6569 :     pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    4586            6569 :         self.tl.put_tombstones(batch).await
    4587            6569 :     }
    4588                 : 
    4589                 :     /// Track the end of the latest digested WAL record.
    4590                 :     /// Remember the (end of) last valid WAL record remembered in the timeline.
    4591                 :     ///
    4592                 :     /// Call this after you have finished writing all the WAL up to 'lsn'.
    4593                 :     ///
    4594                 :     /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
    4595                 :     /// the 'lsn' or anything older. The previous last record LSN is stored alongside
    4596                 :     /// the latest and can be read.
    4597        49362591 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    4598        49362591 :         self.tl.finish_write(new_lsn);
    4599        49362591 :     }
    4600                 : 
    4601          427894 :     pub(crate) fn update_current_logical_size(&self, delta: i64) {
    4602          427894 :         self.tl.update_current_logical_size(delta)
    4603          427894 :     }
    4604                 : }
    4605                 : 
    4606                 : // We need TimelineWriter to be send in upcoming conversion of
    4607                 : // Timeline::layers to tokio::sync::RwLock.
    4608               1 : #[test]
    4609               1 : fn is_send() {
    4610               1 :     fn _assert_send<T: Send>() {}
    4611               1 :     _assert_send::<TimelineWriter<'_>>();
    4612               1 : }
    4613                 : 
    4614                 : /// Add a suffix to a layer file's name: .{num}.old
    4615                 : /// Uses the first available num (starts at 0)
    4616               1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
    4617               1 :     let filename = path
    4618               1 :         .file_name()
    4619               1 :         .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
    4620               1 :     let mut new_path = path.to_owned();
    4621                 : 
    4622               1 :     for i in 0u32.. {
    4623               1 :         new_path.set_file_name(format!("{filename}.{i}.old"));
    4624               1 :         if !new_path.exists() {
    4625               1 :             std::fs::rename(path, &new_path)
    4626               1 :                 .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
    4627               1 :             return Ok(());
    4628 UBC           0 :         }
    4629                 :     }
    4630                 : 
    4631               0 :     bail!("couldn't find an unused backup number for {:?}", path)
    4632 CBC           1 : }
    4633                 : 
    4634                 : #[cfg(test)]
    4635                 : mod tests {
    4636                 :     use utils::{id::TimelineId, lsn::Lsn};
    4637                 : 
    4638                 :     use crate::tenant::{
    4639                 :         harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
    4640                 :     };
    4641                 : 
    4642               1 :     #[tokio::test]
    4643               1 :     async fn two_layer_eviction_attempts_at_the_same_time() {
    4644               1 :         let harness =
    4645               1 :             TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
    4646               1 : 
    4647               1 :         let ctx = any_context();
    4648               1 :         let tenant = harness.try_load(&ctx).await.unwrap();
    4649               1 :         let timeline = tenant
    4650               1 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    4651               1 :             .await
    4652               1 :             .unwrap();
    4653               1 : 
    4654               1 :         let rtc = timeline
    4655               1 :             .remote_client
    4656               1 :             .clone()
    4657               1 :             .expect("just configured this");
    4658                 : 
    4659               1 :         let layer = find_some_layer(&timeline).await;
    4660               1 :         let layer = layer
    4661               1 :             .keep_resident()
    4662 UBC           0 :             .await
    4663 CBC           1 :             .expect("no download => no downloading errors")
    4664               1 :             .expect("should had been resident")
    4665               1 :             .drop_eviction_guard();
    4666               1 : 
    4667               1 :         let first = async { layer.evict_and_wait(&rtc).await };
    4668               1 :         let second = async { layer.evict_and_wait(&rtc).await };
    4669                 : 
    4670               1 :         let (first, second) = tokio::join!(first, second);
    4671                 : 
    4672               1 :         let res = layer.keep_resident().await;
    4673               1 :         assert!(matches!(res, Ok(None)), "{res:?}");
    4674                 : 
    4675               1 :         match (first, second) {
    4676 UBC           0 :             (Ok(()), Ok(())) => {
    4677               0 :                 // because there are no more timeline locks being taken on eviction path, we can
    4678               0 :                 // witness all three outcomes here.
    4679               0 :             }
    4680 CBC           1 :             (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
    4681               1 :                 // if one completes before the other, this is fine just as well.
    4682               1 :             }
    4683 UBC           0 :             other => unreachable!("unexpected {:?}", other),
    4684                 :         }
    4685                 :     }
    4686                 : 
    4687 CBC           1 :     fn any_context() -> crate::context::RequestContext {
    4688               1 :         use crate::context::*;
    4689               1 :         use crate::task_mgr::*;
    4690               1 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    4691               1 :     }
    4692                 : 
    4693               1 :     async fn find_some_layer(timeline: &Timeline) -> Layer {
    4694               1 :         let layers = timeline.layers.read().await;
    4695               1 :         let desc = layers
    4696               1 :             .layer_map()
    4697               1 :             .iter_historic_layers()
    4698               1 :             .next()
    4699               1 :             .expect("must find one layer to evict");
    4700               1 : 
    4701               1 :         layers.get_from_desc(&desc)
    4702               1 :     }
    4703                 : }
        

Generated by: LCOV version 2.1-beta