LCOV - code coverage report
Current view: top level - pageserver/src/tenant - timeline.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 89.3 % 2844 2541
Test Date: 2024-02-12 20:26:03 Functions: 66.6 % 407 271

            Line data    Source code
       1              : pub mod delete;
       2              : mod eviction_task;
       3              : mod init;
       4              : pub mod layer_manager;
       5              : pub(crate) mod logical_size;
       6              : pub mod span;
       7              : pub mod uninit;
       8              : mod walreceiver;
       9              : 
      10              : use anyhow::{anyhow, bail, ensure, Context, Result};
      11              : use bytes::Bytes;
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use enumset::EnumSet;
      14              : use fail::fail_point;
      15              : use futures::stream::StreamExt;
      16              : use itertools::Itertools;
      17              : use pageserver_api::{
      18              :     keyspace::{key_range_size, KeySpaceAccum},
      19              :     models::{
      20              :         DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
      21              :         LayerMapInfo, TimelineState,
      22              :     },
      23              :     reltag::BlockNumber,
      24              :     shard::{ShardIdentity, TenantShardId},
      25              : };
      26              : use rand::Rng;
      27              : use serde_with::serde_as;
      28              : use storage_broker::BrokerClientChannel;
      29              : use tokio::{
      30              :     runtime::Handle,
      31              :     sync::{oneshot, watch},
      32              : };
      33              : use tokio_util::sync::CancellationToken;
      34              : use tracing::*;
      35              : use utils::sync::gate::Gate;
      36              : 
      37              : use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
      38              : use std::ops::{Deref, Range};
      39              : use std::pin::pin;
      40              : use std::sync::atomic::Ordering as AtomicOrdering;
      41              : use std::sync::{Arc, Mutex, RwLock, Weak};
      42              : use std::time::{Duration, Instant, SystemTime};
      43              : use std::{
      44              :     cmp::{max, min, Ordering},
      45              :     ops::ControlFlow,
      46              : };
      47              : 
      48              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      49              : use crate::tenant::{
      50              :     layer_map::{LayerMap, SearchResult},
      51              :     metadata::{save_metadata, TimelineMetadata},
      52              :     par_fsync,
      53              : };
      54              : use crate::{
      55              :     context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
      56              :     disk_usage_eviction_task::DiskUsageEvictionInfo,
      57              : };
      58              : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
      59              : use crate::{
      60              :     disk_usage_eviction_task::finite_f32,
      61              :     tenant::storage_layer::{
      62              :         AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
      63              :         LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
      64              :         ValueReconstructState,
      65              :     },
      66              : };
      67              : use crate::{
      68              :     disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
      69              : };
      70              : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
      71              : 
      72              : use crate::config::PageServerConf;
      73              : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
      74              : use crate::metrics::{
      75              :     TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
      76              : };
      77              : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
      78              : use crate::tenant::config::TenantConfOpt;
      79              : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
      80              : use pageserver_api::reltag::RelTag;
      81              : use pageserver_api::shard::ShardIndex;
      82              : 
      83              : use postgres_connection::PgConnectionConfig;
      84              : use postgres_ffi::to_pg_timestamp;
      85              : use utils::{
      86              :     completion,
      87              :     generation::Generation,
      88              :     id::TimelineId,
      89              :     lsn::{AtomicLsn, Lsn, RecordLsn},
      90              :     seqwait::SeqWait,
      91              :     simple_rcu::{Rcu, RcuReadGuard},
      92              : };
      93              : 
      94              : use crate::page_cache;
      95              : use crate::repository::GcResult;
      96              : use crate::repository::{Key, Value};
      97              : use crate::task_mgr;
      98              : use crate::task_mgr::TaskKind;
      99              : use crate::ZERO_PAGE;
     100              : 
     101              : use self::delete::DeleteTimelineFlow;
     102              : pub(super) use self::eviction_task::EvictionTaskTenantState;
     103              : use self::eviction_task::EvictionTaskTimelineState;
     104              : use self::layer_manager::LayerManager;
     105              : use self::logical_size::LogicalSize;
     106              : use self::walreceiver::{WalReceiver, WalReceiverConf};
     107              : 
     108              : use super::config::TenantConf;
     109              : use super::remote_timeline_client::index::IndexPart;
     110              : use super::remote_timeline_client::RemoteTimelineClient;
     111              : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
     112              : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
     113              : 
     114           45 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     115              : pub(super) enum FlushLoopState {
     116              :     NotStarted,
     117              :     Running {
     118              :         #[cfg(test)]
     119              :         expect_initdb_optimization: bool,
     120              :         #[cfg(test)]
     121              :         initdb_optimization_count: usize,
     122              :     },
     123              :     Exited,
     124              : }
     125              : 
     126              : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
     127            0 : #[derive(Debug, Clone, PartialEq, Eq)]
     128              : pub(crate) struct Hole {
     129              :     key_range: Range<Key>,
     130              :     coverage_size: usize,
     131              : }
     132              : 
     133              : impl Ord for Hole {
     134          274 :     fn cmp(&self, other: &Self) -> Ordering {
     135          274 :         other.coverage_size.cmp(&self.coverage_size) // inverse order
     136          274 :     }
     137              : }
     138              : 
     139              : impl PartialOrd for Hole {
     140          274 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     141          274 :         Some(self.cmp(other))
     142          274 :     }
     143              : }
     144              : 
     145              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     146              : /// Can be removed after all refactors are done.
     147          295 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
     148          295 :     drop(rlock)
     149          295 : }
     150              : 
     151              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     152              : /// Can be removed after all refactors are done.
     153         2016 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
     154         2016 :     drop(rlock)
     155         2016 : }
     156              : 
     157              : /// The outward-facing resources required to build a Timeline
     158              : pub struct TimelineResources {
     159              :     pub remote_client: Option<RemoteTimelineClient>,
     160              :     pub deletion_queue_client: DeletionQueueClient,
     161              : }
     162              : 
     163              : pub struct Timeline {
     164              :     conf: &'static PageServerConf,
     165              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     166              : 
     167              :     myself: Weak<Self>,
     168              : 
     169              :     pub(crate) tenant_shard_id: TenantShardId,
     170              :     pub timeline_id: TimelineId,
     171              : 
     172              :     /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
     173              :     /// Never changes for the lifetime of this [`Timeline`] object.
     174              :     ///
     175              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     176              :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     177              :     pub(crate) generation: Generation,
     178              : 
     179              :     /// The detailed sharding information from our parent Tenant.  This enables us to map keys
     180              :     /// to shards, and is constant through the lifetime of this Timeline.
     181              :     shard_identity: ShardIdentity,
     182              : 
     183              :     pub pg_version: u32,
     184              : 
     185              :     /// The tuple has two elements.
     186              :     /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
     187              :     /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
     188              :     ///
     189              :     /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
     190              :     /// We describe these rectangles through the `PersistentLayerDesc` struct.
     191              :     ///
     192              :     /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
     193              :     /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
     194              :     /// `PersistentLayerDesc`'s.
     195              :     ///
     196              :     /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
     197              :     /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
     198              :     /// runtime, e.g., during page reconstruction.
     199              :     ///
     200              :     /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
     201              :     /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
     202              :     pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
     203              : 
     204              :     /// Set of key ranges which should be covered by image layers to
     205              :     /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
     206              :     /// It is used by compaction task when it checks if new image layer should be created.
     207              :     /// Newly created image layer doesn't help to remove the delta layer, until the
     208              :     /// newly created image layer falls off the PITR horizon. So on next GC cycle,
     209              :     /// gc_timeline may still want the new image layer to be created. To avoid redundant
     210              :     /// image layers creation we should check if image layer exists but beyond PITR horizon.
     211              :     /// This is why we need remember GC cutoff LSN.
     212              :     ///
     213              :     wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
     214              : 
     215              :     last_freeze_at: AtomicLsn,
     216              :     // Atomic would be more appropriate here.
     217              :     last_freeze_ts: RwLock<Instant>,
     218              : 
     219              :     // WAL redo manager. `None` only for broken tenants.
     220              :     walredo_mgr: Option<Arc<super::WalRedoManager>>,
     221              : 
     222              :     /// Remote storage client.
     223              :     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
     224              :     pub remote_client: Option<Arc<RemoteTimelineClient>>,
     225              : 
     226              :     // What page versions do we hold in the repository? If we get a
     227              :     // request > last_record_lsn, we need to wait until we receive all
     228              :     // the WAL up to the request. The SeqWait provides functions for
     229              :     // that. TODO: If we get a request for an old LSN, such that the
     230              :     // versions have already been garbage collected away, we should
     231              :     // throw an error, but we don't track that currently.
     232              :     //
     233              :     // last_record_lsn.load().last points to the end of last processed WAL record.
     234              :     //
     235              :     // We also remember the starting point of the previous record in
     236              :     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     237              :     // first WAL record when the node is started up. But here, we just
     238              :     // keep track of it.
     239              :     last_record_lsn: SeqWait<RecordLsn, Lsn>,
     240              : 
     241              :     // All WAL records have been processed and stored durably on files on
     242              :     // local disk, up to this LSN. On crash and restart, we need to re-process
     243              :     // the WAL starting from this point.
     244              :     //
     245              :     // Some later WAL records might have been processed and also flushed to disk
     246              :     // already, so don't be surprised to see some, but there's no guarantee on
     247              :     // them yet.
     248              :     disk_consistent_lsn: AtomicLsn,
     249              : 
     250              :     // Parent timeline that this timeline was branched from, and the LSN
     251              :     // of the branch point.
     252              :     ancestor_timeline: Option<Arc<Timeline>>,
     253              :     ancestor_lsn: Lsn,
     254              : 
     255              :     pub(super) metrics: TimelineMetrics,
     256              : 
     257              :     // `Timeline` doesn't write these metrics itself, but it manages the lifetime.  Code
     258              :     // in `crate::page_service` writes these metrics.
     259              :     pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
     260              : 
     261              :     /// Ensures layers aren't frozen by checkpointer between
     262              :     /// [`Timeline::get_layer_for_write`] and layer reads.
     263              :     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     264              :     /// Must always be acquired before the layer map/individual layer lock
     265              :     /// to avoid deadlock.
     266              :     write_lock: tokio::sync::Mutex<()>,
     267              : 
     268              :     /// Used to avoid multiple `flush_loop` tasks running
     269              :     pub(super) flush_loop_state: Mutex<FlushLoopState>,
     270              : 
     271              :     /// layer_flush_start_tx can be used to wake up the layer-flushing task.
     272              :     /// The value is a counter, incremented every time a new flush cycle is requested.
     273              :     /// The flush cycle counter is sent back on the layer_flush_done channel when
     274              :     /// the flush finishes. You can use that to wait for the flush to finish.
     275              :     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     276              :     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
     277              :     layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
     278              : 
     279              :     // Needed to ensure that we can't create a branch at a point that was already garbage collected
     280              :     pub latest_gc_cutoff_lsn: Rcu<Lsn>,
     281              : 
     282              :     // List of child timelines and their branch points. This is needed to avoid
     283              :     // garbage collecting data that is still needed by the child timelines.
     284              :     pub gc_info: std::sync::RwLock<GcInfo>,
     285              : 
     286              :     // It may change across major versions so for simplicity
     287              :     // keep it after running initdb for a timeline.
     288              :     // It is needed in checks when we want to error on some operations
     289              :     // when they are requested for pre-initdb lsn.
     290              :     // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
     291              :     // though let's keep them both for better error visibility.
     292              :     pub initdb_lsn: Lsn,
     293              : 
     294              :     /// When did we last calculate the partitioning?
     295              :     partitioning: Mutex<(KeyPartitioning, Lsn)>,
     296              : 
     297              :     /// Configuration: how often should the partitioning be recalculated.
     298              :     repartition_threshold: u64,
     299              : 
     300              :     /// Current logical size of the "datadir", at the last LSN.
     301              :     current_logical_size: LogicalSize,
     302              : 
     303              :     /// Information about the last processed message by the WAL receiver,
     304              :     /// or None if WAL receiver has not received anything for this timeline
     305              :     /// yet.
     306              :     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
     307              :     pub walreceiver: Mutex<Option<WalReceiver>>,
     308              : 
     309              :     /// Relation size cache
     310              :     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
     311              : 
     312              :     download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
     313              : 
     314              :     state: watch::Sender<TimelineState>,
     315              : 
     316              :     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     317              :     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     318              :     pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
     319              : 
     320              :     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
     321              : 
     322              :     /// Load or creation time information about the disk_consistent_lsn and when the loading
     323              :     /// happened. Used for consumption metrics.
     324              :     pub(crate) loaded_at: (Lsn, SystemTime),
     325              : 
     326              :     /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
     327              :     pub(crate) gate: Gate,
     328              : 
     329              :     /// Cancellation token scoped to this timeline: anything doing long-running work relating
     330              :     /// to the timeline should drop out when this token fires.
     331              :     pub(crate) cancel: CancellationToken,
     332              : 
     333              :     /// Make sure we only have one running compaction at a time in tests.
     334              :     ///
     335              :     /// Must only be taken in two places:
     336              :     /// - [`Timeline::compact`] (this file)
     337              :     /// - [`delete::delete_local_layer_files`]
     338              :     ///
     339              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     340              :     compaction_lock: tokio::sync::Mutex<()>,
     341              : 
     342              :     /// Make sure we only have one running gc at a time.
     343              :     ///
     344              :     /// Must only be taken in two places:
     345              :     /// - [`Timeline::gc`] (this file)
     346              :     /// - [`delete::delete_local_layer_files`]
     347              :     ///
     348              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     349              :     gc_lock: tokio::sync::Mutex<()>,
     350              : }
     351              : 
     352              : pub struct WalReceiverInfo {
     353              :     pub wal_source_connconf: PgConnectionConfig,
     354              :     pub last_received_msg_lsn: Lsn,
     355              :     pub last_received_msg_ts: u128,
     356              : }
     357              : 
     358              : ///
     359              : /// Information about how much history needs to be retained, needed by
     360              : /// Garbage Collection.
     361              : ///
     362              : pub struct GcInfo {
     363              :     /// Specific LSNs that are needed.
     364              :     ///
     365              :     /// Currently, this includes all points where child branches have
     366              :     /// been forked off from. In the future, could also include
     367              :     /// explicit user-defined snapshot points.
     368              :     pub retain_lsns: Vec<Lsn>,
     369              : 
     370              :     /// In addition to 'retain_lsns', keep everything newer than this
     371              :     /// point.
     372              :     ///
     373              :     /// This is calculated by subtracting 'gc_horizon' setting from
     374              :     /// last-record LSN
     375              :     ///
     376              :     /// FIXME: is this inclusive or exclusive?
     377              :     pub horizon_cutoff: Lsn,
     378              : 
     379              :     /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
     380              :     /// point.
     381              :     ///
     382              :     /// This is calculated by finding a number such that a record is needed for PITR
     383              :     /// if only if its LSN is larger than 'pitr_cutoff'.
     384              :     pub pitr_cutoff: Lsn,
     385              : }
     386              : 
     387              : /// An error happened in a get() operation.
     388           76 : #[derive(thiserror::Error, Debug)]
     389              : pub(crate) enum PageReconstructError {
     390              :     #[error(transparent)]
     391              :     Other(#[from] anyhow::Error),
     392              : 
     393              :     #[error("Ancestor LSN wait error: {0}")]
     394              :     AncestorLsnTimeout(#[from] WaitLsnError),
     395              : 
     396              :     #[error("timeline shutting down")]
     397              :     Cancelled,
     398              : 
     399              :     /// The ancestor of this is being stopped
     400              :     #[error("ancestor timeline {0} is being stopped")]
     401              :     AncestorStopping(TimelineId),
     402              : 
     403              :     /// An error happened replaying WAL records
     404              :     #[error(transparent)]
     405              :     WalRedo(anyhow::Error),
     406              : }
     407              : 
     408              : impl PageReconstructError {
     409              :     /// Returns true if this error indicates a tenant/timeline shutdown alike situation
     410            0 :     pub(crate) fn is_stopping(&self) -> bool {
     411            0 :         use PageReconstructError::*;
     412            0 :         match self {
     413            0 :             Other(_) => false,
     414            0 :             AncestorLsnTimeout(_) => false,
     415            0 :             Cancelled | AncestorStopping(_) => true,
     416            0 :             WalRedo(_) => false,
     417              :         }
     418            0 :     }
     419              : }
     420              : 
     421            0 : #[derive(thiserror::Error, Debug)]
     422              : enum CreateImageLayersError {
     423              :     #[error("timeline shutting down")]
     424              :     Cancelled,
     425              : 
     426              :     #[error(transparent)]
     427              :     GetVectoredError(GetVectoredError),
     428              : 
     429              :     #[error(transparent)]
     430              :     PageReconstructError(PageReconstructError),
     431              : 
     432              :     #[error(transparent)]
     433              :     Other(#[from] anyhow::Error),
     434              : }
     435              : 
     436            0 : #[derive(thiserror::Error, Debug)]
     437              : enum FlushLayerError {
     438              :     /// Timeline cancellation token was cancelled
     439              :     #[error("timeline shutting down")]
     440              :     Cancelled,
     441              : 
     442              :     #[error(transparent)]
     443              :     CreateImageLayersError(CreateImageLayersError),
     444              : 
     445              :     #[error(transparent)]
     446              :     Other(#[from] anyhow::Error),
     447              : }
     448              : 
     449            0 : #[derive(thiserror::Error, Debug)]
     450              : pub(crate) enum GetVectoredError {
     451              :     #[error("timeline shutting down")]
     452              :     Cancelled,
     453              : 
     454              :     #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
     455              :     Oversized(u64),
     456              : 
     457              :     #[error("Requested at invalid LSN: {0}")]
     458              :     InvalidLsn(Lsn),
     459              : }
     460              : 
     461            0 : #[derive(thiserror::Error, Debug)]
     462              : pub(crate) enum GetReadyAncestorError {
     463              :     #[error("ancestor timeline {0} is being stopped")]
     464              :     AncestorStopping(TimelineId),
     465              : 
     466              :     #[error("Ancestor LSN wait error: {0}")]
     467              :     AncestorLsnTimeout(#[from] WaitLsnError),
     468              : 
     469              :     #[error("Cancelled")]
     470              :     Cancelled,
     471              : 
     472              :     #[error(transparent)]
     473              :     Other(#[from] anyhow::Error),
     474              : }
     475              : 
     476            0 : #[derive(Clone, Copy)]
     477              : pub enum LogicalSizeCalculationCause {
     478              :     Initial,
     479              :     ConsumptionMetricsSyntheticSize,
     480              :     EvictionTaskImitation,
     481              :     TenantSizeHandler,
     482              : }
     483              : 
     484              : pub enum GetLogicalSizePriority {
     485              :     User,
     486              :     Background,
     487              : }
     488              : 
     489          850 : #[derive(enumset::EnumSetType)]
     490              : pub(crate) enum CompactFlags {
     491              :     ForceRepartition,
     492              : }
     493              : 
     494              : impl std::fmt::Debug for Timeline {
     495            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     496            0 :         write!(f, "Timeline<{}>", self.timeline_id)
     497            0 :     }
     498              : }
     499              : 
     500           48 : #[derive(thiserror::Error, Debug)]
     501              : pub(crate) enum WaitLsnError {
     502              :     // Called on a timeline which is shutting down
     503              :     #[error("Shutdown")]
     504              :     Shutdown,
     505              : 
     506              :     // Called on an timeline not in active state or shutting down
     507              :     #[error("Bad state (not active)")]
     508              :     BadState,
     509              : 
     510              :     // Timeout expired while waiting for LSN to catch up with goal.
     511              :     #[error("{0}")]
     512              :     Timeout(String),
     513              : }
     514              : 
     515              : // The impls below achieve cancellation mapping for errors.
     516              : // Perhaps there's a way of achieving this with less cruft.
     517              : 
     518              : impl From<CreateImageLayersError> for CompactionError {
     519            0 :     fn from(e: CreateImageLayersError) -> Self {
     520            0 :         match e {
     521            0 :             CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
     522            0 :             _ => CompactionError::Other(e.into()),
     523              :         }
     524            0 :     }
     525              : }
     526              : 
     527              : impl From<CreateImageLayersError> for FlushLayerError {
     528            0 :     fn from(e: CreateImageLayersError) -> Self {
     529            0 :         match e {
     530            0 :             CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
     531            0 :             any => FlushLayerError::CreateImageLayersError(any),
     532              :         }
     533            0 :     }
     534              : }
     535              : 
     536              : impl From<PageReconstructError> for CreateImageLayersError {
     537            0 :     fn from(e: PageReconstructError) -> Self {
     538            0 :         match e {
     539            0 :             PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
     540            0 :             _ => CreateImageLayersError::PageReconstructError(e),
     541              :         }
     542            0 :     }
     543              : }
     544              : 
     545              : impl From<GetVectoredError> for CreateImageLayersError {
     546            0 :     fn from(e: GetVectoredError) -> Self {
     547            0 :         match e {
     548            0 :             GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
     549            0 :             _ => CreateImageLayersError::GetVectoredError(e),
     550              :         }
     551            0 :     }
     552              : }
     553              : 
     554              : impl From<GetReadyAncestorError> for PageReconstructError {
     555            4 :     fn from(e: GetReadyAncestorError) -> Self {
     556            4 :         use GetReadyAncestorError::*;
     557            4 :         match e {
     558            2 :             AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
     559            0 :             AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
     560            0 :             Cancelled => PageReconstructError::Cancelled,
     561            2 :             Other(other) => PageReconstructError::Other(other),
     562              :         }
     563            4 :     }
     564              : }
     565              : 
     566              : /// Public interface functions
     567              : impl Timeline {
     568              :     /// Get the LSN where this branch was created
     569         3669 :     pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
     570         3669 :         self.ancestor_lsn
     571         3669 :     }
     572              : 
     573              :     /// Get the ancestor's timeline id
     574         5252 :     pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
     575         5252 :         self.ancestor_timeline
     576         5252 :             .as_ref()
     577         5252 :             .map(|ancestor| ancestor.timeline_id)
     578         5252 :     }
     579              : 
     580              :     /// Lock and get timeline's GC cutoff
     581      4514450 :     pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
     582      4514450 :         self.latest_gc_cutoff_lsn.read()
     583      4514450 :     }
     584              : 
     585              :     /// Look up given page version.
     586              :     ///
     587              :     /// If a remote layer file is needed, it is downloaded as part of this
     588              :     /// call.
     589              :     ///
     590              :     /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
     591              :     /// abstraction above this needs to store suitable metadata to track what
     592              :     /// data exists with what keys, in separate metadata entries. If a
     593              :     /// non-existent key is requested, we may incorrectly return a value from
     594              :     /// an ancestor branch, for example, or waste a lot of cycles chasing the
     595              :     /// non-existing key.
     596              :     ///
     597              :     /// # Cancel-Safety
     598              :     ///
     599              :     /// This method is cancellation-safe.
     600      7427636 :     pub(crate) async fn get(
     601      7427636 :         &self,
     602      7427636 :         key: Key,
     603      7427636 :         lsn: Lsn,
     604      7427636 :         ctx: &RequestContext,
     605      7427661 :     ) -> Result<Bytes, PageReconstructError> {
     606      7427661 :         if !lsn.is_valid() {
     607            1 :             return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
     608      7427660 :         }
     609              : 
     610              :         // This check is debug-only because of the cost of hashing, and because it's a double-check: we
     611              :         // already checked the key against the shard_identity when looking up the Timeline from
     612              :         // page_service.
     613      7427660 :         debug_assert!(!self.shard_identity.is_key_disposable(&key));
     614              : 
     615              :         // XXX: structured stats collection for layer eviction here.
     616            0 :         trace!(
     617            0 :             "get page request for {}@{} from task kind {:?}",
     618            0 :             key,
     619            0 :             lsn,
     620            0 :             ctx.task_kind()
     621            0 :         );
     622              : 
     623              :         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
     624              :         // The cached image can be returned directly if there is no WAL between the cached image
     625              :         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
     626              :         // for redo.
     627      7427660 :         let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
     628      1797170 :             Some((cached_lsn, cached_img)) => {
     629      1797170 :                 match cached_lsn.cmp(&lsn) {
     630      1720526 :                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
     631              :                     Ordering::Equal => {
     632        76644 :                         MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
     633        76644 :                         return Ok(cached_img); // exact LSN match, return the image
     634              :                     }
     635              :                     Ordering::Greater => {
     636            0 :                         unreachable!("the returned lsn should never be after the requested lsn")
     637              :                     }
     638              :                 }
     639      1720526 :                 Some((cached_lsn, cached_img))
     640              :             }
     641      5630490 :             None => None,
     642              :         };
     643              : 
     644      7351016 :         let mut reconstruct_state = ValueReconstructState {
     645      7351016 :             records: Vec::new(),
     646      7351016 :             img: cached_page_img,
     647      7351016 :         };
     648      7351016 : 
     649      7351016 :         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
     650      7351016 :         let path = self
     651      7351016 :             .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
     652      2225518 :             .await?;
     653      7349680 :         timer.stop_and_record();
     654      7349680 : 
     655      7349680 :         let start = Instant::now();
     656      7349680 :         let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
     657      7349680 :         let elapsed = start.elapsed();
     658      7349680 :         crate::metrics::RECONSTRUCT_TIME
     659      7349680 :             .for_result(&res)
     660      7349680 :             .observe(elapsed.as_secs_f64());
     661      7349680 : 
     662      7349680 :         if cfg!(feature = "testing") && res.is_err() {
     663              :             // it can only be walredo issue
     664              :             use std::fmt::Write;
     665              : 
     666            0 :             let mut msg = String::new();
     667            0 : 
     668            0 :             path.into_iter().for_each(|(res, cont_lsn, layer)| {
     669            0 :                 writeln!(
     670            0 :                     msg,
     671            0 :                     "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
     672            0 :                     layer(),
     673            0 :                 )
     674            0 :                 .expect("string grows")
     675            0 :             });
     676              : 
     677              :             // this is to rule out or provide evidence that we could in some cases read a duplicate
     678              :             // walrecord
     679            0 :             tracing::info!("walredo failed, path:\n{msg}");
     680      7349680 :         }
     681              : 
     682      7349680 :         res
     683      7427653 :     }
     684              : 
     685              :     pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
     686              : 
     687              :     /// Look up multiple page versions at a given LSN
     688              :     ///
     689              :     /// This naive implementation will be replaced with a more efficient one
     690              :     /// which actually vectorizes the read path.
     691        78952 :     pub(crate) async fn get_vectored(
     692        78952 :         &self,
     693        78952 :         key_ranges: &[Range<Key>],
     694        78952 :         lsn: Lsn,
     695        78952 :         ctx: &RequestContext,
     696        78952 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     697        78952 :         if !lsn.is_valid() {
     698            0 :             return Err(GetVectoredError::InvalidLsn(lsn));
     699        78952 :         }
     700        78952 : 
     701        78952 :         let key_count = key_ranges
     702        78952 :             .iter()
     703        80151 :             .map(|range| key_range_size(range) as u64)
     704        78952 :             .sum();
     705        78952 :         if key_count > Timeline::MAX_GET_VECTORED_KEYS {
     706            0 :             return Err(GetVectoredError::Oversized(key_count));
     707        78952 :         }
     708        78952 : 
     709        78952 :         let _timer = crate::metrics::GET_VECTORED_LATENCY
     710        78952 :             .for_task_kind(ctx.task_kind())
     711        78952 :             .map(|t| t.start_timer());
     712        78952 : 
     713        78952 :         let mut values = BTreeMap::new();
     714       159103 :         for range in key_ranges {
     715        80151 :             let mut key = range.start;
     716       312439 :             while key != range.end {
     717       232288 :                 assert!(!self.shard_identity.is_key_disposable(&key));
     718              : 
     719       232288 :                 let block = self.get(key, lsn, ctx).await;
     720              : 
     721              :                 if matches!(
     722            0 :                     block,
     723              :                     Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
     724              :                 ) {
     725            0 :                     return Err(GetVectoredError::Cancelled);
     726       232288 :                 }
     727       232288 : 
     728       232288 :                 values.insert(key, block);
     729       232288 :                 key = key.next();
     730              :             }
     731              :         }
     732              : 
     733        78952 :         Ok(values)
     734        78952 :     }
     735              : 
     736              :     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     737      9570130 :     pub(crate) fn get_last_record_lsn(&self) -> Lsn {
     738      9570130 :         self.last_record_lsn.load().last
     739      9570130 :     }
     740              : 
     741         3090 :     pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
     742         3090 :         self.last_record_lsn.load().prev
     743         3090 :     }
     744              : 
     745              :     /// Atomically get both last and prev.
     746         1080 :     pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
     747         1080 :         self.last_record_lsn.load()
     748         1080 :     }
     749              : 
     750       798339 :     pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
     751       798339 :         self.disk_consistent_lsn.load()
     752       798339 :     }
     753              : 
     754              :     /// remote_consistent_lsn from the perspective of the tenant's current generation,
     755              :     /// not validated with control plane yet.
     756              :     /// See [`Self::get_remote_consistent_lsn_visible`].
     757         3090 :     pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     758         3090 :         if let Some(remote_client) = &self.remote_client {
     759         3090 :             remote_client.remote_consistent_lsn_projected()
     760              :         } else {
     761            0 :             None
     762              :         }
     763         3090 :     }
     764              : 
     765              :     /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
     766              :     /// i.e. a value of remote_consistent_lsn_projected which has undergone
     767              :     /// generation validation in the deletion queue.
     768       795952 :     pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
     769       795952 :         if let Some(remote_client) = &self.remote_client {
     770       795952 :             remote_client.remote_consistent_lsn_visible()
     771              :         } else {
     772            0 :             None
     773              :         }
     774       795952 :     }
     775              : 
     776              :     /// The sum of the file size of all historic layers in the layer map.
     777              :     /// This method makes no distinction between local and remote layers.
     778              :     /// Hence, the result **does not represent local filesystem usage**.
     779         3570 :     pub(crate) async fn layer_size_sum(&self) -> u64 {
     780         3570 :         let guard = self.layers.read().await;
     781         3570 :         let layer_map = guard.layer_map();
     782         3570 :         let mut size = 0;
     783       327026 :         for l in layer_map.iter_historic_layers() {
     784       327026 :             size += l.file_size();
     785       327026 :         }
     786         3570 :         size
     787         3570 :     }
     788              : 
     789           12 :     pub(crate) fn resident_physical_size(&self) -> u64 {
     790           12 :         self.metrics.resident_physical_size_get()
     791           12 :     }
     792              : 
     793              :     ///
     794              :     /// Wait until WAL has been received and processed up to this LSN.
     795              :     ///
     796              :     /// You should call this before any of the other get_* or list_* functions. Calling
     797              :     /// those functions with an LSN that has been processed yet is an error.
     798              :     ///
     799      1527253 :     pub(crate) async fn wait_lsn(
     800      1527253 :         &self,
     801      1527253 :         lsn: Lsn,
     802      1527253 :         _ctx: &RequestContext, /* Prepare for use by cancellation */
     803      1527255 :     ) -> Result<(), WaitLsnError> {
     804      1527255 :         if self.cancel.is_cancelled() {
     805            0 :             return Err(WaitLsnError::Shutdown);
     806      1527255 :         } else if !self.is_active() {
     807            0 :             return Err(WaitLsnError::BadState);
     808      1527255 :         }
     809              : 
     810              :         // This should never be called from the WAL receiver, because that could lead
     811              :         // to a deadlock.
     812              :         debug_assert!(
     813      1527255 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
     814            0 :             "wait_lsn cannot be called in WAL receiver"
     815              :         );
     816              :         debug_assert!(
     817      1527255 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
     818            0 :             "wait_lsn cannot be called in WAL receiver"
     819              :         );
     820              :         debug_assert!(
     821      1527255 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
     822            0 :             "wait_lsn cannot be called in WAL receiver"
     823              :         );
     824              : 
     825      1527255 :         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
     826      1527255 : 
     827      1527255 :         match self
     828      1527255 :             .last_record_lsn
     829      1527255 :             .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
     830       110510 :             .await
     831              :         {
     832      1527231 :             Ok(()) => Ok(()),
     833           24 :             Err(e) => {
     834           24 :                 use utils::seqwait::SeqWaitError::*;
     835           24 :                 match e {
     836            0 :                     Shutdown => Err(WaitLsnError::Shutdown),
     837              :                     Timeout => {
     838              :                         // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
     839           24 :                         drop(_timer);
     840           24 :                         let walreceiver_status = self.walreceiver_status();
     841           24 :                         Err(WaitLsnError::Timeout(format!(
     842           24 :                         "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
     843           24 :                         lsn,
     844           24 :                         self.get_last_record_lsn(),
     845           24 :                         self.get_disk_consistent_lsn(),
     846           24 :                         walreceiver_status,
     847           24 :                     )))
     848              :                     }
     849              :                 }
     850              :             }
     851              :         }
     852      1527255 :     }
     853              : 
     854         3114 :     pub(crate) fn walreceiver_status(&self) -> String {
     855         3114 :         match &*self.walreceiver.lock().unwrap() {
     856          107 :             None => "stopping or stopped".to_string(),
     857         3007 :             Some(walreceiver) => match walreceiver.status() {
     858         1865 :                 Some(status) => status.to_human_readable_string(),
     859         1142 :                 None => "Not active".to_string(),
     860              :             },
     861              :         }
     862         3114 :     }
     863              : 
     864              :     /// Check that it is valid to request operations with that lsn.
     865          678 :     pub(crate) fn check_lsn_is_in_scope(
     866          678 :         &self,
     867          678 :         lsn: Lsn,
     868          678 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     869          678 :     ) -> anyhow::Result<()> {
     870          678 :         ensure!(
     871          678 :             lsn >= **latest_gc_cutoff_lsn,
     872           16 :             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
     873           16 :             lsn,
     874           16 :             **latest_gc_cutoff_lsn,
     875              :         );
     876          662 :         Ok(())
     877          678 :     }
     878              : 
     879              :     /// Flush to disk all data that was written with the put_* functions
     880         3904 :     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
     881              :     pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
     882              :         self.freeze_inmem_layer(false).await;
     883              :         self.flush_frozen_layers_and_wait().await
     884              :     }
     885              : 
     886              :     /// Outermost timeline compaction operation; downloads needed layers.
     887         1644 :     pub(crate) async fn compact(
     888         1644 :         self: &Arc<Self>,
     889         1644 :         cancel: &CancellationToken,
     890         1644 :         flags: EnumSet<CompactFlags>,
     891         1644 :         ctx: &RequestContext,
     892         1644 :     ) -> Result<(), CompactionError> {
     893         1644 :         // most likely the cancellation token is from background task, but in tests it could be the
     894         1644 :         // request task as well.
     895         1644 : 
     896         1644 :         let prepare = async move {
     897         1644 :             let guard = self.compaction_lock.lock().await;
     898              : 
     899         1644 :             let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
     900         1644 :                 BackgroundLoopKind::Compaction,
     901         1644 :                 ctx,
     902         1644 :             )
     903            1 :             .await;
     904              : 
     905         1644 :             (guard, permit)
     906         1644 :         };
     907              : 
     908              :         // this wait probably never needs any "long time spent" logging, because we already nag if
     909              :         // compaction task goes over it's period (20s) which is quite often in production.
     910         1644 :         let (_guard, _permit) = tokio::select! {
     911         1644 :             tuple = prepare => { tuple },
     912              :             _ = self.cancel.cancelled() => return Ok(()),
     913              :             _ = cancel.cancelled() => return Ok(()),
     914              :         };
     915              : 
     916         1644 :         let last_record_lsn = self.get_last_record_lsn();
     917         1644 : 
     918         1644 :         // Last record Lsn could be zero in case the timeline was just created
     919         1644 :         if !last_record_lsn.is_valid() {
     920            0 :             warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
     921            0 :             return Ok(());
     922         1644 :         }
     923         1644 : 
     924         1644 :         // High level strategy for compaction / image creation:
     925         1644 :         //
     926         1644 :         // 1. First, calculate the desired "partitioning" of the
     927         1644 :         // currently in-use key space. The goal is to partition the
     928         1644 :         // key space into roughly fixed-size chunks, but also take into
     929         1644 :         // account any existing image layers, and try to align the
     930         1644 :         // chunk boundaries with the existing image layers to avoid
     931         1644 :         // too much churn. Also try to align chunk boundaries with
     932         1644 :         // relation boundaries.  In principle, we don't know about
     933         1644 :         // relation boundaries here, we just deal with key-value
     934         1644 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     935         1644 :         // map relations into key-value pairs. But in practice we know
     936         1644 :         // that 'field6' is the block number, and the fields 1-5
     937         1644 :         // identify a relation. This is just an optimization,
     938         1644 :         // though.
     939         1644 :         //
     940         1644 :         // 2. Once we know the partitioning, for each partition,
     941         1644 :         // decide if it's time to create a new image layer. The
     942         1644 :         // criteria is: there has been too much "churn" since the last
     943         1644 :         // image layer? The "churn" is fuzzy concept, it's a
     944         1644 :         // combination of too many delta files, or too much WAL in
     945         1644 :         // total in the delta file. Or perhaps: if creating an image
     946         1644 :         // file would allow to delete some older files.
     947         1644 :         //
     948         1644 :         // 3. After that, we compact all level0 delta files if there
     949         1644 :         // are too many of them.  While compacting, we also garbage
     950         1644 :         // collect any page versions that are no longer needed because
     951         1644 :         // of the new image layers we created in step 2.
     952         1644 :         //
     953         1644 :         // TODO: This high level strategy hasn't been implemented yet.
     954         1644 :         // Below are functions compact_level0() and create_image_layers()
     955         1644 :         // but they are a bit ad hoc and don't quite work like it's explained
     956         1644 :         // above. Rewrite it.
     957         1644 : 
     958         1644 :         // Is the timeline being deleted?
     959         1644 :         if self.is_stopping() {
     960            0 :             trace!("Dropping out of compaction on timeline shutdown");
     961            0 :             return Err(CompactionError::ShuttingDown);
     962         1644 :         }
     963         1644 : 
     964         1644 :         let target_file_size = self.get_checkpoint_distance();
     965         1644 : 
     966         1644 :         // Define partitioning schema if needed
     967         1644 : 
     968         1644 :         // FIXME: the match should only cover repartitioning, not the next steps
     969         1644 :         match self
     970         1644 :             .repartition(
     971         1644 :                 self.get_last_record_lsn(),
     972         1644 :                 self.get_compaction_target_size(),
     973         1644 :                 flags,
     974         1644 :                 ctx,
     975         1644 :             )
     976       171920 :             .await
     977              :         {
     978         1641 :             Ok((partitioning, lsn)) => {
     979         1641 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     980         1641 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     981         1641 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     982         1641 :                     .build();
     983         1641 : 
     984         1641 :                 // 2. Compact
     985         1641 :                 let timer = self.metrics.compact_time_histo.start_timer();
     986       326269 :                 self.compact_level0(target_file_size, ctx).await?;
     987         1640 :                 timer.stop_and_record();
     988              : 
     989              :                 // 3. Create new image layers for partitions that have been modified
     990              :                 // "enough".
     991         1640 :                 let layers = self
     992         1640 :                     .create_image_layers(&partitioning, lsn, false, &image_ctx)
     993        56359 :                     .await
     994         1640 :                     .map_err(anyhow::Error::from)?;
     995         1640 :                 if let Some(remote_client) = &self.remote_client {
     996         8032 :                     for layer in layers {
     997         6392 :                         remote_client.schedule_layer_file_upload(layer)?;
     998              :                     }
     999            0 :                 }
    1000              : 
    1001         1640 :                 if let Some(remote_client) = &self.remote_client {
    1002              :                     // should any new image layer been created, not uploading index_part will
    1003              :                     // result in a mismatch between remote_physical_size and layermap calculated
    1004              :                     // size, which will fail some tests, but should not be an issue otherwise.
    1005         1640 :                     remote_client.schedule_index_upload_for_file_changes()?;
    1006            0 :                 }
    1007              :             }
    1008            2 :             Err(err) => {
    1009            2 :                 // no partitioning? This is normal, if the timeline was just created
    1010            2 :                 // as an empty timeline. Also in unit tests, when we use the timeline
    1011            2 :                 // as a simple key-value store, ignoring the datadir layout. Log the
    1012            2 :                 // error but continue.
    1013            2 :                 //
    1014            2 :                 // Suppress error when it's due to cancellation
    1015            2 :                 if !self.cancel.is_cancelled() {
    1016            1 :                     error!("could not compact, repartitioning keyspace failed: {err:?}");
    1017            1 :                 }
    1018              :             }
    1019              :         };
    1020              : 
    1021         1642 :         Ok(())
    1022         1642 :     }
    1023              : 
    1024              :     /// Mutate the timeline with a [`TimelineWriter`].
    1025      3350865 :     pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
    1026      3350865 :         TimelineWriter {
    1027      3350865 :             tl: self,
    1028      3350865 :             _write_guard: self.write_lock.lock().await,
    1029              :         }
    1030      3350865 :     }
    1031              : 
    1032              :     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
    1033              :     /// the in-memory layer, and initiate flushing it if so.
    1034              :     ///
    1035              :     /// Also flush after a period of time without new data -- it helps
    1036              :     /// safekeepers to regard pageserver as caught up and suspend activity.
    1037      1397166 :     pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
    1038      1397166 :         let last_lsn = self.get_last_record_lsn();
    1039      1396420 :         let open_layer_size = {
    1040      1397166 :             let guard = self.layers.read().await;
    1041      1397166 :             let layers = guard.layer_map();
    1042      1397166 :             let Some(open_layer) = layers.open_layer.as_ref() else {
    1043          746 :                 return Ok(());
    1044              :             };
    1045      1396420 :             open_layer.size().await?
    1046              :         };
    1047      1396420 :         let last_freeze_at = self.last_freeze_at.load();
    1048      1396420 :         let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
    1049      1396420 :         let distance = last_lsn.widening_sub(last_freeze_at);
    1050      1396420 :         // Checkpointing the open layer can be triggered by layer size or LSN range.
    1051      1396420 :         // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
    1052      1396420 :         // we want to stay below that with a big margin.  The LSN distance determines how
    1053      1396420 :         // much WAL the safekeepers need to store.
    1054      1396420 :         if distance >= self.get_checkpoint_distance().into()
    1055      1395842 :             || open_layer_size > self.get_checkpoint_distance()
    1056      1392900 :             || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
    1057              :         {
    1058         3520 :             info!(
    1059         3520 :                 "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
    1060         3520 :                 distance,
    1061         3520 :                 open_layer_size,
    1062         3520 :                 last_freeze_ts.elapsed()
    1063         3520 :             );
    1064              : 
    1065         3520 :             self.freeze_inmem_layer(true).await;
    1066         3520 :             self.last_freeze_at.store(last_lsn);
    1067         3520 :             *(self.last_freeze_ts.write().unwrap()) = Instant::now();
    1068         3520 : 
    1069         3520 :             // Wake up the layer flusher
    1070         3520 :             self.flush_frozen_layers();
    1071      1392900 :         }
    1072      1396420 :         Ok(())
    1073      1397166 :     }
    1074              : 
    1075         1257 :     pub(crate) fn activate(
    1076         1257 :         self: &Arc<Self>,
    1077         1257 :         broker_client: BrokerClientChannel,
    1078         1257 :         background_jobs_can_start: Option<&completion::Barrier>,
    1079         1257 :         ctx: &RequestContext,
    1080         1257 :     ) {
    1081         1257 :         if self.tenant_shard_id.is_zero() {
    1082         1206 :             // Logical size is only maintained accurately on shard zero.
    1083         1206 :             self.spawn_initial_logical_size_computation_task(ctx);
    1084         1206 :         }
    1085         1257 :         self.launch_wal_receiver(ctx, broker_client);
    1086         1257 :         self.set_state(TimelineState::Active);
    1087         1257 :         self.launch_eviction_task(background_jobs_can_start);
    1088         1257 :     }
    1089              : 
    1090              :     /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
    1091              :     /// also to remote storage.  This method can easily take multiple seconds for a busy timeline.
    1092              :     ///
    1093              :     /// While we are flushing, we continue to accept read I/O.
    1094          261 :     pub(crate) async fn flush_and_shutdown(&self) {
    1095          261 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1096              : 
    1097              :         // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
    1098              :         // trying to flush
    1099            0 :         tracing::debug!("Waiting for WalReceiverManager...");
    1100          261 :         task_mgr::shutdown_tasks(
    1101          261 :             Some(TaskKind::WalReceiverManager),
    1102          261 :             Some(self.tenant_shard_id),
    1103          261 :             Some(self.timeline_id),
    1104          261 :         )
    1105          161 :         .await;
    1106              : 
    1107              :         // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
    1108          261 :         self.last_record_lsn.shutdown();
    1109          261 : 
    1110          261 :         // now all writers to InMemory layer are gone, do the final flush if requested
    1111          261 :         match self.freeze_and_flush().await {
    1112              :             Ok(_) => {
    1113              :                 // drain the upload queue
    1114          216 :                 if let Some(client) = self.remote_client.as_ref() {
    1115              :                     // if we did not wait for completion here, it might be our shutdown process
    1116              :                     // didn't wait for remote uploads to complete at all, as new tasks can forever
    1117              :                     // be spawned.
    1118              :                     //
    1119              :                     // what is problematic is the shutting down of RemoteTimelineClient, because
    1120              :                     // obviously it does not make sense to stop while we wait for it, but what
    1121              :                     // about corner cases like s3 suddenly hanging up?
    1122          216 :                     if let Err(e) = client.shutdown().await {
    1123              :                         // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1124              :                         // we have some extra WAL replay to do next time the timeline starts.
    1125            0 :                         warn!("failed to flush to remote storage: {e:#}");
    1126          216 :                     }
    1127            0 :                 }
    1128              :             }
    1129           45 :             Err(e) => {
    1130              :                 // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1131              :                 // we have some extra WAL replay to do next time the timeline starts.
    1132           45 :                 warn!("failed to freeze and flush: {e:#}");
    1133              :             }
    1134              :         }
    1135              : 
    1136          435 :         self.shutdown().await;
    1137          261 :     }
    1138              : 
    1139              :     /// Shut down immediately, without waiting for any open layers to flush to disk.  This is a subset of
    1140              :     /// the graceful [`Timeline::flush_and_shutdown`] function.
    1141          592 :     pub(crate) async fn shutdown(&self) {
    1142          592 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1143              : 
    1144              :         // Signal any subscribers to our cancellation token to drop out
    1145            0 :         tracing::debug!("Cancelling CancellationToken");
    1146          592 :         self.cancel.cancel();
    1147          592 : 
    1148          592 :         // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
    1149          592 :         // while doing so.
    1150          592 :         self.last_record_lsn.shutdown();
    1151          592 : 
    1152          592 :         // Shut down the layer flush task before the remote client, as one depends on the other
    1153          592 :         task_mgr::shutdown_tasks(
    1154          592 :             Some(TaskKind::LayerFlushTask),
    1155          592 :             Some(self.tenant_shard_id),
    1156          592 :             Some(self.timeline_id),
    1157          592 :         )
    1158          504 :         .await;
    1159              : 
    1160              :         // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
    1161              :         // case our caller wants to use that for a deletion
    1162          592 :         if let Some(remote_client) = self.remote_client.as_ref() {
    1163          592 :             match remote_client.stop() {
    1164          592 :                 Ok(()) => {}
    1165            0 :                 Err(StopError::QueueUninitialized) => {
    1166            0 :                     // Shutting down during initialization is legal
    1167            0 :                 }
    1168              :             }
    1169            0 :         }
    1170              : 
    1171            0 :         tracing::debug!("Waiting for tasks...");
    1172              : 
    1173          619 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
    1174              : 
    1175              :         // Finally wait until any gate-holders are complete
    1176          592 :         self.gate.close().await;
    1177          592 :     }
    1178              : 
    1179         2279 :     pub(crate) fn set_state(&self, new_state: TimelineState) {
    1180         2279 :         match (self.current_state(), new_state) {
    1181         2279 :             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
    1182          125 :                 info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
    1183              :             }
    1184            0 :             (st, TimelineState::Loading) => {
    1185            0 :                 error!("ignoring transition from {st:?} into Loading state");
    1186              :             }
    1187            7 :             (TimelineState::Broken { .. }, new_state) => {
    1188            7 :                 error!("Ignoring state update {new_state:?} for broken timeline");
    1189              :             }
    1190              :             (TimelineState::Stopping, TimelineState::Active) => {
    1191            0 :                 error!("Not activating a Stopping timeline");
    1192              :             }
    1193         2147 :             (_, new_state) => {
    1194         2147 :                 self.state.send_replace(new_state);
    1195         2147 :             }
    1196              :         }
    1197         2279 :     }
    1198              : 
    1199           19 :     pub(crate) fn set_broken(&self, reason: String) {
    1200           19 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
    1201           19 :         let broken_state = TimelineState::Broken {
    1202           19 :             reason,
    1203           19 :             backtrace: backtrace_str,
    1204           19 :         };
    1205           19 :         self.set_state(broken_state);
    1206           19 : 
    1207           19 :         // Although the Broken state is not equivalent to shutdown() (shutdown will be called
    1208           19 :         // later when this tenant is detach or the process shuts down), firing the cancellation token
    1209           19 :         // here avoids the need for other tasks to watch for the Broken state explicitly.
    1210           19 :         self.cancel.cancel();
    1211           19 :     }
    1212              : 
    1213      2277272 :     pub(crate) fn current_state(&self) -> TimelineState {
    1214      2277272 :         self.state.borrow().clone()
    1215      2277272 :     }
    1216              : 
    1217          944 :     pub(crate) fn is_broken(&self) -> bool {
    1218          944 :         matches!(&*self.state.borrow(), TimelineState::Broken { .. })
    1219          944 :     }
    1220              : 
    1221      1542206 :     pub(crate) fn is_active(&self) -> bool {
    1222      1542206 :         self.current_state() == TimelineState::Active
    1223      1542206 :     }
    1224              : 
    1225         2854 :     pub(crate) fn is_stopping(&self) -> bool {
    1226         2854 :         self.current_state() == TimelineState::Stopping
    1227         2854 :     }
    1228              : 
    1229         1256 :     pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
    1230         1256 :         self.state.subscribe()
    1231         1256 :     }
    1232              : 
    1233      1299917 :     pub(crate) async fn wait_to_become_active(
    1234      1299917 :         &self,
    1235      1299917 :         _ctx: &RequestContext, // Prepare for use by cancellation
    1236      1299919 :     ) -> Result<(), TimelineState> {
    1237      1299919 :         let mut receiver = self.state.subscribe();
    1238      1299942 :         loop {
    1239      1299942 :             let current_state = receiver.borrow().clone();
    1240      1299942 :             match current_state {
    1241              :                 TimelineState::Loading => {
    1242           23 :                     receiver
    1243           23 :                         .changed()
    1244           23 :                         .await
    1245           23 :                         .expect("holding a reference to self");
    1246              :                 }
    1247              :                 TimelineState::Active { .. } => {
    1248      1299914 :                     return Ok(());
    1249              :                 }
    1250              :                 TimelineState::Broken { .. } | TimelineState::Stopping => {
    1251              :                     // There's no chance the timeline can transition back into ::Active
    1252            5 :                     return Err(current_state);
    1253              :                 }
    1254              :             }
    1255              :         }
    1256      1299919 :     }
    1257              : 
    1258          116 :     pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
    1259          116 :         let guard = self.layers.read().await;
    1260          116 :         let layer_map = guard.layer_map();
    1261          116 :         let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
    1262          116 :         if let Some(open_layer) = &layer_map.open_layer {
    1263           27 :             in_memory_layers.push(open_layer.info());
    1264           89 :         }
    1265          116 :         for frozen_layer in &layer_map.frozen_layers {
    1266            0 :             in_memory_layers.push(frozen_layer.info());
    1267            0 :         }
    1268              : 
    1269          116 :         let mut historic_layers = Vec::new();
    1270         3151 :         for historic_layer in layer_map.iter_historic_layers() {
    1271         3151 :             let historic_layer = guard.get_from_desc(&historic_layer);
    1272         3151 :             historic_layers.push(historic_layer.info(reset));
    1273         3151 :         }
    1274              : 
    1275          116 :         LayerMapInfo {
    1276          116 :             in_memory_layers,
    1277          116 :             historic_layers,
    1278          116 :         }
    1279          116 :     }
    1280              : 
    1281            4 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    1282              :     pub(crate) async fn download_layer(
    1283              :         &self,
    1284              :         layer_file_name: &str,
    1285              :     ) -> anyhow::Result<Option<bool>> {
    1286              :         let Some(layer) = self.find_layer(layer_file_name).await else {
    1287              :             return Ok(None);
    1288              :         };
    1289              : 
    1290              :         if self.remote_client.is_none() {
    1291              :             return Ok(Some(false));
    1292              :         }
    1293              : 
    1294              :         layer.download().await?;
    1295              : 
    1296              :         Ok(Some(true))
    1297              :     }
    1298              : 
    1299              :     /// Evict just one layer.
    1300              :     ///
    1301              :     /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
    1302         2242 :     pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1303         2242 :         let _gate = self
    1304         2242 :             .gate
    1305         2242 :             .enter()
    1306         2242 :             .map_err(|_| anyhow::anyhow!("Shutting down"))?;
    1307              : 
    1308         2242 :         let Some(local_layer) = self.find_layer(layer_file_name).await else {
    1309            0 :             return Ok(None);
    1310              :         };
    1311              : 
    1312         2242 :         match local_layer.evict_and_wait().await {
    1313         2242 :             Ok(()) => Ok(Some(true)),
    1314            0 :             Err(EvictionError::NotFound) => Ok(Some(false)),
    1315            0 :             Err(EvictionError::Downloaded) => Ok(Some(false)),
    1316              :         }
    1317         2242 :     }
    1318              : }
    1319              : 
    1320              : /// Number of times we will compute partition within a checkpoint distance.
    1321              : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
    1322              : 
    1323              : // Private functions
    1324              : impl Timeline {
    1325          607 :     pub(crate) fn get_lazy_slru_download(&self) -> bool {
    1326          607 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1327          607 :         tenant_conf
    1328          607 :             .lazy_slru_download
    1329          607 :             .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
    1330          607 :     }
    1331              : 
    1332      2795496 :     fn get_checkpoint_distance(&self) -> u64 {
    1333      2795496 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1334      2795496 :         tenant_conf
    1335      2795496 :             .checkpoint_distance
    1336      2795496 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    1337      2795496 :     }
    1338              : 
    1339      1392900 :     fn get_checkpoint_timeout(&self) -> Duration {
    1340      1392900 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1341      1392900 :         tenant_conf
    1342      1392900 :             .checkpoint_timeout
    1343      1392900 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    1344      1392900 :     }
    1345              : 
    1346         1724 :     fn get_compaction_target_size(&self) -> u64 {
    1347         1724 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1348         1724 :         tenant_conf
    1349         1724 :             .compaction_target_size
    1350         1724 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    1351         1724 :     }
    1352              : 
    1353         1641 :     fn get_compaction_threshold(&self) -> usize {
    1354         1641 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1355         1641 :         tenant_conf
    1356         1641 :             .compaction_threshold
    1357         1641 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    1358         1641 :     }
    1359              : 
    1360        36566 :     fn get_image_creation_threshold(&self) -> usize {
    1361        36566 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1362        36566 :         tenant_conf
    1363        36566 :             .image_creation_threshold
    1364        36566 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    1365        36566 :     }
    1366              : 
    1367         2599 :     fn get_eviction_policy(&self) -> EvictionPolicy {
    1368         2599 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1369         2599 :         tenant_conf
    1370         2599 :             .eviction_policy
    1371         2599 :             .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
    1372         2599 :     }
    1373              : 
    1374         1651 :     fn get_evictions_low_residence_duration_metric_threshold(
    1375         1651 :         tenant_conf: &TenantConfOpt,
    1376         1651 :         default_tenant_conf: &TenantConf,
    1377         1651 :     ) -> Duration {
    1378         1651 :         tenant_conf
    1379         1651 :             .evictions_low_residence_duration_metric_threshold
    1380         1651 :             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
    1381         1651 :     }
    1382              : 
    1383        13896 :     fn get_gc_feedback(&self) -> bool {
    1384        13896 :         let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
    1385        13896 :         tenant_conf
    1386        13896 :             .gc_feedback
    1387        13896 :             .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
    1388        13896 :     }
    1389              : 
    1390           61 :     pub(super) fn tenant_conf_updated(&self) {
    1391           61 :         // NB: Most tenant conf options are read by background loops, so,
    1392           61 :         // changes will automatically be picked up.
    1393           61 : 
    1394           61 :         // The threshold is embedded in the metric. So, we need to update it.
    1395           61 :         {
    1396           61 :             let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
    1397           61 :                 &self.tenant_conf.read().unwrap().tenant_conf,
    1398           61 :                 &self.conf.default_tenant_conf,
    1399           61 :             );
    1400           61 : 
    1401           61 :             let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
    1402           61 :             let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
    1403           61 : 
    1404           61 :             let timeline_id_str = self.timeline_id.to_string();
    1405           61 :             self.metrics
    1406           61 :                 .evictions_with_low_residence_duration
    1407           61 :                 .write()
    1408           61 :                 .unwrap()
    1409           61 :                 .change_threshold(
    1410           61 :                     &tenant_id_str,
    1411           61 :                     &shard_id_str,
    1412           61 :                     &timeline_id_str,
    1413           61 :                     new_threshold,
    1414           61 :                 );
    1415           61 :         }
    1416           61 :     }
    1417              : 
    1418              :     /// Open a Timeline handle.
    1419              :     ///
    1420              :     /// Loads the metadata for the timeline into memory, but not the layer map.
    1421              :     #[allow(clippy::too_many_arguments)]
    1422         1590 :     pub(super) fn new(
    1423         1590 :         conf: &'static PageServerConf,
    1424         1590 :         tenant_conf: Arc<RwLock<AttachedTenantConf>>,
    1425         1590 :         metadata: &TimelineMetadata,
    1426         1590 :         ancestor: Option<Arc<Timeline>>,
    1427         1590 :         timeline_id: TimelineId,
    1428         1590 :         tenant_shard_id: TenantShardId,
    1429         1590 :         generation: Generation,
    1430         1590 :         shard_identity: ShardIdentity,
    1431         1590 :         walredo_mgr: Option<Arc<super::WalRedoManager>>,
    1432         1590 :         resources: TimelineResources,
    1433         1590 :         pg_version: u32,
    1434         1590 :         state: TimelineState,
    1435         1590 :         cancel: CancellationToken,
    1436         1590 :     ) -> Arc<Self> {
    1437         1590 :         let disk_consistent_lsn = metadata.disk_consistent_lsn();
    1438         1590 :         let (state, _) = watch::channel(state);
    1439         1590 : 
    1440         1590 :         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
    1441         1590 :         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
    1442         1590 : 
    1443         1590 :         let tenant_conf_guard = tenant_conf.read().unwrap();
    1444         1590 : 
    1445         1590 :         let evictions_low_residence_duration_metric_threshold =
    1446         1590 :             Self::get_evictions_low_residence_duration_metric_threshold(
    1447         1590 :                 &tenant_conf_guard.tenant_conf,
    1448         1590 :                 &conf.default_tenant_conf,
    1449         1590 :             );
    1450         1590 :         drop(tenant_conf_guard);
    1451         1590 : 
    1452         1590 :         Arc::new_cyclic(|myself| {
    1453         1590 :             let mut result = Timeline {
    1454         1590 :                 conf,
    1455         1590 :                 tenant_conf,
    1456         1590 :                 myself: myself.clone(),
    1457         1590 :                 timeline_id,
    1458         1590 :                 tenant_shard_id,
    1459         1590 :                 generation,
    1460         1590 :                 shard_identity,
    1461         1590 :                 pg_version,
    1462         1590 :                 layers: Default::default(),
    1463         1590 :                 wanted_image_layers: Mutex::new(None),
    1464         1590 : 
    1465         1590 :                 walredo_mgr,
    1466         1590 :                 walreceiver: Mutex::new(None),
    1467         1590 : 
    1468         1590 :                 remote_client: resources.remote_client.map(Arc::new),
    1469         1590 : 
    1470         1590 :                 // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
    1471         1590 :                 last_record_lsn: SeqWait::new(RecordLsn {
    1472         1590 :                     last: disk_consistent_lsn,
    1473         1590 :                     prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
    1474         1590 :                 }),
    1475         1590 :                 disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
    1476         1590 : 
    1477         1590 :                 last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
    1478         1590 :                 last_freeze_ts: RwLock::new(Instant::now()),
    1479         1590 : 
    1480         1590 :                 loaded_at: (disk_consistent_lsn, SystemTime::now()),
    1481         1590 : 
    1482         1590 :                 ancestor_timeline: ancestor,
    1483         1590 :                 ancestor_lsn: metadata.ancestor_lsn(),
    1484         1590 : 
    1485         1590 :                 metrics: TimelineMetrics::new(
    1486         1590 :                     &tenant_shard_id,
    1487         1590 :                     &timeline_id,
    1488         1590 :                     crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
    1489         1590 :                         "mtime",
    1490         1590 :                         evictions_low_residence_duration_metric_threshold,
    1491         1590 :                     ),
    1492         1590 :                 ),
    1493         1590 : 
    1494         1590 :                 query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
    1495         1590 :                     &tenant_shard_id,
    1496         1590 :                     &timeline_id,
    1497         1590 :                 ),
    1498         1590 : 
    1499         1590 :                 flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
    1500         1590 : 
    1501         1590 :                 layer_flush_start_tx,
    1502         1590 :                 layer_flush_done_tx,
    1503         1590 : 
    1504         1590 :                 write_lock: tokio::sync::Mutex::new(()),
    1505         1590 : 
    1506         1590 :                 gc_info: std::sync::RwLock::new(GcInfo {
    1507         1590 :                     retain_lsns: Vec::new(),
    1508         1590 :                     horizon_cutoff: Lsn(0),
    1509         1590 :                     pitr_cutoff: Lsn(0),
    1510         1590 :                 }),
    1511         1590 : 
    1512         1590 :                 latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
    1513         1590 :                 initdb_lsn: metadata.initdb_lsn(),
    1514         1590 : 
    1515         1590 :                 current_logical_size: if disk_consistent_lsn.is_valid() {
    1516              :                     // we're creating timeline data with some layer files existing locally,
    1517              :                     // need to recalculate timeline's logical size based on data in the layers.
    1518          909 :                     LogicalSize::deferred_initial(disk_consistent_lsn)
    1519              :                 } else {
    1520              :                     // we're creating timeline data without any layers existing locally,
    1521              :                     // initial logical size is 0.
    1522          681 :                     LogicalSize::empty_initial()
    1523              :                 },
    1524         1590 :                 partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
    1525         1590 :                 repartition_threshold: 0,
    1526         1590 : 
    1527         1590 :                 last_received_wal: Mutex::new(None),
    1528         1590 :                 rel_size_cache: RwLock::new(HashMap::new()),
    1529         1590 : 
    1530         1590 :                 download_all_remote_layers_task_info: RwLock::new(None),
    1531         1590 : 
    1532         1590 :                 state,
    1533         1590 : 
    1534         1590 :                 eviction_task_timeline_state: tokio::sync::Mutex::new(
    1535         1590 :                     EvictionTaskTimelineState::default(),
    1536         1590 :                 ),
    1537         1590 :                 delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
    1538         1590 : 
    1539         1590 :                 cancel,
    1540         1590 :                 gate: Gate::default(),
    1541         1590 : 
    1542         1590 :                 compaction_lock: tokio::sync::Mutex::default(),
    1543         1590 :                 gc_lock: tokio::sync::Mutex::default(),
    1544         1590 :             };
    1545         1590 :             result.repartition_threshold =
    1546         1590 :                 result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
    1547         1590 :             result
    1548         1590 :                 .metrics
    1549         1590 :                 .last_record_gauge
    1550         1590 :                 .set(disk_consistent_lsn.0 as i64);
    1551         1590 :             result
    1552         1590 :         })
    1553         1590 :     }
    1554              : 
    1555         2243 :     pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
    1556         2243 :         let Ok(guard) = self.gate.enter() else {
    1557            0 :             info!("cannot start flush loop when the timeline gate has already been closed");
    1558            0 :             return;
    1559              :         };
    1560         2243 :         let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
    1561         2243 :         match *flush_loop_state {
    1562         1573 :             FlushLoopState::NotStarted => (),
    1563              :             FlushLoopState::Running { .. } => {
    1564          670 :                 info!(
    1565          670 :                     "skipping attempt to start flush_loop twice {}/{}",
    1566          670 :                     self.tenant_shard_id, self.timeline_id
    1567          670 :                 );
    1568          670 :                 return;
    1569              :             }
    1570              :             FlushLoopState::Exited => {
    1571            0 :                 warn!(
    1572            0 :                     "ignoring attempt to restart exited flush_loop {}/{}",
    1573            0 :                     self.tenant_shard_id, self.timeline_id
    1574            0 :                 );
    1575            0 :                 return;
    1576              :             }
    1577              :         }
    1578              : 
    1579         1573 :         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
    1580         1573 :         let self_clone = Arc::clone(self);
    1581         1573 : 
    1582         1573 :         debug!("spawning flush loop");
    1583         1573 :         *flush_loop_state = FlushLoopState::Running {
    1584         1573 :             #[cfg(test)]
    1585         1573 :             expect_initdb_optimization: false,
    1586         1573 :             #[cfg(test)]
    1587         1573 :             initdb_optimization_count: 0,
    1588         1573 :         };
    1589         1573 :         task_mgr::spawn(
    1590         1573 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1591         1573 :             task_mgr::TaskKind::LayerFlushTask,
    1592         1573 :             Some(self.tenant_shard_id),
    1593         1573 :             Some(self.timeline_id),
    1594         1573 :             "layer flush task",
    1595              :             false,
    1596         1573 :             async move {
    1597         1573 :                 let _guard = guard;
    1598         1573 :                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
    1599        27794 :                 self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
    1600          591 :                 let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
    1601          591 :                 assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
    1602          591 :                 *flush_loop_state  = FlushLoopState::Exited;
    1603          591 :                 Ok(())
    1604          591 :             }
    1605         1573 :             .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    1606              :         );
    1607         2243 :     }
    1608              : 
    1609              :     /// Creates and starts the wal receiver.
    1610              :     ///
    1611              :     /// This function is expected to be called at most once per Timeline's lifecycle
    1612              :     /// when the timeline is activated.
    1613         1257 :     fn launch_wal_receiver(
    1614         1257 :         self: &Arc<Self>,
    1615         1257 :         ctx: &RequestContext,
    1616         1257 :         broker_client: BrokerClientChannel,
    1617         1257 :     ) {
    1618         1257 :         info!(
    1619         1257 :             "launching WAL receiver for timeline {} of tenant {}",
    1620         1257 :             self.timeline_id, self.tenant_shard_id
    1621         1257 :         );
    1622              : 
    1623         1257 :         let tenant_conf_guard = self.tenant_conf.read().unwrap();
    1624         1257 :         let wal_connect_timeout = tenant_conf_guard
    1625         1257 :             .tenant_conf
    1626         1257 :             .walreceiver_connect_timeout
    1627         1257 :             .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
    1628         1257 :         let lagging_wal_timeout = tenant_conf_guard
    1629         1257 :             .tenant_conf
    1630         1257 :             .lagging_wal_timeout
    1631         1257 :             .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
    1632         1257 :         let max_lsn_wal_lag = tenant_conf_guard
    1633         1257 :             .tenant_conf
    1634         1257 :             .max_lsn_wal_lag
    1635         1257 :             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
    1636         1257 :         drop(tenant_conf_guard);
    1637         1257 : 
    1638         1257 :         let mut guard = self.walreceiver.lock().unwrap();
    1639         1257 :         assert!(
    1640         1257 :             guard.is_none(),
    1641            0 :             "multiple launches / re-launches of WAL receiver are not supported"
    1642              :         );
    1643         1257 :         *guard = Some(WalReceiver::start(
    1644         1257 :             Arc::clone(self),
    1645         1257 :             WalReceiverConf {
    1646         1257 :                 wal_connect_timeout,
    1647         1257 :                 lagging_wal_timeout,
    1648         1257 :                 max_lsn_wal_lag,
    1649         1257 :                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
    1650         1257 :                 availability_zone: self.conf.availability_zone.clone(),
    1651         1257 :                 ingest_batch_size: self.conf.ingest_batch_size,
    1652         1257 :             },
    1653         1257 :             broker_client,
    1654         1257 :             ctx,
    1655         1257 :         ));
    1656         1257 :     }
    1657              : 
    1658              :     /// Initialize with an empty layer map. Used when creating a new timeline.
    1659         1145 :     pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
    1660         1145 :         let mut layers = self.layers.try_write().expect(
    1661         1145 :             "in the context where we call this function, no other task has access to the object",
    1662         1145 :         );
    1663         1145 :         layers.initialize_empty(Lsn(start_lsn.0));
    1664         1145 :     }
    1665              : 
    1666              :     /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
    1667              :     /// files.
    1668          433 :     pub(super) async fn load_layer_map(
    1669          433 :         &self,
    1670          433 :         disk_consistent_lsn: Lsn,
    1671          433 :         index_part: Option<IndexPart>,
    1672          433 :     ) -> anyhow::Result<()> {
    1673              :         use init::{Decision::*, Discovered, DismissedLayer};
    1674              :         use LayerFileName::*;
    1675              : 
    1676          433 :         let mut guard = self.layers.write().await;
    1677              : 
    1678          433 :         let timer = self.metrics.load_layer_map_histo.start_timer();
    1679          433 : 
    1680          433 :         // Scan timeline directory and create ImageFileName and DeltaFilename
    1681          433 :         // structs representing all files on disk
    1682          433 :         let timeline_path = self
    1683          433 :             .conf
    1684          433 :             .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    1685          433 :         let conf = self.conf;
    1686          433 :         let span = tracing::Span::current();
    1687          433 : 
    1688          433 :         // Copy to move into the task we're about to spawn
    1689          433 :         let generation = self.generation;
    1690          433 :         let shard = self.get_shard_index();
    1691          433 :         let this = self.myself.upgrade().expect("&self method holds the arc");
    1692              : 
    1693          433 :         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
    1694          433 :             move || {
    1695          433 :                 let _g = span.entered();
    1696          433 :                 let discovered = init::scan_timeline_dir(&timeline_path)?;
    1697          433 :                 let mut discovered_layers = Vec::with_capacity(discovered.len());
    1698          433 :                 let mut unrecognized_files = Vec::new();
    1699          433 : 
    1700          433 :                 let mut path = timeline_path;
    1701              : 
    1702        13739 :                 for discovered in discovered {
    1703        13306 :                     let (name, kind) = match discovered {
    1704        12821 :                         Discovered::Layer(file_name, file_size) => {
    1705        12821 :                             discovered_layers.push((file_name, file_size));
    1706        12821 :                             continue;
    1707              :                         }
    1708              :                         Discovered::Metadata | Discovered::IgnoredBackup => {
    1709          433 :                             continue;
    1710              :                         }
    1711            0 :                         Discovered::Unknown(file_name) => {
    1712            0 :                             // we will later error if there are any
    1713            0 :                             unrecognized_files.push(file_name);
    1714            0 :                             continue;
    1715              :                         }
    1716           51 :                         Discovered::Ephemeral(name) => (name, "old ephemeral file"),
    1717            1 :                         Discovered::Temporary(name) => (name, "temporary timeline file"),
    1718            0 :                         Discovered::TemporaryDownload(name) => (name, "temporary download"),
    1719              :                     };
    1720           52 :                     path.push(Utf8Path::new(&name));
    1721           52 :                     init::cleanup(&path, kind)?;
    1722           52 :                     path.pop();
    1723              :                 }
    1724              : 
    1725          433 :                 if !unrecognized_files.is_empty() {
    1726              :                     // assume that if there are any there are many many.
    1727            0 :                     let n = unrecognized_files.len();
    1728            0 :                     let first = &unrecognized_files[..n.min(10)];
    1729            0 :                     anyhow::bail!(
    1730            0 :                         "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
    1731            0 :                     );
    1732          433 :                 }
    1733          433 : 
    1734          433 :                 let decided = init::reconcile(
    1735          433 :                     discovered_layers,
    1736          433 :                     index_part.as_ref(),
    1737          433 :                     disk_consistent_lsn,
    1738          433 :                     generation,
    1739          433 :                     shard,
    1740          433 :                 );
    1741          433 : 
    1742          433 :                 let mut loaded_layers = Vec::new();
    1743          433 :                 let mut needs_cleanup = Vec::new();
    1744          433 :                 let mut total_physical_size = 0;
    1745              : 
    1746        54013 :                 for (name, decision) in decided {
    1747        53580 :                     let decision = match decision {
    1748        11910 :                         Ok(UseRemote { local, remote }) => {
    1749        11910 :                             // Remote is authoritative, but we may still choose to retain
    1750        11910 :                             // the local file if the contents appear to match
    1751        11910 :                             if local.file_size() == remote.file_size() {
    1752              :                                 // Use the local file, but take the remote metadata so that we pick up
    1753              :                                 // the correct generation.
    1754        11909 :                                 UseLocal(remote)
    1755              :                             } else {
    1756            1 :                                 path.push(name.file_name());
    1757            1 :                                 init::cleanup_local_file_for_remote(&path, &local, &remote)?;
    1758            1 :                                 path.pop();
    1759            1 :                                 UseRemote { local, remote }
    1760              :                             }
    1761              :                         }
    1762        41277 :                         Ok(decision) => decision,
    1763            1 :                         Err(DismissedLayer::Future { local }) => {
    1764            1 :                             if local.is_some() {
    1765            0 :                                 path.push(name.file_name());
    1766            0 :                                 init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
    1767            0 :                                 path.pop();
    1768            1 :                             }
    1769            1 :                             needs_cleanup.push(name);
    1770            1 :                             continue;
    1771              :                         }
    1772          392 :                         Err(DismissedLayer::LocalOnly(local)) => {
    1773          392 :                             path.push(name.file_name());
    1774          392 :                             init::cleanup_local_only_file(&path, &name, &local)?;
    1775          392 :                             path.pop();
    1776          392 :                             // this file never existed remotely, we will have to do rework
    1777          392 :                             continue;
    1778              :                         }
    1779              :                     };
    1780              : 
    1781        53187 :                     match &name {
    1782        23047 :                         Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
    1783        30140 :                         Image(i) => assert!(i.lsn <= disk_consistent_lsn),
    1784              :                     }
    1785              : 
    1786        53187 :                     tracing::debug!(layer=%name, ?decision, "applied");
    1787              : 
    1788        53187 :                     let layer = match decision {
    1789        12428 :                         UseLocal(m) => {
    1790        12428 :                             total_physical_size += m.file_size();
    1791        12428 :                             Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
    1792              :                         }
    1793        40758 :                         Evicted(remote) | UseRemote { remote, .. } => {
    1794        40759 :                             Layer::for_evicted(conf, &this, name, remote)
    1795              :                         }
    1796              :                     };
    1797              : 
    1798        53187 :                     loaded_layers.push(layer);
    1799              :                 }
    1800          433 :                 Ok((loaded_layers, needs_cleanup, total_physical_size))
    1801          433 :             }
    1802          433 :         })
    1803          433 :         .await
    1804          433 :         .map_err(anyhow::Error::new)
    1805          433 :         .and_then(|x| x)?;
    1806              : 
    1807          433 :         let num_layers = loaded_layers.len();
    1808          433 : 
    1809          433 :         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
    1810              : 
    1811          433 :         if let Some(rtc) = self.remote_client.as_ref() {
    1812          433 :             rtc.schedule_layer_file_deletion(&needs_cleanup)?;
    1813          433 :             rtc.schedule_index_upload_for_file_changes()?;
    1814              :             // This barrier orders above DELETEs before any later operations.
    1815              :             // This is critical because code executing after the barrier might
    1816              :             // create again objects with the same key that we just scheduled for deletion.
    1817              :             // For example, if we just scheduled deletion of an image layer "from the future",
    1818              :             // later compaction might run again and re-create the same image layer.
    1819              :             // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
    1820              :             // "same" here means same key range and LSN.
    1821              :             //
    1822              :             // Without a barrier between above DELETEs and the re-creation's PUTs,
    1823              :             // the upload queue may execute the PUT first, then the DELETE.
    1824              :             // In our example, we will end up with an IndexPart referencing a non-existent object.
    1825              :             //
    1826              :             // 1. a future image layer is created and uploaded
    1827              :             // 2. ps restart
    1828              :             // 3. the future layer from (1) is deleted during load layer map
    1829              :             // 4. image layer is re-created and uploaded
    1830              :             // 5. deletion queue would like to delete (1) but actually deletes (4)
    1831              :             // 6. delete by name works as expected, but it now deletes the wrong (later) version
    1832              :             //
    1833              :             // See https://github.com/neondatabase/neon/issues/5878
    1834              :             //
    1835              :             // NB: generation numbers naturally protect against this because they disambiguate
    1836              :             //     (1) and (4)
    1837          433 :             rtc.schedule_barrier()?;
    1838              :             // Tenant::create_timeline will wait for these uploads to happen before returning, or
    1839              :             // on retry.
    1840            0 :         }
    1841              : 
    1842          433 :         info!(
    1843          433 :             "loaded layer map with {} layers at {}, total physical size: {}",
    1844          433 :             num_layers, disk_consistent_lsn, total_physical_size
    1845          433 :         );
    1846              : 
    1847          433 :         timer.stop_and_record();
    1848          433 :         Ok(())
    1849          433 :     }
    1850              : 
    1851              :     /// Retrieve current logical size of the timeline.
    1852              :     ///
    1853              :     /// The size could be lagging behind the actual number, in case
    1854              :     /// the initial size calculation has not been run (gets triggered on the first size access).
    1855              :     ///
    1856              :     /// return size and boolean flag that shows if the size is exact
    1857       705403 :     pub(crate) fn get_current_logical_size(
    1858       705403 :         self: &Arc<Self>,
    1859       705403 :         priority: GetLogicalSizePriority,
    1860       705403 :         ctx: &RequestContext,
    1861       705403 :     ) -> logical_size::CurrentLogicalSize {
    1862       705403 :         if !self.tenant_shard_id.is_zero() {
    1863              :             // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
    1864              :             // when HTTP API is serving a GET for timeline zero, return zero
    1865          183 :             return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
    1866       705220 :         }
    1867       705220 : 
    1868       705220 :         let current_size = self.current_logical_size.current_size();
    1869       705220 :         debug!("Current size: {current_size:?}");
    1870              : 
    1871       705220 :         match (current_size.accuracy(), priority) {
    1872       704506 :             (logical_size::Accuracy::Exact, _) => (), // nothing to do
    1873          135 :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
    1874          135 :                 // background task will eventually deliver an exact value, we're in no rush
    1875          135 :             }
    1876              :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
    1877              :                 // background task is not ready, but user is asking for it now;
    1878              :                 // => make the background task skip the line
    1879              :                 // (The alternative would be to calculate the size here, but,
    1880              :                 //  it can actually take a long time if the user has a lot of rels.
    1881              :                 //  And we'll inevitable need it again; So, let the background task do the work.)
    1882          579 :                 match self
    1883          579 :                     .current_logical_size
    1884          579 :                     .cancel_wait_for_background_loop_concurrency_limit_semaphore
    1885          579 :                     .get()
    1886              :                 {
    1887          579 :                     Some(cancel) => cancel.cancel(),
    1888              :                     None => {
    1889            0 :                         let state = self.current_state();
    1890            0 :                         if matches!(
    1891            0 :                             state,
    1892              :                             TimelineState::Broken { .. } | TimelineState::Stopping
    1893            0 :                         ) {
    1894            0 : 
    1895            0 :                             // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
    1896            0 :                             // Don't make noise.
    1897            0 :                         } else {
    1898            0 :                             warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
    1899              :                         }
    1900              :                     }
    1901              :                 };
    1902              :             }
    1903              :         }
    1904              : 
    1905       705220 :         if let CurrentLogicalSize::Approximate(_) = &current_size {
    1906          714 :             if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
    1907          303 :                 let first = self
    1908          303 :                     .current_logical_size
    1909          303 :                     .did_return_approximate_to_walreceiver
    1910          303 :                     .compare_exchange(
    1911          303 :                         false,
    1912          303 :                         true,
    1913          303 :                         AtomicOrdering::Relaxed,
    1914          303 :                         AtomicOrdering::Relaxed,
    1915          303 :                     )
    1916          303 :                     .is_ok();
    1917          303 :                 if first {
    1918           59 :                     crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
    1919          244 :                 }
    1920          411 :             }
    1921       704506 :         }
    1922              : 
    1923       705220 :         current_size
    1924       705403 :     }
    1925              : 
    1926         1206 :     fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
    1927         1206 :         let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
    1928              :             // nothing to do for freshly created timelines;
    1929          570 :             assert_eq!(
    1930          570 :                 self.current_logical_size.current_size().accuracy(),
    1931          570 :                 logical_size::Accuracy::Exact,
    1932          570 :             );
    1933          570 :             self.current_logical_size.initialized.add_permits(1);
    1934          570 :             return;
    1935              :         };
    1936              : 
    1937          636 :         let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
    1938          636 :         let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
    1939          636 :         self.current_logical_size
    1940          636 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
    1941          636 :             .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
    1942          636 : 
    1943          636 :         let self_clone = Arc::clone(self);
    1944          636 :         let background_ctx = ctx.detached_child(
    1945          636 :             TaskKind::InitialLogicalSizeCalculation,
    1946          636 :             DownloadBehavior::Download,
    1947          636 :         );
    1948          636 :         task_mgr::spawn(
    1949          636 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1950          636 :             task_mgr::TaskKind::InitialLogicalSizeCalculation,
    1951          636 :             Some(self.tenant_shard_id),
    1952          636 :             Some(self.timeline_id),
    1953          636 :             "initial size calculation",
    1954              :             false,
    1955              :             // NB: don't log errors here, task_mgr will do that.
    1956          636 :             async move {
    1957          636 :                 let cancel = task_mgr::shutdown_token();
    1958          636 :                 self_clone
    1959          636 :                     .initial_logical_size_calculation_task(
    1960          636 :                         initial_part_end,
    1961          636 :                         cancel_wait_for_background_loop_concurrency_limit_semaphore,
    1962          636 :                         cancel,
    1963          636 :                         background_ctx,
    1964          636 :                     )
    1965        76082 :                     .await;
    1966          633 :                 Ok(())
    1967          633 :             }
    1968          636 :             .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
    1969              :         );
    1970         1206 :     }
    1971              : 
    1972          636 :     async fn initial_logical_size_calculation_task(
    1973          636 :         self: Arc<Self>,
    1974          636 :         initial_part_end: Lsn,
    1975          636 :         skip_concurrency_limiter: CancellationToken,
    1976          636 :         cancel: CancellationToken,
    1977          636 :         background_ctx: RequestContext,
    1978          636 :     ) {
    1979          633 :         scopeguard::defer! {
    1980          633 :             // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
    1981          633 :             self.current_logical_size.initialized.add_permits(1);
    1982          633 :         }
    1983              : 
    1984              :         enum BackgroundCalculationError {
    1985              :             Cancelled,
    1986              :             Other(anyhow::Error),
    1987              :         }
    1988              : 
    1989          636 :         let try_once = |attempt: usize| {
    1990          636 :             let background_ctx = &background_ctx;
    1991          636 :             let self_ref = &self;
    1992          636 :             let skip_concurrency_limiter = &skip_concurrency_limiter;
    1993          636 :             async move {
    1994          636 :                 let cancel = task_mgr::shutdown_token();
    1995          636 :                 let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
    1996          636 :                     BackgroundLoopKind::InitialLogicalSizeCalculation,
    1997          636 :                     background_ctx,
    1998          636 :                 );
    1999              : 
    2000              :                 use crate::metrics::initial_logical_size::StartCircumstances;
    2001          636 :                 let (_maybe_permit, circumstances) = tokio::select! {
    2002          200 :                     permit = wait_for_permit => {
    2003              :                         (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
    2004              :                     }
    2005              :                     _ = self_ref.cancel.cancelled() => {
    2006              :                         return Err(BackgroundCalculationError::Cancelled);
    2007              :                     }
    2008              :                     _ = cancel.cancelled() => {
    2009              :                         return Err(BackgroundCalculationError::Cancelled);
    2010              :                     },
    2011              :                     () = skip_concurrency_limiter.cancelled() => {
    2012              :                         // Some action that is part of a end user interaction requested logical size
    2013              :                         // => break out of the rate limit
    2014              :                         // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
    2015              :                         // but then again what happens if they cancel; also, we should just be using
    2016              :                         // one runtime across the entire process, so, let's leave this for now.
    2017              :                         (None, StartCircumstances::SkippedConcurrencyLimiter)
    2018              :                     }
    2019              :                 };
    2020              : 
    2021          623 :                 let metrics_guard = if attempt == 1 {
    2022          623 :                     crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
    2023              :                 } else {
    2024            0 :                     crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
    2025              :                 };
    2026              : 
    2027          623 :                 match self_ref
    2028          623 :                     .logical_size_calculation_task(
    2029          623 :                         initial_part_end,
    2030          623 :                         LogicalSizeCalculationCause::Initial,
    2031          623 :                         background_ctx,
    2032          623 :                     )
    2033        75577 :                     .await
    2034              :                 {
    2035          589 :                     Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
    2036              :                     Err(CalculateLogicalSizeError::Cancelled) => {
    2037           29 :                         Err(BackgroundCalculationError::Cancelled)
    2038              :                     }
    2039            2 :                     Err(CalculateLogicalSizeError::Other(err)) => {
    2040            1 :                         if let Some(PageReconstructError::AncestorStopping(_)) =
    2041            2 :                             err.root_cause().downcast_ref()
    2042              :                         {
    2043            0 :                             Err(BackgroundCalculationError::Cancelled)
    2044              :                         } else {
    2045            2 :                             Err(BackgroundCalculationError::Other(err))
    2046              :                         }
    2047              :                     }
    2048              :                 }
    2049          633 :             }
    2050          636 :         };
    2051              : 
    2052          636 :         let retrying = async {
    2053          636 :             let mut attempt = 0;
    2054          636 :             loop {
    2055          636 :                 attempt += 1;
    2056          636 : 
    2057        76082 :                 match try_once(attempt).await {
    2058          589 :                     Ok(res) => return ControlFlow::Continue(res),
    2059           42 :                     Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
    2060            2 :                     Err(BackgroundCalculationError::Other(e)) => {
    2061            2 :                         warn!(attempt, "initial size calculation failed: {e:?}");
    2062              :                         // exponential back-off doesn't make sense at these long intervals;
    2063              :                         // use fixed retry interval with generous jitter instead
    2064            2 :                         let sleep_duration = Duration::from_secs(
    2065            2 :                             u64::try_from(
    2066            2 :                                 // 1hour base
    2067            2 :                                 (60_i64 * 60_i64)
    2068            2 :                                     // 10min jitter
    2069            2 :                                     + rand::thread_rng().gen_range(-10 * 60..10 * 60),
    2070            2 :                             )
    2071            2 :                             .expect("10min < 1hour"),
    2072            2 :                         );
    2073            2 :                         tokio::time::sleep(sleep_duration).await;
    2074              :                     }
    2075              :                 }
    2076              :             }
    2077          631 :         };
    2078              : 
    2079          636 :         let (calculated_size, metrics_guard) = tokio::select! {
    2080          631 :             res = retrying  => {
    2081              :                 match res {
    2082              :                     ControlFlow::Continue(calculated_size) => calculated_size,
    2083              :                     ControlFlow::Break(()) => return,
    2084              :                 }
    2085              :             }
    2086              :             _ = cancel.cancelled() => {
    2087              :                 return;
    2088              :             }
    2089              :         };
    2090              : 
    2091              :         // we cannot query current_logical_size.current_size() to know the current
    2092              :         // *negative* value, only truncated to u64.
    2093          589 :         let added = self
    2094          589 :             .current_logical_size
    2095          589 :             .size_added_after_initial
    2096          589 :             .load(AtomicOrdering::Relaxed);
    2097          589 : 
    2098          589 :         let sum = calculated_size.saturating_add_signed(added);
    2099          589 : 
    2100          589 :         // set the gauge value before it can be set in `update_current_logical_size`.
    2101          589 :         self.metrics.current_logical_size_gauge.set(sum);
    2102          589 : 
    2103          589 :         self.current_logical_size
    2104          589 :             .initial_logical_size
    2105          589 :             .set((calculated_size, metrics_guard.calculation_result_saved()))
    2106          589 :             .ok()
    2107          589 :             .expect("only this task sets it");
    2108          633 :     }
    2109              : 
    2110           35 :     pub(crate) fn spawn_ondemand_logical_size_calculation(
    2111           35 :         self: &Arc<Self>,
    2112           35 :         lsn: Lsn,
    2113           35 :         cause: LogicalSizeCalculationCause,
    2114           35 :         ctx: RequestContext,
    2115           35 :     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
    2116           35 :         let (sender, receiver) = oneshot::channel();
    2117           35 :         let self_clone = Arc::clone(self);
    2118           35 :         // XXX if our caller loses interest, i.e., ctx is cancelled,
    2119           35 :         // we should stop the size calculation work and return an error.
    2120           35 :         // That would require restructuring this function's API to
    2121           35 :         // return the result directly, instead of a Receiver for the result.
    2122           35 :         let ctx = ctx.detached_child(
    2123           35 :             TaskKind::OndemandLogicalSizeCalculation,
    2124           35 :             DownloadBehavior::Download,
    2125           35 :         );
    2126           35 :         task_mgr::spawn(
    2127           35 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    2128           35 :             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
    2129           35 :             Some(self.tenant_shard_id),
    2130           35 :             Some(self.timeline_id),
    2131           35 :             "ondemand logical size calculation",
    2132           35 :             false,
    2133           35 :             async move {
    2134           35 :                 let res = self_clone
    2135           35 :                     .logical_size_calculation_task(lsn, cause, &ctx)
    2136         1983 :                     .await;
    2137           34 :                 let _ = sender.send(res).ok();
    2138           34 :                 Ok(()) // Receiver is responsible for handling errors
    2139           35 :             }
    2140           35 :             .in_current_span(),
    2141           35 :         );
    2142           35 :         receiver
    2143           35 :     }
    2144              : 
    2145              :     /// # Cancel-Safety
    2146              :     ///
    2147              :     /// This method is cancellation-safe.
    2148         1316 :     #[instrument(skip_all)]
    2149              :     async fn logical_size_calculation_task(
    2150              :         self: &Arc<Self>,
    2151              :         lsn: Lsn,
    2152              :         cause: LogicalSizeCalculationCause,
    2153              :         ctx: &RequestContext,
    2154              :     ) -> Result<u64, CalculateLogicalSizeError> {
    2155              :         crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
    2156              :         // We should never be calculating logical sizes on shard !=0, because these shards do not have
    2157              :         // accurate relation sizes, and they do not emit consumption metrics.
    2158              :         debug_assert!(self.tenant_shard_id.is_zero());
    2159              : 
    2160              :         let _guard = self.gate.enter();
    2161              : 
    2162              :         let self_calculation = Arc::clone(self);
    2163              : 
    2164          658 :         let mut calculation = pin!(async {
    2165          658 :             let ctx = ctx.attached_child();
    2166          658 :             self_calculation
    2167          658 :                 .calculate_logical_size(lsn, cause, &ctx)
    2168        77562 :                 .await
    2169          654 :         });
    2170              : 
    2171        78214 :         tokio::select! {
    2172          653 :             res = &mut calculation => { res }
    2173              :             _ = self.cancel.cancelled() => {
    2174            0 :                 debug!("cancelling logical size calculation for timeline shutdown");
    2175              :                 calculation.await
    2176              :             }
    2177              :             _ = task_mgr::shutdown_watcher() => {
    2178            0 :                 debug!("cancelling logical size calculation for task shutdown");
    2179              :                 calculation.await
    2180              :             }
    2181              :         }
    2182              :     }
    2183              : 
    2184              :     /// Calculate the logical size of the database at the latest LSN.
    2185              :     ///
    2186              :     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    2187              :     /// especially if we need to download remote layers.
    2188              :     ///
    2189              :     /// # Cancel-Safety
    2190              :     ///
    2191              :     /// This method is cancellation-safe.
    2192          665 :     async fn calculate_logical_size(
    2193          665 :         &self,
    2194          665 :         up_to_lsn: Lsn,
    2195          665 :         cause: LogicalSizeCalculationCause,
    2196          665 :         ctx: &RequestContext,
    2197          665 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2198          665 :         info!(
    2199          665 :             "Calculating logical size for timeline {} at {}",
    2200          665 :             self.timeline_id, up_to_lsn
    2201          665 :         );
    2202              :         // These failpoints are used by python tests to ensure that we don't delete
    2203              :         // the timeline while the logical size computation is ongoing.
    2204              :         // The first failpoint is used to make this function pause.
    2205              :         // Then the python test initiates timeline delete operation in a thread.
    2206              :         // It waits for a few seconds, then arms the second failpoint and disables
    2207              :         // the first failpoint. The second failpoint prints an error if the timeline
    2208              :         // delete code has deleted the on-disk state while we're still running here.
    2209              :         // It shouldn't do that. If it does it anyway, the error will be caught
    2210              :         // by the test suite, highlighting the problem.
    2211            0 :         fail::fail_point!("timeline-calculate-logical-size-pause");
    2212          665 :         fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
    2213            2 :             if !self
    2214            2 :                 .conf
    2215            2 :                 .metadata_path(&self.tenant_shard_id, &self.timeline_id)
    2216            2 :                 .exists()
    2217              :             {
    2218            0 :                 error!("timeline-calculate-logical-size-pre metadata file does not exist")
    2219            2 :             }
    2220              :             // need to return something
    2221            2 :             Ok(0)
    2222          665 :         });
    2223              :         // See if we've already done the work for initial size calculation.
    2224              :         // This is a short-cut for timelines that are mostly unused.
    2225          663 :         if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
    2226            6 :             return Ok(size);
    2227          657 :         }
    2228          657 :         let storage_time_metrics = match cause {
    2229              :             LogicalSizeCalculationCause::Initial
    2230              :             | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
    2231          645 :             | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
    2232              :             LogicalSizeCalculationCause::EvictionTaskImitation => {
    2233           12 :                 &self.metrics.imitate_logical_size_histo
    2234              :             }
    2235              :         };
    2236          657 :         let timer = storage_time_metrics.start_timer();
    2237          657 :         let logical_size = self
    2238          657 :             .get_current_logical_size_non_incremental(up_to_lsn, ctx)
    2239        78021 :             .await?;
    2240            0 :         debug!("calculated logical size: {logical_size}");
    2241          622 :         timer.stop_and_record();
    2242          622 :         Ok(logical_size)
    2243          661 :     }
    2244              : 
    2245              :     /// Update current logical size, adding `delta' to the old value.
    2246       638850 :     fn update_current_logical_size(&self, delta: i64) {
    2247       638850 :         let logical_size = &self.current_logical_size;
    2248       638850 :         logical_size.increment_size(delta);
    2249       638850 : 
    2250       638850 :         // Also set the value in the prometheus gauge. Note that
    2251       638850 :         // there is a race condition here: if this is is called by two
    2252       638850 :         // threads concurrently, the prometheus gauge might be set to
    2253       638850 :         // one value while current_logical_size is set to the
    2254       638850 :         // other.
    2255       638850 :         match logical_size.current_size() {
    2256       638751 :             CurrentLogicalSize::Exact(ref new_current_size) => self
    2257       638751 :                 .metrics
    2258       638751 :                 .current_logical_size_gauge
    2259       638751 :                 .set(new_current_size.into()),
    2260           99 :             CurrentLogicalSize::Approximate(_) => {
    2261           99 :                 // don't update the gauge yet, this allows us not to update the gauge back and
    2262           99 :                 // forth between the initial size calculation task.
    2263           99 :             }
    2264              :         }
    2265       638850 :     }
    2266              : 
    2267         2244 :     async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
    2268         2244 :         let guard = self.layers.read().await;
    2269       784845 :         for historic_layer in guard.layer_map().iter_historic_layers() {
    2270       784845 :             let historic_layer_name = historic_layer.filename().file_name();
    2271       784845 :             if layer_file_name == historic_layer_name {
    2272         2244 :                 return Some(guard.get_from_desc(&historic_layer));
    2273       782601 :             }
    2274              :         }
    2275              : 
    2276            0 :         None
    2277         2244 :     }
    2278              : 
    2279              :     /// The timeline heatmap is a hint to secondary locations from the primary location,
    2280              :     /// indicating which layers are currently on-disk on the primary.
    2281              :     ///
    2282              :     /// None is returned if the Timeline is in a state where uploading a heatmap
    2283              :     /// doesn't make sense, such as shutting down or initializing.  The caller
    2284              :     /// should treat this as a cue to simply skip doing any heatmap uploading
    2285              :     /// for this timeline.
    2286           12 :     pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
    2287              :         // no point in heatmaps without remote client
    2288           12 :         let _remote_client = self.remote_client.as_ref()?;
    2289              : 
    2290           12 :         if !self.is_active() {
    2291            0 :             return None;
    2292           12 :         }
    2293              : 
    2294           12 :         let guard = self.layers.read().await;
    2295              : 
    2296         2515 :         let resident = guard.resident_layers().map(|layer| {
    2297         2515 :             let last_activity_ts = layer.access_stats().latest_activity_or_now();
    2298         2515 : 
    2299         2515 :             HeatMapLayer::new(
    2300         2515 :                 layer.layer_desc().filename(),
    2301         2515 :                 layer.metadata().into(),
    2302         2515 :                 last_activity_ts,
    2303         2515 :             )
    2304         2515 :         });
    2305              : 
    2306           12 :         let layers = resident.collect().await;
    2307              : 
    2308           12 :         Some(HeatMapTimeline::new(self.timeline_id, layers))
    2309           12 :     }
    2310              : }
    2311              : 
    2312              : type TraversalId = String;
    2313              : 
    2314              : trait TraversalLayerExt {
    2315              :     fn traversal_id(&self) -> TraversalId;
    2316              : }
    2317              : 
    2318              : impl TraversalLayerExt for Layer {
    2319         1452 :     fn traversal_id(&self) -> TraversalId {
    2320         1452 :         self.local_path().to_string()
    2321         1452 :     }
    2322              : }
    2323              : 
    2324              : impl TraversalLayerExt for Arc<InMemoryLayer> {
    2325          332 :     fn traversal_id(&self) -> TraversalId {
    2326          332 :         format!("timeline {} in-memory {self}", self.get_timeline_id())
    2327          332 :     }
    2328              : }
    2329              : 
    2330              : impl Timeline {
    2331              :     ///
    2332              :     /// Get a handle to a Layer for reading.
    2333              :     ///
    2334              :     /// The returned Layer might be from an ancestor timeline, if the
    2335              :     /// segment hasn't been updated on this timeline yet.
    2336              :     ///
    2337              :     /// This function takes the current timeline's locked LayerMap as an argument,
    2338              :     /// so callers can avoid potential race conditions.
    2339              :     ///
    2340              :     /// # Cancel-Safety
    2341              :     ///
    2342              :     /// This method is cancellation-safe.
    2343      7350991 :     async fn get_reconstruct_data(
    2344      7350991 :         &self,
    2345      7350991 :         key: Key,
    2346      7350991 :         request_lsn: Lsn,
    2347      7350991 :         reconstruct_state: &mut ValueReconstructState,
    2348      7350991 :         ctx: &RequestContext,
    2349      7351016 :     ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
    2350      7351016 :         // Start from the current timeline.
    2351      7351016 :         let mut timeline_owned;
    2352      7351016 :         let mut timeline = self;
    2353      7351016 : 
    2354      7351016 :         let mut read_count = scopeguard::guard(0, |cnt| {
    2355      7351010 :             crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
    2356      7351016 :         });
    2357      7351016 : 
    2358      7351016 :         // For debugging purposes, collect the path of layers that we traversed
    2359      7351016 :         // through. It's included in the error message if we fail to find the key.
    2360      7351016 :         let mut traversal_path = Vec::<TraversalPathItem>::new();
    2361              : 
    2362      7351016 :         let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
    2363      1720526 :             *cached_lsn
    2364              :         } else {
    2365      5630490 :             Lsn(0)
    2366              :         };
    2367              : 
    2368              :         // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
    2369              :         // to check that each iteration make some progress, to break infinite
    2370              :         // looping if something goes wrong.
    2371      7351016 :         let mut prev_lsn = Lsn(u64::MAX);
    2372      7351016 : 
    2373      7351016 :         let mut result = ValueReconstructResult::Continue;
    2374      7351016 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2375              : 
    2376     31616645 :         'outer: loop {
    2377     31616645 :             if self.cancel.is_cancelled() {
    2378           34 :                 return Err(PageReconstructError::Cancelled);
    2379     31616611 :             }
    2380     31616611 : 
    2381     31616611 :             // The function should have updated 'state'
    2382     31616611 :             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
    2383     31616611 :             match result {
    2384      5661473 :                 ValueReconstructResult::Complete => return Ok(traversal_path),
    2385              :                 ValueReconstructResult::Continue => {
    2386              :                     // If we reached an earlier cached page image, we're done.
    2387     25955133 :                     if cont_lsn == cached_lsn + 1 {
    2388      1688207 :                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
    2389      1688207 :                         return Ok(traversal_path);
    2390     24266926 :                     }
    2391     24266926 :                     if prev_lsn <= cont_lsn {
    2392              :                         // Didn't make any progress in last iteration. Error out to avoid
    2393              :                         // getting stuck in the loop.
    2394         1264 :                         return Err(layer_traversal_error(format!(
    2395         1264 :                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
    2396         1264 :                             key,
    2397         1264 :                             Lsn(cont_lsn.0 - 1),
    2398         1264 :                             request_lsn,
    2399         1264 :                             timeline.ancestor_lsn
    2400         1264 :                         ), traversal_path));
    2401     24265662 :                     }
    2402     24265662 :                     prev_lsn = cont_lsn;
    2403              :                 }
    2404              :                 ValueReconstructResult::Missing => {
    2405              :                     return Err(layer_traversal_error(
    2406            5 :                         if cfg!(test) {
    2407            4 :                             format!(
    2408            4 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
    2409            4 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
    2410            4 :                             )
    2411              :                         } else {
    2412            1 :                             format!(
    2413            1 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
    2414            1 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
    2415            1 :                             )
    2416              :                         },
    2417            5 :                         traversal_path,
    2418              :                     ));
    2419              :                 }
    2420              :             }
    2421              : 
    2422              :             // Recurse into ancestor if needed
    2423     24265662 :             if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
    2424            0 :                 trace!(
    2425            0 :                     "going into ancestor {}, cont_lsn is {}",
    2426            0 :                     timeline.ancestor_lsn,
    2427            0 :                     cont_lsn
    2428            0 :                 );
    2429              : 
    2430      1298662 :                 timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
    2431      1298658 :                 timeline = &*timeline_owned;
    2432      1298658 :                 prev_lsn = Lsn(u64::MAX);
    2433      1298658 :                 continue 'outer;
    2434     22967000 :             }
    2435              : 
    2436     22967000 :             let guard = timeline.layers.read().await;
    2437     22967000 :             let layers = guard.layer_map();
    2438              : 
    2439              :             // Check the open and frozen in-memory layers first, in order from newest
    2440              :             // to oldest.
    2441     22967000 :             if let Some(open_layer) = &layers.open_layer {
    2442     17747685 :                 let start_lsn = open_layer.get_lsn_range().start;
    2443     17747685 :                 if cont_lsn > start_lsn {
    2444              :                     //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
    2445              :                     // Get all the data needed to reconstruct the page version from this layer.
    2446              :                     // But if we have an older cached page image, no need to go past that.
    2447      5734821 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2448      5734821 :                     result = match open_layer
    2449      5734821 :                         .get_value_reconstruct_data(
    2450      5734821 :                             key,
    2451      5734821 :                             lsn_floor..cont_lsn,
    2452      5734821 :                             reconstruct_state,
    2453      5734821 :                             ctx,
    2454      5734821 :                         )
    2455       669074 :                         .await
    2456              :                     {
    2457      5734821 :                         Ok(result) => result,
    2458            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2459              :                     };
    2460      5734821 :                     cont_lsn = lsn_floor;
    2461      5734821 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2462      5734821 :                     traversal_path.push((
    2463      5734821 :                         result,
    2464      5734821 :                         cont_lsn,
    2465      5734821 :                         Box::new({
    2466      5734821 :                             let open_layer = Arc::clone(open_layer);
    2467      5734821 :                             move || open_layer.traversal_id()
    2468      5734821 :                         }),
    2469      5734821 :                     ));
    2470      5734821 :                     continue 'outer;
    2471     12012864 :                 }
    2472      5219315 :             }
    2473     17232179 :             for frozen_layer in layers.frozen_layers.iter().rev() {
    2474       888782 :                 let start_lsn = frozen_layer.get_lsn_range().start;
    2475       888782 :                 if cont_lsn > start_lsn {
    2476              :                     //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
    2477       277852 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2478       277852 :                     result = match frozen_layer
    2479       277852 :                         .get_value_reconstruct_data(
    2480       277852 :                             key,
    2481       277852 :                             lsn_floor..cont_lsn,
    2482       277852 :                             reconstruct_state,
    2483       277852 :                             ctx,
    2484       277852 :                         )
    2485        19683 :                         .await
    2486              :                     {
    2487       277852 :                         Ok(result) => result,
    2488            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2489              :                     };
    2490       277852 :                     cont_lsn = lsn_floor;
    2491       277852 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2492       277852 :                     traversal_path.push((
    2493       277852 :                         result,
    2494       277852 :                         cont_lsn,
    2495       277852 :                         Box::new({
    2496       277852 :                             let frozen_layer = Arc::clone(frozen_layer);
    2497       277852 :                             move || frozen_layer.traversal_id()
    2498       277852 :                         }),
    2499       277852 :                     ));
    2500       277852 :                     continue 'outer;
    2501       610930 :                 }
    2502              :             }
    2503              : 
    2504     16954327 :             if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
    2505     16818687 :                 let layer = guard.get_from_desc(&layer);
    2506     16818687 :                 // Get all the data needed to reconstruct the page version from this layer.
    2507     16818687 :                 // But if we have an older cached page image, no need to go past that.
    2508     16818687 :                 let lsn_floor = max(cached_lsn + 1, lsn_floor);
    2509     16818687 :                 result = match layer
    2510     16818687 :                     .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
    2511      1002781 :                     .await
    2512              :                 {
    2513     16818658 :                     Ok(result) => result,
    2514           21 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2515              :                 };
    2516     16818658 :                 cont_lsn = lsn_floor;
    2517     16818658 :                 *read_count += 1;
    2518     16818658 :                 traversal_path.push((
    2519     16818658 :                     result,
    2520     16818658 :                     cont_lsn,
    2521     16818658 :                     Box::new({
    2522     16818658 :                         let layer = layer.to_owned();
    2523     16818658 :                         move || layer.traversal_id()
    2524     16818658 :                     }),
    2525     16818658 :                 ));
    2526     16818658 :                 continue 'outer;
    2527       135640 :             } else if timeline.ancestor_timeline.is_some() {
    2528              :                 // Nothing on this timeline. Traverse to parent
    2529       135639 :                 result = ValueReconstructResult::Continue;
    2530       135639 :                 cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2531       135639 :                 continue 'outer;
    2532              :             } else {
    2533              :                 // Nothing found
    2534            1 :                 result = ValueReconstructResult::Missing;
    2535            1 :                 continue 'outer;
    2536              :             }
    2537              :         }
    2538      7351008 :     }
    2539              : 
    2540              :     /// # Cancel-safety
    2541              :     ///
    2542              :     /// This method is cancellation-safe.
    2543      7427636 :     async fn lookup_cached_page(
    2544      7427636 :         &self,
    2545      7427636 :         key: &Key,
    2546      7427636 :         lsn: Lsn,
    2547      7427636 :         ctx: &RequestContext,
    2548      7427660 :     ) -> Option<(Lsn, Bytes)> {
    2549      7427660 :         let cache = page_cache::get();
    2550              : 
    2551              :         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
    2552              :         // We should look at the key to determine if it's a cacheable object
    2553      7427660 :         let (lsn, read_guard) = cache
    2554      7427660 :             .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
    2555      5630490 :             .await?;
    2556      1797170 :         let img = Bytes::from(read_guard.to_vec());
    2557      1797170 :         Some((lsn, img))
    2558      7427660 :     }
    2559              : 
    2560      1298660 :     async fn get_ready_ancestor_timeline(
    2561      1298660 :         &self,
    2562      1298660 :         ctx: &RequestContext,
    2563      1298662 :     ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
    2564      1298662 :         let ancestor = match self.get_ancestor_timeline() {
    2565      1298662 :             Ok(timeline) => timeline,
    2566            0 :             Err(e) => return Err(GetReadyAncestorError::from(e)),
    2567              :         };
    2568              : 
    2569              :         // It's possible that the ancestor timeline isn't active yet, or
    2570              :         // is active but hasn't yet caught up to the branch point. Wait
    2571              :         // for it.
    2572              :         //
    2573              :         // This cannot happen while the pageserver is running normally,
    2574              :         // because you cannot create a branch from a point that isn't
    2575              :         // present in the pageserver yet. However, we don't wait for the
    2576              :         // branch point to be uploaded to cloud storage before creating
    2577              :         // a branch. I.e., the branch LSN need not be remote consistent
    2578              :         // for the branching operation to succeed.
    2579              :         //
    2580              :         // Hence, if we try to load a tenant in such a state where
    2581              :         // 1. the existence of the branch was persisted (in IndexPart and/or locally)
    2582              :         // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
    2583              :         // then we will need to wait for the ancestor timeline to
    2584              :         // re-stream WAL up to branch_lsn before we access it.
    2585              :         //
    2586              :         // How can a tenant get in such a state?
    2587              :         // - ungraceful pageserver process exit
    2588              :         // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
    2589              :         //
    2590              :         // NB: this could be avoided by requiring
    2591              :         //   branch_lsn >= remote_consistent_lsn
    2592              :         // during branch creation.
    2593      1298662 :         match ancestor.wait_to_become_active(ctx).await {
    2594      1298658 :             Ok(()) => {}
    2595              :             Err(TimelineState::Stopping) => {
    2596            2 :                 return Err(GetReadyAncestorError::AncestorStopping(
    2597            2 :                     ancestor.timeline_id,
    2598            2 :                 ));
    2599              :             }
    2600            2 :             Err(state) => {
    2601            2 :                 return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
    2602            2 :                     "Timeline {} will not become active. Current state: {:?}",
    2603            2 :                     ancestor.timeline_id,
    2604            2 :                     &state,
    2605            2 :                 )));
    2606              :             }
    2607              :         }
    2608      1298658 :         ancestor
    2609      1298658 :             .wait_lsn(self.ancestor_lsn, ctx)
    2610            6 :             .await
    2611      1298658 :             .map_err(|e| match e {
    2612            0 :                 e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
    2613            0 :                 WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
    2614            0 :                 e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
    2615      1298658 :             })?;
    2616              : 
    2617      1298658 :         Ok(ancestor)
    2618      1298662 :     }
    2619              : 
    2620      1298660 :     fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
    2621      1298660 :         let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
    2622            0 :             format!(
    2623            0 :                 "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
    2624            0 :                 self.timeline_id,
    2625            0 :                 self.get_ancestor_timeline_id(),
    2626            0 :             )
    2627      1298660 :         })?;
    2628      1298660 :         Ok(Arc::clone(ancestor))
    2629      1298660 :     }
    2630              : 
    2631      1136326 :     pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
    2632      1136326 :         &self.shard_identity
    2633      1136326 :     }
    2634              : 
    2635              :     ///
    2636              :     /// Get a handle to the latest layer for appending.
    2637              :     ///
    2638      2946592 :     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    2639      2946592 :         let mut guard = self.layers.write().await;
    2640      2946592 :         let layer = guard
    2641      2946592 :             .get_layer_for_write(
    2642      2946592 :                 lsn,
    2643      2946592 :                 self.get_last_record_lsn(),
    2644      2946592 :                 self.conf,
    2645      2946592 :                 self.timeline_id,
    2646      2946592 :                 self.tenant_shard_id,
    2647      2946592 :             )
    2648          269 :             .await?;
    2649      2946592 :         Ok(layer)
    2650      2946592 :     }
    2651              : 
    2652      1214102 :     async fn put_value(
    2653      1214102 :         &self,
    2654      1214102 :         key: Key,
    2655      1214102 :         lsn: Lsn,
    2656      1214102 :         val: &Value,
    2657      1214102 :         ctx: &RequestContext,
    2658      1214102 :     ) -> anyhow::Result<()> {
    2659              :         //info!("PUT: key {} at {}", key, lsn);
    2660      1214102 :         let layer = self.get_layer_for_write(lsn).await?;
    2661      1214102 :         layer.put_value(key, lsn, val, ctx).await?;
    2662      1214102 :         Ok(())
    2663      1214102 :     }
    2664              : 
    2665      1713238 :     async fn put_values(
    2666      1713238 :         &self,
    2667      1713238 :         values: &HashMap<Key, Vec<(Lsn, Value)>>,
    2668      1713238 :         ctx: &RequestContext,
    2669      1713239 :     ) -> anyhow::Result<()> {
    2670              :         // Pick the first LSN in the batch to get the layer to write to.
    2671      1713239 :         for lsns in values.values() {
    2672      1713239 :             if let Some((lsn, _)) = lsns.first() {
    2673      1713239 :                 let layer = self.get_layer_for_write(*lsn).await?;
    2674      1713239 :                 layer.put_values(values, ctx).await?;
    2675      1713238 :                 break;
    2676            0 :             }
    2677              :         }
    2678      1713238 :         Ok(())
    2679      1713238 :     }
    2680              : 
    2681        19251 :     async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    2682        19251 :         if let Some((_, lsn)) = tombstones.first() {
    2683        19251 :             let layer = self.get_layer_for_write(*lsn).await?;
    2684        19251 :             layer.put_tombstones(tombstones).await?;
    2685            0 :         }
    2686        19251 :         Ok(())
    2687        19251 :     }
    2688              : 
    2689     76398101 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    2690     76398101 :         assert!(new_lsn.is_aligned());
    2691              : 
    2692     76398101 :         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
    2693     76398101 :         self.last_record_lsn.advance(new_lsn);
    2694     76398101 :     }
    2695              : 
    2696         5472 :     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
    2697              :         // Freeze the current open in-memory layer. It will be written to disk on next
    2698              :         // iteration.
    2699         5472 :         let _write_guard = if write_lock_held {
    2700         3520 :             None
    2701              :         } else {
    2702         1952 :             Some(self.write_lock.lock().await)
    2703              :         };
    2704         5472 :         let mut guard = self.layers.write().await;
    2705         5472 :         guard
    2706         5472 :             .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
    2707           10 :             .await;
    2708         5472 :     }
    2709              : 
    2710              :     /// Layer flusher task's main loop.
    2711         1573 :     async fn flush_loop(
    2712         1573 :         self: &Arc<Self>,
    2713         1573 :         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
    2714         1573 :         ctx: &RequestContext,
    2715         1573 :     ) {
    2716         1573 :         info!("started flush loop");
    2717              :         loop {
    2718         6676 :             tokio::select! {
    2719        12001 :                 _ = self.cancel.cancelled() => {
    2720        12001 :                     info!("shutting down layer flush task");
    2721        12001 :                     break;
    2722        12001 :                 },
    2723        12001 :                 _ = task_mgr::shutdown_watcher() => {
    2724        12001 :                     info!("shutting down layer flush task");
    2725        12001 :                     break;
    2726        12001 :                 },
    2727        12001 :                 _ = layer_flush_start_rx.changed() => {}
    2728        12001 :             }
    2729              : 
    2730            0 :             trace!("waking up");
    2731         5109 :             let timer = self.metrics.flush_time_histo.start_timer();
    2732         5109 :             let flush_counter = *layer_flush_start_rx.borrow();
    2733        10340 :             let result = loop {
    2734        10340 :                 if self.cancel.is_cancelled() {
    2735            0 :                     info!("dropping out of flush loop for timeline shutdown");
    2736              :                     // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
    2737              :                     // anyone waiting on that will respect self.cancel as well: they will stop
    2738              :                     // waiting at the same time we as drop out of this loop.
    2739            0 :                     return;
    2740        10340 :                 }
    2741              : 
    2742        10340 :                 let layer_to_flush = {
    2743        10340 :                     let guard = self.layers.read().await;
    2744        10340 :                     guard.layer_map().frozen_layers.front().cloned()
    2745              :                     // drop 'layers' lock to allow concurrent reads and writes
    2746              :                 };
    2747        10340 :                 let Some(layer_to_flush) = layer_to_flush else {
    2748         5103 :                     break Ok(());
    2749              :                 };
    2750        22239 :                 match self.flush_frozen_layer(layer_to_flush, ctx).await {
    2751         5231 :                     Ok(()) => {}
    2752              :                     Err(FlushLayerError::Cancelled) => {
    2753            1 :                         info!("dropping out of flush loop for timeline shutdown");
    2754            1 :                         return;
    2755              :                     }
    2756            0 :                     err @ Err(
    2757              :                         FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
    2758              :                     ) => {
    2759            0 :                         error!("could not flush frozen layer: {err:?}");
    2760            0 :                         break err;
    2761              :                     }
    2762              :                 }
    2763              :             };
    2764              :             // Notify any listeners that we're done
    2765         5103 :             let _ = self
    2766         5103 :                 .layer_flush_done_tx
    2767         5103 :                 .send_replace((flush_counter, result));
    2768         5103 : 
    2769         5103 :             timer.stop_and_record();
    2770              :         }
    2771          591 :     }
    2772              : 
    2773         1952 :     async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
    2774         1952 :         let mut rx = self.layer_flush_done_tx.subscribe();
    2775         1952 : 
    2776         1952 :         // Increment the flush cycle counter and wake up the flush task.
    2777         1952 :         // Remember the new value, so that when we listen for the flush
    2778         1952 :         // to finish, we know when the flush that we initiated has
    2779         1952 :         // finished, instead of some other flush that was started earlier.
    2780         1952 :         let mut my_flush_request = 0;
    2781         1952 : 
    2782         1952 :         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
    2783         1952 :         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
    2784           45 :             anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
    2785         1907 :         }
    2786         1907 : 
    2787         1907 :         self.layer_flush_start_tx.send_modify(|counter| {
    2788         1907 :             my_flush_request = *counter + 1;
    2789         1907 :             *counter = my_flush_request;
    2790         1907 :         });
    2791              : 
    2792         3813 :         loop {
    2793         3813 :             {
    2794         3813 :                 let (last_result_counter, last_result) = &*rx.borrow();
    2795         3813 :                 if *last_result_counter >= my_flush_request {
    2796         1906 :                     if let Err(_err) = last_result {
    2797              :                         // We already logged the original error in
    2798              :                         // flush_loop. We cannot propagate it to the caller
    2799              :                         // here, because it might not be Cloneable
    2800            0 :                         anyhow::bail!(
    2801            0 :                             "Could not flush frozen layer. Request id: {}",
    2802            0 :                             my_flush_request
    2803            0 :                         );
    2804              :                     } else {
    2805         1906 :                         return Ok(());
    2806              :                     }
    2807         1907 :                 }
    2808              :             }
    2809            0 :             trace!("waiting for flush to complete");
    2810         1907 :             tokio::select! {
    2811         1906 :                 rx_e = rx.changed() => {
    2812              :                     rx_e?;
    2813              :                 },
    2814              :                 // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
    2815              :                 // the notification from [`flush_loop`] that it completed.
    2816              :                 _ = self.cancel.cancelled() => {
    2817            1 :                     tracing::info!("Cancelled layer flush due on timeline shutdown");
    2818              :                     return Ok(())
    2819              :                 }
    2820              :             };
    2821            0 :             trace!("done")
    2822              :         }
    2823         1952 :     }
    2824              : 
    2825         3520 :     fn flush_frozen_layers(&self) {
    2826         3520 :         self.layer_flush_start_tx.send_modify(|val| *val += 1);
    2827         3520 :     }
    2828              : 
    2829              :     /// Flush one frozen in-memory layer to disk, as a new delta layer.
    2830        10474 :     #[instrument(skip_all, fields(layer=%frozen_layer))]
    2831              :     async fn flush_frozen_layer(
    2832              :         self: &Arc<Self>,
    2833              :         frozen_layer: Arc<InMemoryLayer>,
    2834              :         ctx: &RequestContext,
    2835              :     ) -> Result<(), FlushLayerError> {
    2836              :         debug_assert_current_span_has_tenant_and_timeline_id();
    2837              :         // As a special case, when we have just imported an image into the repository,
    2838              :         // instead of writing out a L0 delta layer, we directly write out image layer
    2839              :         // files instead. This is possible as long as *all* the data imported into the
    2840              :         // repository have the same LSN.
    2841              :         let lsn_range = frozen_layer.get_lsn_range();
    2842              :         let (layers_to_upload, delta_layer_to_add) =
    2843              :             if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
    2844              :                 #[cfg(test)]
    2845              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2846              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2847              :                         panic!("flush loop not running")
    2848              :                     }
    2849              :                     FlushLoopState::Running {
    2850              :                         initdb_optimization_count,
    2851              :                         ..
    2852              :                     } => {
    2853              :                         *initdb_optimization_count += 1;
    2854              :                     }
    2855              :                 }
    2856              :                 // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
    2857              :                 // require downloading anything during initial import.
    2858              :                 let (partitioning, _lsn) = self
    2859              :                     .repartition(
    2860              :                         self.initdb_lsn,
    2861              :                         self.get_compaction_target_size(),
    2862              :                         EnumSet::empty(),
    2863              :                         ctx,
    2864              :                     )
    2865              :                     .await?;
    2866              : 
    2867              :                 if self.cancel.is_cancelled() {
    2868              :                     return Err(FlushLayerError::Cancelled);
    2869              :                 }
    2870              : 
    2871              :                 // For image layers, we add them immediately into the layer map.
    2872              :                 (
    2873              :                     self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
    2874              :                         .await?,
    2875              :                     None,
    2876              :                 )
    2877              :             } else {
    2878              :                 #[cfg(test)]
    2879              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2880              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2881              :                         panic!("flush loop not running")
    2882              :                     }
    2883              :                     FlushLoopState::Running {
    2884              :                         expect_initdb_optimization,
    2885              :                         ..
    2886              :                     } => {
    2887              :                         assert!(!*expect_initdb_optimization, "expected initdb optimization");
    2888              :                     }
    2889              :                 }
    2890              :                 // Normal case, write out a L0 delta layer file.
    2891              :                 // `create_delta_layer` will not modify the layer map.
    2892              :                 // We will remove frozen layer and add delta layer in one atomic operation later.
    2893              :                 let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
    2894              :                 (
    2895              :                     // FIXME: even though we have a single image and single delta layer assumption
    2896              :                     // we push them to vec
    2897              :                     vec![layer.clone()],
    2898              :                     Some(layer),
    2899              :                 )
    2900              :             };
    2901              : 
    2902         5236 :         pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
    2903              : 
    2904              :         if self.cancel.is_cancelled() {
    2905              :             return Err(FlushLayerError::Cancelled);
    2906              :         }
    2907              : 
    2908              :         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
    2909              :         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
    2910              : 
    2911              :         // The new on-disk layers are now in the layer map. We can remove the
    2912              :         // in-memory layer from the map now. The flushed layer is stored in
    2913              :         // the mapping in `create_delta_layer`.
    2914              :         let metadata = {
    2915              :             let mut guard = self.layers.write().await;
    2916              : 
    2917              :             if self.cancel.is_cancelled() {
    2918              :                 return Err(FlushLayerError::Cancelled);
    2919              :             }
    2920              : 
    2921              :             guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
    2922              : 
    2923              :             if disk_consistent_lsn != old_disk_consistent_lsn {
    2924              :                 assert!(disk_consistent_lsn > old_disk_consistent_lsn);
    2925              :                 self.disk_consistent_lsn.store(disk_consistent_lsn);
    2926              : 
    2927              :                 // Schedule remote uploads that will reflect our new disk_consistent_lsn
    2928              :                 Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
    2929              :             } else {
    2930              :                 None
    2931              :             }
    2932              :             // release lock on 'layers'
    2933              :         };
    2934              : 
    2935              :         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
    2936              :         // a compaction can delete the file and then it won't be available for uploads any more.
    2937              :         // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
    2938              :         // race situation.
    2939              :         // See https://github.com/neondatabase/neon/issues/4526
    2940         5235 :         pausable_failpoint!("flush-frozen-pausable");
    2941              : 
    2942              :         // This failpoint is used by another test case `test_pageserver_recovery`.
    2943            0 :         fail_point!("flush-frozen-exit");
    2944              : 
    2945              :         // Update the metadata file, with new 'disk_consistent_lsn'
    2946              :         //
    2947              :         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
    2948              :         // *all* the layers, to avoid fsyncing the file multiple times.
    2949              : 
    2950              :         // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
    2951              :         if let Some(metadata) = metadata {
    2952              :             save_metadata(
    2953              :                 self.conf,
    2954              :                 &self.tenant_shard_id,
    2955              :                 &self.timeline_id,
    2956              :                 &metadata,
    2957              :             )
    2958              :             .await
    2959              :             .context("save_metadata")?;
    2960              :         }
    2961              :         Ok(())
    2962              :     }
    2963              : 
    2964              :     /// Update metadata file
    2965         5254 :     fn schedule_uploads(
    2966         5254 :         &self,
    2967         5254 :         disk_consistent_lsn: Lsn,
    2968         5254 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    2969         5254 :     ) -> anyhow::Result<TimelineMetadata> {
    2970         5254 :         // We can only save a valid 'prev_record_lsn' value on disk if we
    2971         5254 :         // flushed *all* in-memory changes to disk. We only track
    2972         5254 :         // 'prev_record_lsn' in memory for the latest processed record, so we
    2973         5254 :         // don't remember what the correct value that corresponds to some old
    2974         5254 :         // LSN is. But if we flush everything, then the value corresponding
    2975         5254 :         // current 'last_record_lsn' is correct and we can store it on disk.
    2976         5254 :         let RecordLsn {
    2977         5254 :             last: last_record_lsn,
    2978         5254 :             prev: prev_record_lsn,
    2979         5254 :         } = self.last_record_lsn.load();
    2980         5254 :         let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
    2981         1616 :             Some(prev_record_lsn)
    2982              :         } else {
    2983         3638 :             None
    2984              :         };
    2985              : 
    2986         5254 :         let ancestor_timeline_id = self
    2987         5254 :             .ancestor_timeline
    2988         5254 :             .as_ref()
    2989         5254 :             .map(|ancestor| ancestor.timeline_id);
    2990         5254 : 
    2991         5254 :         let metadata = TimelineMetadata::new(
    2992         5254 :             disk_consistent_lsn,
    2993         5254 :             ondisk_prev_record_lsn,
    2994         5254 :             ancestor_timeline_id,
    2995         5254 :             self.ancestor_lsn,
    2996         5254 :             *self.latest_gc_cutoff_lsn.read(),
    2997         5254 :             self.initdb_lsn,
    2998         5254 :             self.pg_version,
    2999         5254 :         );
    3000         5254 : 
    3001         5254 :         fail_point!("checkpoint-before-saving-metadata", |x| bail!(
    3002            0 :             "{}",
    3003            0 :             x.unwrap()
    3004         5254 :         ));
    3005              : 
    3006         5254 :         if let Some(remote_client) = &self.remote_client {
    3007        10489 :             for layer in layers_to_upload {
    3008         5235 :                 remote_client.schedule_layer_file_upload(layer)?;
    3009              :             }
    3010         5254 :             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
    3011            0 :         }
    3012              : 
    3013         5254 :         Ok(metadata)
    3014         5254 :     }
    3015              : 
    3016           19 :     async fn update_metadata_file(
    3017           19 :         &self,
    3018           19 :         disk_consistent_lsn: Lsn,
    3019           19 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    3020           19 :     ) -> anyhow::Result<()> {
    3021           19 :         let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
    3022              : 
    3023           19 :         save_metadata(
    3024           19 :             self.conf,
    3025           19 :             &self.tenant_shard_id,
    3026           19 :             &self.timeline_id,
    3027           19 :             &metadata,
    3028           19 :         )
    3029            0 :         .await
    3030           19 :         .context("save_metadata")?;
    3031              : 
    3032           19 :         Ok(())
    3033           19 :     }
    3034              : 
    3035            2 :     pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
    3036            2 :         if let Some(remote_client) = &self.remote_client {
    3037            2 :             remote_client
    3038            2 :                 .preserve_initdb_archive(
    3039            2 :                     &self.tenant_shard_id.tenant_id,
    3040            2 :                     &self.timeline_id,
    3041            2 :                     &self.cancel,
    3042            2 :                 )
    3043            2 :                 .await?;
    3044              :         } else {
    3045            0 :             bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
    3046              :         }
    3047            2 :         Ok(())
    3048            2 :     }
    3049              : 
    3050              :     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
    3051              :     // in layer map immediately. The caller is responsible to put it into the layer map.
    3052         5157 :     async fn create_delta_layer(
    3053         5157 :         self: &Arc<Self>,
    3054         5157 :         frozen_layer: &Arc<InMemoryLayer>,
    3055         5157 :         ctx: &RequestContext,
    3056         5157 :     ) -> anyhow::Result<ResidentLayer> {
    3057         5157 :         let span = tracing::info_span!("blocking");
    3058         5157 :         let new_delta: ResidentLayer = tokio::task::spawn_blocking({
    3059         5157 :             let self_clone = Arc::clone(self);
    3060         5157 :             let frozen_layer = Arc::clone(frozen_layer);
    3061         5157 :             let ctx = ctx.attached_child();
    3062         5157 :             move || {
    3063         5157 :                 // Write it out
    3064         5157 :                 // Keep this inside `spawn_blocking` and `Handle::current`
    3065         5157 :                 // as long as the write path is still sync and the read impl
    3066         5157 :                 // is still not fully async. Otherwise executor threads would
    3067         5157 :                 // be blocked.
    3068         5157 :                 let _g = span.entered();
    3069         5157 :                 let new_delta =
    3070         5157 :                     Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
    3071         5157 :                 let new_delta_path = new_delta.local_path().to_owned();
    3072         5157 : 
    3073         5157 :                 // Sync it to disk.
    3074         5157 :                 //
    3075         5157 :                 // We must also fsync the timeline dir to ensure the directory entries for
    3076         5157 :                 // new layer files are durable.
    3077         5157 :                 //
    3078         5157 :                 // NB: timeline dir must be synced _after_ the file contents are durable.
    3079         5157 :                 // So, two separate fsyncs are required, they mustn't be batched.
    3080         5157 :                 //
    3081         5157 :                 // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
    3082         5157 :                 // files to flush, the fsync overhead can be reduces as follows:
    3083         5157 :                 // 1. write them all to temporary file names
    3084         5157 :                 // 2. fsync them
    3085         5157 :                 // 3. rename to the final name
    3086         5157 :                 // 4. fsync the parent directory.
    3087         5157 :                 // Note that (1),(2),(3) today happen inside write_to_disk().
    3088         5157 :                 //
    3089         5157 :                 // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3090         5157 :                 par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
    3091         5157 :                 par_fsync::par_fsync(&[self_clone
    3092         5157 :                     .conf
    3093         5157 :                     .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
    3094         5157 :                 .context("fsync of timeline dir")?;
    3095              : 
    3096         5156 :                 anyhow::Ok(new_delta)
    3097         5157 :             }
    3098         5157 :         })
    3099         5156 :         .await
    3100         5156 :         .context("spawn_blocking")
    3101         5156 :         .and_then(|x| x)?;
    3102              : 
    3103         5156 :         Ok(new_delta)
    3104         5156 :     }
    3105              : 
    3106         1724 :     async fn repartition(
    3107         1724 :         &self,
    3108         1724 :         lsn: Lsn,
    3109         1724 :         partition_size: u64,
    3110         1724 :         flags: EnumSet<CompactFlags>,
    3111         1724 :         ctx: &RequestContext,
    3112         1724 :     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
    3113         1724 :         {
    3114         1724 :             let partitioning_guard = self.partitioning.lock().unwrap();
    3115         1724 :             let distance = lsn.0 - partitioning_guard.1 .0;
    3116         1724 :             if partitioning_guard.1 != Lsn(0)
    3117         1005 :                 && distance <= self.repartition_threshold
    3118          842 :                 && !flags.contains(CompactFlags::ForceRepartition)
    3119              :             {
    3120            0 :                 debug!(
    3121            0 :                     distance,
    3122            0 :                     threshold = self.repartition_threshold,
    3123            0 :                     "no repartitioning needed"
    3124            0 :                 );
    3125          840 :                 return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
    3126          884 :             }
    3127              :         }
    3128       172295 :         let keyspace = self.collect_keyspace(lsn, ctx).await?;
    3129          881 :         let partitioning = keyspace.partition(partition_size);
    3130          881 : 
    3131          881 :         let mut partitioning_guard = self.partitioning.lock().unwrap();
    3132          881 :         if lsn > partitioning_guard.1 {
    3133          881 :             *partitioning_guard = (partitioning, lsn);
    3134          881 :         } else {
    3135            0 :             warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
    3136              :         }
    3137          881 :         Ok((partitioning_guard.0.clone(), partitioning_guard.1))
    3138         1723 :     }
    3139              : 
    3140              :     // Is it time to create a new image layer for the given partition?
    3141        36566 :     async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
    3142        36566 :         let threshold = self.get_image_creation_threshold();
    3143              : 
    3144        36566 :         let guard = self.layers.read().await;
    3145        36566 :         let layers = guard.layer_map();
    3146        36566 : 
    3147        36566 :         let mut max_deltas = 0;
    3148        36566 :         {
    3149        36566 :             let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
    3150        36566 :             if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
    3151         4218 :                 let img_range =
    3152         4218 :                     partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
    3153         4218 :                 if wanted.overlaps(&img_range) {
    3154              :                     //
    3155              :                     // gc_timeline only pays attention to image layers that are older than the GC cutoff,
    3156              :                     // but create_image_layers creates image layers at last-record-lsn.
    3157              :                     // So it's possible that gc_timeline wants a new image layer to be created for a key range,
    3158              :                     // but the range is already covered by image layers at more recent LSNs. Before we
    3159              :                     // create a new image layer, check if the range is already covered at more recent LSNs.
    3160            0 :                     if !layers
    3161            0 :                         .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
    3162              :                     {
    3163            0 :                         debug!(
    3164            0 :                             "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
    3165            0 :                             img_range.start, img_range.end, cutoff_lsn, lsn
    3166            0 :                         );
    3167            0 :                         return true;
    3168            0 :                     }
    3169         4218 :                 }
    3170        32348 :             }
    3171              :         }
    3172              : 
    3173      2258653 :         for part_range in &partition.ranges {
    3174      2228479 :             let image_coverage = layers.image_coverage(part_range, lsn);
    3175      4288563 :             for (img_range, last_img) in image_coverage {
    3176      2066476 :                 let img_lsn = if let Some(last_img) = last_img {
    3177       363037 :                     last_img.get_lsn_range().end
    3178              :                 } else {
    3179      1703439 :                     Lsn(0)
    3180              :                 };
    3181              :                 // Let's consider an example:
    3182              :                 //
    3183              :                 // delta layer with LSN range 71-81
    3184              :                 // delta layer with LSN range 81-91
    3185              :                 // delta layer with LSN range 91-101
    3186              :                 // image layer at LSN 100
    3187              :                 //
    3188              :                 // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
    3189              :                 // there's no need to create a new one. We check this case explicitly, to avoid passing
    3190              :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    3191              :                 // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
    3192              :                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
    3193      2066476 :                 if img_lsn < lsn {
    3194      2025978 :                     let num_deltas =
    3195      2025978 :                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
    3196      2025978 : 
    3197      2025978 :                     max_deltas = max_deltas.max(num_deltas);
    3198      2025978 :                     if num_deltas >= threshold {
    3199            0 :                         debug!(
    3200            0 :                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
    3201            0 :                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
    3202            0 :                         );
    3203         6392 :                         return true;
    3204      2019586 :                     }
    3205        40498 :                 }
    3206              :             }
    3207              :         }
    3208              : 
    3209            0 :         debug!(
    3210            0 :             max_deltas,
    3211            0 :             "none of the partitioned ranges had >= {threshold} deltas"
    3212            0 :         );
    3213        30174 :         false
    3214        36566 :     }
    3215              : 
    3216         3440 :     #[tracing::instrument(skip_all, fields(%lsn, %force))]
    3217              :     async fn create_image_layers(
    3218              :         self: &Arc<Timeline>,
    3219              :         partitioning: &KeyPartitioning,
    3220              :         lsn: Lsn,
    3221              :         force: bool,
    3222              :         ctx: &RequestContext,
    3223              :     ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
    3224              :         let timer = self.metrics.create_images_time_histo.start_timer();
    3225              :         let mut image_layers = Vec::new();
    3226              : 
    3227              :         // We need to avoid holes between generated image layers.
    3228              :         // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
    3229              :         // image layer with hole between them. In this case such layer can not be utilized by GC.
    3230              :         //
    3231              :         // How such hole between partitions can appear?
    3232              :         // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
    3233              :         // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
    3234              :         // If there is delta layer <100000000..300000000> then it never be garbage collected because
    3235              :         // image layers  <100000000..100000099> and <200000000..200000199> are not completely covering it.
    3236              :         let mut start = Key::MIN;
    3237              : 
    3238              :         for partition in partitioning.parts.iter() {
    3239              :             let img_range = start..partition.ranges.last().unwrap().end;
    3240              :             start = img_range.end;
    3241              :             if force || self.time_for_new_image_layer(partition, lsn).await {
    3242              :                 let mut image_layer_writer = ImageLayerWriter::new(
    3243              :                     self.conf,
    3244              :                     self.timeline_id,
    3245              :                     self.tenant_shard_id,
    3246              :                     &img_range,
    3247              :                     lsn,
    3248              :                 )
    3249              :                 .await?;
    3250              : 
    3251            0 :                 fail_point!("image-layer-writer-fail-before-finish", |_| {
    3252            0 :                     Err(CreateImageLayersError::Other(anyhow::anyhow!(
    3253            0 :                         "failpoint image-layer-writer-fail-before-finish"
    3254            0 :                     )))
    3255            0 :                 });
    3256              : 
    3257              :                 let mut key_request_accum = KeySpaceAccum::new();
    3258              :                 for range in &partition.ranges {
    3259              :                     let mut key = range.start;
    3260              :                     while key < range.end {
    3261              :                         if self.shard_identity.is_key_disposable(&key) {
    3262            0 :                             debug!(
    3263            0 :                                 "Dropping key {} during compaction (it belongs on shard {:?})",
    3264            0 :                                 key,
    3265            0 :                                 self.shard_identity.get_shard_number(&key)
    3266            0 :                             );
    3267              :                             key = key.next();
    3268              :                             continue;
    3269              :                         }
    3270              : 
    3271              :                         key_request_accum.add_key(key);
    3272              :                         if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
    3273              :                             || key.next() == range.end
    3274              :                         {
    3275              :                             let results = self
    3276              :                                 .get_vectored(
    3277              :                                     &key_request_accum.consume_keyspace().ranges,
    3278              :                                     lsn,
    3279              :                                     ctx,
    3280              :                                 )
    3281              :                                 .await?;
    3282              : 
    3283              :                             for (img_key, img) in results {
    3284              :                                 let img = match img {
    3285              :                                     Ok(img) => img,
    3286              :                                     Err(err) => {
    3287              :                                         // If we fail to reconstruct a VM or FSM page, we can zero the
    3288              :                                         // page without losing any actual user data. That seems better
    3289              :                                         // than failing repeatedly and getting stuck.
    3290              :                                         //
    3291              :                                         // We had a bug at one point, where we truncated the FSM and VM
    3292              :                                         // in the pageserver, but the Postgres didn't know about that
    3293              :                                         // and continued to generate incremental WAL records for pages
    3294              :                                         // that didn't exist in the pageserver. Trying to replay those
    3295              :                                         // WAL records failed to find the previous image of the page.
    3296              :                                         // This special case allows us to recover from that situation.
    3297              :                                         // See https://github.com/neondatabase/neon/issues/2601.
    3298              :                                         //
    3299              :                                         // Unfortunately we cannot do this for the main fork, or for
    3300              :                                         // any metadata keys, keys, as that would lead to actual data
    3301              :                                         // loss.
    3302              :                                         if is_rel_fsm_block_key(img_key)
    3303              :                                             || is_rel_vm_block_key(img_key)
    3304              :                                         {
    3305            0 :                                             warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
    3306              :                                             ZERO_PAGE.clone()
    3307              :                                         } else {
    3308              :                                             return Err(
    3309              :                                                 CreateImageLayersError::PageReconstructError(err),
    3310              :                                             );
    3311              :                                         }
    3312              :                                     }
    3313              :                                 };
    3314              : 
    3315              :                                 image_layer_writer.put_image(img_key, img).await?;
    3316              :                             }
    3317              :                         }
    3318              : 
    3319              :                         key = key.next();
    3320              :                     }
    3321              :                 }
    3322              :                 let image_layer = image_layer_writer.finish(self).await?;
    3323              :                 image_layers.push(image_layer);
    3324              :             }
    3325              :         }
    3326              :         // All layers that the GC wanted us to create have now been created.
    3327              :         //
    3328              :         // It's possible that another GC cycle happened while we were compacting, and added
    3329              :         // something new to wanted_image_layers, and we now clear that before processing it.
    3330              :         // That's OK, because the next GC iteration will put it back in.
    3331              :         *self.wanted_image_layers.lock().unwrap() = None;
    3332              : 
    3333              :         // Sync the new layer to disk before adding it to the layer map, to make sure
    3334              :         // we don't garbage collect something based on the new layer, before it has
    3335              :         // reached the disk.
    3336              :         //
    3337              :         // We must also fsync the timeline dir to ensure the directory entries for
    3338              :         // new layer files are durable
    3339              :         //
    3340              :         // Compaction creates multiple image layers. It would be better to create them all
    3341              :         // and fsync them all in parallel.
    3342              :         let all_paths = image_layers
    3343              :             .iter()
    3344         6472 :             .map(|layer| layer.local_path().to_owned())
    3345              :             .collect::<Vec<_>>();
    3346              : 
    3347              :         par_fsync::par_fsync_async(&all_paths)
    3348              :             .await
    3349              :             .context("fsync of newly created layer files")?;
    3350              : 
    3351              :         if !all_paths.is_empty() {
    3352              :             par_fsync::par_fsync_async(&[self
    3353              :                 .conf
    3354              :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
    3355              :             .await
    3356              :             .context("fsync of timeline dir")?;
    3357              :         }
    3358              : 
    3359              :         let mut guard = self.layers.write().await;
    3360              : 
    3361              :         // FIXME: we could add the images to be uploaded *before* returning from here, but right
    3362              :         // now they are being scheduled outside of write lock
    3363              :         guard.track_new_image_layers(&image_layers, &self.metrics);
    3364              :         drop_wlock(guard);
    3365              :         timer.stop_and_record();
    3366              : 
    3367              :         Ok(image_layers)
    3368              :     }
    3369              : 
    3370              :     /// Wait until the background initial logical size calculation is complete, or
    3371              :     /// this Timeline is shut down.  Calling this function will cause the initial
    3372              :     /// logical size calculation to skip waiting for the background jobs barrier.
    3373          256 :     pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
    3374          256 :         if let Some(await_bg_cancel) = self
    3375          256 :             .current_logical_size
    3376          256 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore
    3377          256 :             .get()
    3378          243 :         {
    3379          243 :             await_bg_cancel.cancel();
    3380          243 :         } else {
    3381              :             // We should not wait if we were not able to explicitly instruct
    3382              :             // the logical size cancellation to skip the concurrency limit semaphore.
    3383              :             // TODO: this is an unexpected case.  We should restructure so that it
    3384              :             // can't happen.
    3385           13 :             tracing::info!(
    3386           13 :                 "await_initial_logical_size: can't get semaphore cancel token, skipping"
    3387           13 :             );
    3388              :         }
    3389              : 
    3390          256 :         tokio::select!(
    3391          495 :             _ = self.current_logical_size.initialized.acquire() => {},
    3392          495 :             _ = self.cancel.cancelled() => {}
    3393          495 :         )
    3394          244 :     }
    3395              : }
    3396              : 
    3397         1344 : #[derive(Default)]
    3398              : struct CompactLevel0Phase1Result {
    3399              :     new_layers: Vec<ResidentLayer>,
    3400              :     deltas_to_compact: Vec<Layer>,
    3401              : }
    3402              : 
    3403              : /// Top-level failure to compact.
    3404            0 : #[derive(Debug, thiserror::Error)]
    3405              : pub(crate) enum CompactionError {
    3406              :     #[error("The timeline or pageserver is shutting down")]
    3407              :     ShuttingDown,
    3408              :     /// Compaction cannot be done right now; page reconstruction and so on.
    3409              :     #[error(transparent)]
    3410              :     Other(#[from] anyhow::Error),
    3411              : }
    3412              : 
    3413              : #[serde_as]
    3414         4116 : #[derive(serde::Serialize)]
    3415              : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
    3416              : 
    3417        11487 : #[derive(Default)]
    3418              : enum DurationRecorder {
    3419              :     #[default]
    3420              :     NotStarted,
    3421              :     Recorded(RecordedDuration, tokio::time::Instant),
    3422              : }
    3423              : 
    3424              : impl DurationRecorder {
    3425         3115 :     fn till_now(&self) -> DurationRecorder {
    3426         3115 :         match self {
    3427              :             DurationRecorder::NotStarted => {
    3428            0 :                 panic!("must only call on recorded measurements")
    3429              :             }
    3430         3115 :             DurationRecorder::Recorded(_, ended) => {
    3431         3115 :                 let now = tokio::time::Instant::now();
    3432         3115 :                 DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
    3433         3115 :             }
    3434         3115 :         }
    3435         3115 :     }
    3436         2058 :     fn into_recorded(self) -> Option<RecordedDuration> {
    3437         2058 :         match self {
    3438            0 :             DurationRecorder::NotStarted => None,
    3439         2058 :             DurationRecorder::Recorded(recorded, _) => Some(recorded),
    3440              :         }
    3441         2058 :     }
    3442              : }
    3443              : 
    3444         1641 : #[derive(Default)]
    3445              : struct CompactLevel0Phase1StatsBuilder {
    3446              :     version: Option<u64>,
    3447              :     tenant_id: Option<TenantShardId>,
    3448              :     timeline_id: Option<TimelineId>,
    3449              :     read_lock_acquisition_micros: DurationRecorder,
    3450              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    3451              :     read_lock_held_key_sort_micros: DurationRecorder,
    3452              :     read_lock_held_prerequisites_micros: DurationRecorder,
    3453              :     read_lock_held_compute_holes_micros: DurationRecorder,
    3454              :     read_lock_drop_micros: DurationRecorder,
    3455              :     write_layer_files_micros: DurationRecorder,
    3456              :     level0_deltas_count: Option<usize>,
    3457              :     new_deltas_count: Option<usize>,
    3458              :     new_deltas_size: Option<u64>,
    3459              : }
    3460              : 
    3461          294 : #[derive(serde::Serialize)]
    3462              : struct CompactLevel0Phase1Stats {
    3463              :     version: u64,
    3464              :     tenant_id: TenantShardId,
    3465              :     timeline_id: TimelineId,
    3466              :     read_lock_acquisition_micros: RecordedDuration,
    3467              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    3468              :     read_lock_held_key_sort_micros: RecordedDuration,
    3469              :     read_lock_held_prerequisites_micros: RecordedDuration,
    3470              :     read_lock_held_compute_holes_micros: RecordedDuration,
    3471              :     read_lock_drop_micros: RecordedDuration,
    3472              :     write_layer_files_micros: RecordedDuration,
    3473              :     level0_deltas_count: usize,
    3474              :     new_deltas_count: usize,
    3475              :     new_deltas_size: u64,
    3476              : }
    3477              : 
    3478              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    3479              :     type Error = anyhow::Error;
    3480              : 
    3481          294 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    3482          294 :         Ok(Self {
    3483          294 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    3484          294 :             tenant_id: value
    3485          294 :                 .tenant_id
    3486          294 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    3487          294 :             timeline_id: value
    3488          294 :                 .timeline_id
    3489          294 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    3490          294 :             read_lock_acquisition_micros: value
    3491          294 :                 .read_lock_acquisition_micros
    3492          294 :                 .into_recorded()
    3493          294 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    3494          294 :             read_lock_held_spawn_blocking_startup_micros: value
    3495          294 :                 .read_lock_held_spawn_blocking_startup_micros
    3496          294 :                 .into_recorded()
    3497          294 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    3498          294 :             read_lock_held_key_sort_micros: value
    3499          294 :                 .read_lock_held_key_sort_micros
    3500          294 :                 .into_recorded()
    3501          294 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    3502          294 :             read_lock_held_prerequisites_micros: value
    3503          294 :                 .read_lock_held_prerequisites_micros
    3504          294 :                 .into_recorded()
    3505          294 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    3506          294 :             read_lock_held_compute_holes_micros: value
    3507          294 :                 .read_lock_held_compute_holes_micros
    3508          294 :                 .into_recorded()
    3509          294 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    3510          294 :             read_lock_drop_micros: value
    3511          294 :                 .read_lock_drop_micros
    3512          294 :                 .into_recorded()
    3513          294 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    3514          294 :             write_layer_files_micros: value
    3515          294 :                 .write_layer_files_micros
    3516          294 :                 .into_recorded()
    3517          294 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    3518          294 :             level0_deltas_count: value
    3519          294 :                 .level0_deltas_count
    3520          294 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    3521          294 :             new_deltas_count: value
    3522          294 :                 .new_deltas_count
    3523          294 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    3524          294 :             new_deltas_size: value
    3525          294 :                 .new_deltas_size
    3526          294 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    3527              :         })
    3528          294 :     }
    3529              : }
    3530              : 
    3531              : impl Timeline {
    3532              :     /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
    3533         1641 :     async fn compact_level0_phase1(
    3534         1641 :         self: &Arc<Self>,
    3535         1641 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
    3536         1641 :         mut stats: CompactLevel0Phase1StatsBuilder,
    3537         1641 :         target_file_size: u64,
    3538         1641 :         ctx: &RequestContext,
    3539         1641 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    3540         1641 :         stats.read_lock_held_spawn_blocking_startup_micros =
    3541         1641 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    3542         1641 :         let layers = guard.layer_map();
    3543         1641 :         let level0_deltas = layers.get_level0_deltas()?;
    3544         1641 :         let mut level0_deltas = level0_deltas
    3545         1641 :             .into_iter()
    3546         7543 :             .map(|x| guard.get_from_desc(&x))
    3547         1641 :             .collect_vec();
    3548         1641 :         stats.level0_deltas_count = Some(level0_deltas.len());
    3549         1641 :         // Only compact if enough layers have accumulated.
    3550         1641 :         let threshold = self.get_compaction_threshold();
    3551         1641 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    3552            0 :             debug!(
    3553            0 :                 level0_deltas = level0_deltas.len(),
    3554            0 :                 threshold, "too few deltas to compact"
    3555            0 :             );
    3556         1344 :             return Ok(CompactLevel0Phase1Result::default());
    3557          297 :         }
    3558          297 : 
    3559          297 :         // This failpoint is used together with `test_duplicate_layers` integration test.
    3560          297 :         // It returns the compaction result exactly the same layers as input to compaction.
    3561          297 :         // We want to ensure that this will not cause any problem when updating the layer map
    3562          297 :         // after the compaction is finished.
    3563          297 :         //
    3564          297 :         // Currently, there are two rare edge cases that will cause duplicated layers being
    3565          297 :         // inserted.
    3566          297 :         // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
    3567          297 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
    3568          297 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
    3569          297 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
    3570          297 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
    3571          297 :         //    layer replace instead of the normal remove / upload process.
    3572          297 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
    3573          297 :         //    size length. Compaction will likely create the same set of n files afterwards.
    3574          297 :         //
    3575          297 :         // This failpoint is a superset of both of the cases.
    3576          297 :         if cfg!(feature = "testing") {
    3577          297 :             let active = (|| {
    3578          297 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
    3579          297 :                 false
    3580          297 :             })();
    3581          297 : 
    3582          297 :             if active {
    3583            2 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
    3584           30 :                 for delta in &level0_deltas {
    3585              :                     // we are just faking these layers as being produced again for this failpoint
    3586           28 :                     new_layers.push(
    3587           28 :                         delta
    3588           28 :                             .download_and_keep_resident()
    3589            0 :                             .await
    3590           28 :                             .context("download layer for failpoint")?,
    3591              :                     );
    3592              :                 }
    3593            2 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
    3594            2 :                 return Ok(CompactLevel0Phase1Result {
    3595            2 :                     new_layers,
    3596            2 :                     deltas_to_compact: level0_deltas,
    3597            2 :                 });
    3598          295 :             }
    3599            0 :         }
    3600              : 
    3601              :         // Gather the files to compact in this iteration.
    3602              :         //
    3603              :         // Start with the oldest Level 0 delta file, and collect any other
    3604              :         // level 0 files that form a contiguous sequence, such that the end
    3605              :         // LSN of previous file matches the start LSN of the next file.
    3606              :         //
    3607              :         // Note that if the files don't form such a sequence, we might
    3608              :         // "compact" just a single file. That's a bit pointless, but it allows
    3609              :         // us to get rid of the level 0 file, and compact the other files on
    3610              :         // the next iteration. This could probably made smarter, but such
    3611              :         // "gaps" in the sequence of level 0 files should only happen in case
    3612              :         // of a crash, partial download from cloud storage, or something like
    3613              :         // that, so it's not a big deal in practice.
    3614         8014 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    3615          295 :         let mut level0_deltas_iter = level0_deltas.iter();
    3616          295 : 
    3617          295 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    3618          295 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    3619          295 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    3620          295 : 
    3621          295 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
    3622         4148 :         for l in level0_deltas_iter {
    3623         3853 :             let lsn_range = &l.layer_desc().lsn_range;
    3624         3853 : 
    3625         3853 :             if lsn_range.start != prev_lsn_end {
    3626            0 :                 break;
    3627         3853 :             }
    3628         3853 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
    3629         3853 :             prev_lsn_end = lsn_range.end;
    3630              :         }
    3631          295 :         let lsn_range = Range {
    3632          295 :             start: deltas_to_compact
    3633          295 :                 .first()
    3634          295 :                 .unwrap()
    3635          295 :                 .layer_desc()
    3636          295 :                 .lsn_range
    3637          295 :                 .start,
    3638          295 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    3639          295 :         };
    3640              : 
    3641          295 :         info!(
    3642          295 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    3643          295 :             lsn_range.start,
    3644          295 :             lsn_range.end,
    3645          295 :             deltas_to_compact.len(),
    3646          295 :             level0_deltas.len()
    3647          295 :         );
    3648              : 
    3649         4148 :         for l in deltas_to_compact.iter() {
    3650         4148 :             info!("compact includes {l}");
    3651              :         }
    3652              : 
    3653              :         // We don't need the original list of layers anymore. Drop it so that
    3654              :         // we don't accidentally use it later in the function.
    3655          295 :         drop(level0_deltas);
    3656          295 : 
    3657          295 :         stats.read_lock_held_prerequisites_micros = stats
    3658          295 :             .read_lock_held_spawn_blocking_startup_micros
    3659          295 :             .till_now();
    3660          295 : 
    3661          295 :         // Determine N largest holes where N is number of compacted layers.
    3662          295 :         let max_holes = deltas_to_compact.len();
    3663          295 :         let last_record_lsn = self.get_last_record_lsn();
    3664          295 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    3665          295 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
    3666          295 : 
    3667          295 :         // min-heap (reserve space for one more element added before eviction)
    3668          295 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    3669          295 :         let mut prev: Option<Key> = None;
    3670          295 : 
    3671          295 :         let mut all_keys = Vec::new();
    3672              : 
    3673         4148 :         for l in deltas_to_compact.iter() {
    3674         4148 :             all_keys.extend(l.load_keys(ctx).await?);
    3675              :         }
    3676              : 
    3677              :         // FIXME: should spawn_blocking the rest of this function
    3678              : 
    3679              :         // The current stdlib sorting implementation is designed in a way where it is
    3680              :         // particularly fast where the slice is made up of sorted sub-ranges.
    3681    155368940 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3682          295 : 
    3683          295 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    3684              : 
    3685     17304139 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    3686     17304139 :             if let Some(prev_key) = prev {
    3687              :                 // just first fast filter
    3688     17303844 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
    3689       201481 :                     let key_range = prev_key..next_key;
    3690       201481 :                     // Measuring hole by just subtraction of i128 representation of key range boundaries
    3691       201481 :                     // has not so much sense, because largest holes will corresponds field1/field2 changes.
    3692       201481 :                     // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
    3693       201481 :                     // That is why it is better to measure size of hole as number of covering image layers.
    3694       201481 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
    3695       201481 :                     if coverage_size >= min_hole_coverage_size {
    3696          213 :                         heap.push(Hole {
    3697          213 :                             key_range,
    3698          213 :                             coverage_size,
    3699          213 :                         });
    3700          213 :                         if heap.len() > max_holes {
    3701          128 :                             heap.pop(); // remove smallest hole
    3702          128 :                         }
    3703       201268 :                     }
    3704     17102363 :                 }
    3705          295 :             }
    3706     17304139 :             prev = Some(next_key.next());
    3707              :         }
    3708          295 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    3709          295 :         drop_rlock(guard);
    3710          295 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    3711          295 :         let mut holes = heap.into_vec();
    3712          295 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
    3713          295 :         let mut next_hole = 0; // index of next hole in holes vector
    3714          295 : 
    3715          295 :         // This iterator walks through all key-value pairs from all the layers
    3716          295 :         // we're compacting, in key, LSN order.
    3717          295 :         let all_values_iter = all_keys.iter();
    3718          295 : 
    3719          295 :         // This iterator walks through all keys and is needed to calculate size used by each key
    3720          295 :         let mut all_keys_iter = all_keys
    3721          295 :             .iter()
    3722     17292588 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    3723     17292293 :             .coalesce(|mut prev, cur| {
    3724     17292293 :                 // Coalesce keys that belong to the same key pair.
    3725     17292293 :                 // This ensures that compaction doesn't put them
    3726     17292293 :                 // into different layer files.
    3727     17292293 :                 // Still limit this by the target file size,
    3728     17292293 :                 // so that we keep the size of the files in
    3729     17292293 :                 // check.
    3730     17292293 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    3731     15012062 :                     prev.2 += cur.2;
    3732     15012062 :                     Ok(prev)
    3733              :                 } else {
    3734      2280231 :                     Err((prev, cur))
    3735              :                 }
    3736     17292293 :             });
    3737          295 : 
    3738          295 :         // Merge the contents of all the input delta layers into a new set
    3739          295 :         // of delta layers, based on the current partitioning.
    3740          295 :         //
    3741          295 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    3742          295 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    3743          295 :         // would be too large. In that case, we also split on the LSN dimension.
    3744          295 :         //
    3745          295 :         // LSN
    3746          295 :         //  ^
    3747          295 :         //  |
    3748          295 :         //  | +-----------+            +--+--+--+--+
    3749          295 :         //  | |           |            |  |  |  |  |
    3750          295 :         //  | +-----------+            |  |  |  |  |
    3751          295 :         //  | |           |            |  |  |  |  |
    3752          295 :         //  | +-----------+     ==>    |  |  |  |  |
    3753          295 :         //  | |           |            |  |  |  |  |
    3754          295 :         //  | +-----------+            |  |  |  |  |
    3755          295 :         //  | |           |            |  |  |  |  |
    3756          295 :         //  | +-----------+            +--+--+--+--+
    3757          295 :         //  |
    3758          295 :         //  +--------------> key
    3759          295 :         //
    3760          295 :         //
    3761          295 :         // If one key (X) has a lot of page versions:
    3762          295 :         //
    3763          295 :         // LSN
    3764          295 :         //  ^
    3765          295 :         //  |                                 (X)
    3766          295 :         //  | +-----------+            +--+--+--+--+
    3767          295 :         //  | |           |            |  |  |  |  |
    3768          295 :         //  | +-----------+            |  |  +--+  |
    3769          295 :         //  | |           |            |  |  |  |  |
    3770          295 :         //  | +-----------+     ==>    |  |  |  |  |
    3771          295 :         //  | |           |            |  |  +--+  |
    3772          295 :         //  | +-----------+            |  |  |  |  |
    3773          295 :         //  | |           |            |  |  |  |  |
    3774          295 :         //  | +-----------+            +--+--+--+--+
    3775          295 :         //  |
    3776          295 :         //  +--------------> key
    3777          295 :         // TODO: this actually divides the layers into fixed-size chunks, not
    3778          295 :         // based on the partitioning.
    3779          295 :         //
    3780          295 :         // TODO: we should also opportunistically materialize and
    3781          295 :         // garbage collect what we can.
    3782          295 :         let mut new_layers = Vec::new();
    3783          295 :         let mut prev_key: Option<Key> = None;
    3784          295 :         let mut writer: Option<DeltaLayerWriter> = None;
    3785          295 :         let mut key_values_total_size = 0u64;
    3786          295 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    3787          295 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    3788              : 
    3789              :         for &DeltaEntry {
    3790     17292587 :             key, lsn, ref val, ..
    3791     17292881 :         } in all_values_iter
    3792              :         {
    3793     17292587 :             let value = val.load(ctx).await?;
    3794     17292586 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    3795     17292586 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    3796     17292586 :             if !same_key || lsn == dup_end_lsn {
    3797      2280524 :                 let mut next_key_size = 0u64;
    3798      2280524 :                 let is_dup_layer = dup_end_lsn.is_valid();
    3799      2280524 :                 dup_start_lsn = Lsn::INVALID;
    3800      2280524 :                 if !same_key {
    3801      2280486 :                     dup_end_lsn = Lsn::INVALID;
    3802      2280486 :                 }
    3803              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    3804      2280525 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    3805      2280525 :                     next_key_size = next_size;
    3806      2280525 :                     if key != next_key {
    3807      2280192 :                         if dup_end_lsn.is_valid() {
    3808           12 :                             // We are writting segment with duplicates:
    3809           12 :                             // place all remaining values of this key in separate segment
    3810           12 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    3811           12 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    3812      2280180 :                         }
    3813      2280192 :                         break;
    3814          333 :                     }
    3815          333 :                     key_values_total_size += next_size;
    3816          333 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    3817          333 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    3818          333 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    3819              :                         // Split key between multiple layers: such layer can contain only single key
    3820           38 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    3821           25 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    3822              :                         } else {
    3823           13 :                             lsn // start with the first LSN for this key
    3824              :                         };
    3825           38 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    3826           38 :                         break;
    3827          295 :                     }
    3828              :                 }
    3829              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    3830      2280524 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    3831            1 :                     dup_start_lsn = dup_end_lsn;
    3832            1 :                     dup_end_lsn = lsn_range.end;
    3833      2280523 :                 }
    3834      2280524 :                 if writer.is_some() {
    3835      2280229 :                     let written_size = writer.as_mut().unwrap().size();
    3836      2280229 :                     let contains_hole =
    3837      2280229 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    3838              :                     // check if key cause layer overflow or contains hole...
    3839      2280229 :                     if is_dup_layer
    3840      2280179 :                         || dup_end_lsn.is_valid()
    3841      2280168 :                         || written_size + key_values_total_size > target_file_size
    3842      2269869 :                         || contains_hole
    3843              :                     {
    3844              :                         // ... if so, flush previous layer and prepare to write new one
    3845        10443 :                         new_layers.push(
    3846        10443 :                             writer
    3847        10443 :                                 .take()
    3848        10443 :                                 .unwrap()
    3849        10443 :                                 .finish(prev_key.unwrap().next(), self)
    3850         1322 :                                 .await?,
    3851              :                         );
    3852        10443 :                         writer = None;
    3853        10443 : 
    3854        10443 :                         if contains_hole {
    3855           85 :                             // skip hole
    3856           85 :                             next_hole += 1;
    3857        10358 :                         }
    3858      2269786 :                     }
    3859          295 :                 }
    3860              :                 // Remember size of key value because at next iteration we will access next item
    3861      2280524 :                 key_values_total_size = next_key_size;
    3862     15012062 :             }
    3863     17292586 :             if writer.is_none() {
    3864        10738 :                 // Create writer if not initiaized yet
    3865        10738 :                 writer = Some(
    3866              :                     DeltaLayerWriter::new(
    3867        10738 :                         self.conf,
    3868        10738 :                         self.timeline_id,
    3869        10738 :                         self.tenant_shard_id,
    3870        10738 :                         key,
    3871        10738 :                         if dup_end_lsn.is_valid() {
    3872              :                             // this is a layer containing slice of values of the same key
    3873            0 :                             debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    3874           51 :                             dup_start_lsn..dup_end_lsn
    3875              :                         } else {
    3876            0 :                             debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3877        10687 :                             lsn_range.clone()
    3878              :                         },
    3879              :                     )
    3880           10 :                     .await?,
    3881              :                 );
    3882     17281848 :             }
    3883              : 
    3884     17292586 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3885            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    3886            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    3887            0 :                 )))
    3888     17292586 :             });
    3889              : 
    3890     17292586 :             if !self.shard_identity.is_key_disposable(&key) {
    3891     17292586 :                 writer.as_mut().unwrap().put_value(key, lsn, value).await?;
    3892              :             } else {
    3893            0 :                 debug!(
    3894            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    3895            0 :                     key,
    3896            0 :                     self.shard_identity.get_shard_number(&key)
    3897            0 :                 );
    3898              :             }
    3899              : 
    3900     17292586 :             if !new_layers.is_empty() {
    3901     14046784 :                 fail_point!("after-timeline-compacted-first-L1");
    3902     14046784 :             }
    3903              : 
    3904     17292586 :             prev_key = Some(key);
    3905              :         }
    3906          294 :         if let Some(writer) = writer {
    3907          294 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
    3908            0 :         }
    3909              : 
    3910              :         // Sync layers
    3911          294 :         if !new_layers.is_empty() {
    3912              :             // Print a warning if the created layer is larger than double the target size
    3913              :             // Add two pages for potential overhead. This should in theory be already
    3914              :             // accounted for in the target calculation, but for very small targets,
    3915              :             // we still might easily hit the limit otherwise.
    3916          294 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    3917        10736 :             for layer in new_layers.iter() {
    3918        10736 :                 if layer.layer_desc().file_size > warn_limit {
    3919            0 :                     warn!(
    3920            0 :                         %layer,
    3921            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    3922            0 :                     );
    3923        10736 :                 }
    3924              :             }
    3925              : 
    3926              :             // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3927          294 :             let layer_paths: Vec<Utf8PathBuf> = new_layers
    3928          294 :                 .iter()
    3929        10736 :                 .map(|l| l.local_path().to_owned())
    3930          294 :                 .collect();
    3931          294 : 
    3932          294 :             // Fsync all the layer files and directory using multiple threads to
    3933          294 :             // minimize latency.
    3934          294 :             par_fsync::par_fsync_async(&layer_paths)
    3935          520 :                 .await
    3936          294 :                 .context("fsync all new layers")?;
    3937              : 
    3938          294 :             let timeline_dir = self
    3939          294 :                 .conf
    3940          294 :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    3941          294 : 
    3942          294 :             par_fsync::par_fsync_async(&[timeline_dir])
    3943          324 :                 .await
    3944          294 :                 .context("fsync of timeline dir")?;
    3945            0 :         }
    3946              : 
    3947          294 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    3948          294 :         stats.new_deltas_count = Some(new_layers.len());
    3949        10736 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    3950          294 : 
    3951          294 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    3952          294 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    3953              :         {
    3954          294 :             Ok(stats_json) => {
    3955          294 :                 info!(
    3956          294 :                     stats_json = stats_json.as_str(),
    3957          294 :                     "compact_level0_phase1 stats available"
    3958          294 :                 )
    3959              :             }
    3960            0 :             Err(e) => {
    3961            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    3962              :             }
    3963              :         }
    3964              : 
    3965          294 :         Ok(CompactLevel0Phase1Result {
    3966          294 :             new_layers,
    3967          294 :             deltas_to_compact: deltas_to_compact
    3968          294 :                 .into_iter()
    3969         4133 :                 .map(|x| x.drop_eviction_guard())
    3970          294 :                 .collect::<Vec<_>>(),
    3971          294 :         })
    3972         1640 :     }
    3973              : 
    3974              :     ///
    3975              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
    3976              :     /// as Level 1 files.
    3977              :     ///
    3978         1641 :     async fn compact_level0(
    3979         1641 :         self: &Arc<Self>,
    3980         1641 :         target_file_size: u64,
    3981         1641 :         ctx: &RequestContext,
    3982         1641 :     ) -> Result<(), CompactionError> {
    3983              :         let CompactLevel0Phase1Result {
    3984         1640 :             new_layers,
    3985         1640 :             deltas_to_compact,
    3986              :         } = {
    3987         1641 :             let phase1_span = info_span!("compact_level0_phase1");
    3988         1641 :             let ctx = ctx.attached_child();
    3989         1641 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    3990         1641 :                 version: Some(2),
    3991         1641 :                 tenant_id: Some(self.tenant_shard_id),
    3992         1641 :                 timeline_id: Some(self.timeline_id),
    3993         1641 :                 ..Default::default()
    3994         1641 :             };
    3995         1641 : 
    3996         1641 :             let begin = tokio::time::Instant::now();
    3997         1641 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
    3998         1641 :             let now = tokio::time::Instant::now();
    3999         1641 :             stats.read_lock_acquisition_micros =
    4000         1641 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    4001         1641 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
    4002         1641 :                 .instrument(phase1_span)
    4003       326194 :                 .await?
    4004              :         };
    4005              : 
    4006         1640 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    4007              :             // nothing to do
    4008         1344 :             return Ok(());
    4009          296 :         }
    4010              : 
    4011          296 :         let mut guard = self.layers.write().await;
    4012              : 
    4013          296 :         let mut duplicated_layers = HashSet::new();
    4014          296 : 
    4015          296 :         let mut insert_layers = Vec::with_capacity(new_layers.len());
    4016              : 
    4017        11060 :         for l in &new_layers {
    4018        10764 :             if guard.contains(l.as_ref()) {
    4019              :                 // expected in tests
    4020           28 :                 tracing::error!(layer=%l, "duplicated L1 layer");
    4021              : 
    4022              :                 // good ways to cause a duplicate: we repeatedly error after taking the writelock
    4023              :                 // `guard`  on self.layers. as of writing this, there are no error returns except
    4024              :                 // for compact_level0_phase1 creating an L0, which does not happen in practice
    4025              :                 // because we have not implemented L0 => L0 compaction.
    4026           28 :                 duplicated_layers.insert(l.layer_desc().key());
    4027        10736 :             } else if LayerMap::is_l0(l.layer_desc()) {
    4028            0 :                 return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
    4029        10736 :             } else {
    4030        10736 :                 insert_layers.push(l.clone());
    4031        10736 :             }
    4032              :         }
    4033              : 
    4034          296 :         let remove_layers = {
    4035          296 :             let mut deltas_to_compact = deltas_to_compact;
    4036          296 :             // only remove those inputs which were not outputs
    4037         4161 :             deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
    4038          296 :             deltas_to_compact
    4039          296 :         };
    4040          296 : 
    4041          296 :         // deletion will happen later, the layer file manager calls garbage_collect_on_drop
    4042          296 :         guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
    4043              : 
    4044          296 :         if let Some(remote_client) = self.remote_client.as_ref() {
    4045          296 :             remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
    4046            0 :         }
    4047              : 
    4048          296 :         drop_wlock(guard);
    4049          296 : 
    4050          296 :         Ok(())
    4051         1640 :     }
    4052              : 
    4053              :     /// Update information about which layer files need to be retained on
    4054              :     /// garbage collection. This is separate from actually performing the GC,
    4055              :     /// and is updated more frequently, so that compaction can remove obsolete
    4056              :     /// page versions more aggressively.
    4057              :     ///
    4058              :     /// TODO: that's wishful thinking, compaction doesn't actually do that
    4059              :     /// currently.
    4060              :     ///
    4061              :     /// The caller specifies how much history is needed with the 3 arguments:
    4062              :     ///
    4063              :     /// retain_lsns: keep a version of each page at these LSNs
    4064              :     /// cutoff_horizon: also keep everything newer than this LSN
    4065              :     /// pitr: the time duration required to keep data for PITR
    4066              :     ///
    4067              :     /// The 'retain_lsns' list is currently used to prevent removing files that
    4068              :     /// are needed by child timelines. In the future, the user might be able to
    4069              :     /// name additional points in time to retain. The caller is responsible for
    4070              :     /// collecting that information.
    4071              :     ///
    4072              :     /// The 'cutoff_horizon' point is used to retain recent versions that might still be
    4073              :     /// needed by read-only nodes. (As of this writing, the caller just passes
    4074              :     /// the latest LSN subtracted by a constant, and doesn't do anything smart
    4075              :     /// to figure out what read-only nodes might actually need.)
    4076              :     ///
    4077              :     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
    4078              :     /// whether a record is needed for PITR.
    4079              :     ///
    4080              :     /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
    4081              :     /// field, so that the three values passed as argument are stored
    4082              :     /// atomically. But the caller is responsible for ensuring that no new
    4083              :     /// branches are created that would need to be included in 'retain_lsns',
    4084              :     /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
    4085              :     /// that.
    4086              :     ///
    4087         1734 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
    4088              :     pub(super) async fn update_gc_info(
    4089              :         &self,
    4090              :         retain_lsns: Vec<Lsn>,
    4091              :         cutoff_horizon: Lsn,
    4092              :         pitr: Duration,
    4093              :         cancel: &CancellationToken,
    4094              :         ctx: &RequestContext,
    4095              :     ) -> anyhow::Result<()> {
    4096              :         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
    4097              :         //
    4098              :         // Some unit tests depend on garbage-collection working even when
    4099              :         // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
    4100              :         // work, so avoid calling it altogether if time-based retention is not
    4101              :         // configured. It would be pointless anyway.
    4102              :         let pitr_cutoff = if pitr != Duration::ZERO {
    4103              :             let now = SystemTime::now();
    4104              :             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
    4105              :                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
    4106              : 
    4107              :                 match self
    4108              :                     .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
    4109              :                     .await?
    4110              :                 {
    4111              :                     LsnForTimestamp::Present(lsn) => lsn,
    4112              :                     LsnForTimestamp::Future(lsn) => {
    4113              :                         // The timestamp is in the future. That sounds impossible,
    4114              :                         // but what it really means is that there hasn't been
    4115              :                         // any commits since the cutoff timestamp.
    4116            0 :                         debug!("future({})", lsn);
    4117              :                         cutoff_horizon
    4118              :                     }
    4119              :                     LsnForTimestamp::Past(lsn) => {
    4120            0 :                         debug!("past({})", lsn);
    4121              :                         // conservative, safe default is to remove nothing, when we
    4122              :                         // have no commit timestamp data available
    4123              :                         *self.get_latest_gc_cutoff_lsn()
    4124              :                     }
    4125              :                     LsnForTimestamp::NoData(lsn) => {
    4126            0 :                         debug!("nodata({})", lsn);
    4127              :                         // conservative, safe default is to remove nothing, when we
    4128              :                         // have no commit timestamp data available
    4129              :                         *self.get_latest_gc_cutoff_lsn()
    4130              :                     }
    4131              :                 }
    4132              :             } else {
    4133              :                 // If we don't have enough data to convert to LSN,
    4134              :                 // play safe and don't remove any layers.
    4135              :                 *self.get_latest_gc_cutoff_lsn()
    4136              :             }
    4137              :         } else {
    4138              :             // No time-based retention was configured. Set time-based cutoff to
    4139              :             // same as LSN based.
    4140              :             cutoff_horizon
    4141              :         };
    4142              : 
    4143              :         // Grab the lock and update the values
    4144              :         *self.gc_info.write().unwrap() = GcInfo {
    4145              :             retain_lsns,
    4146              :             horizon_cutoff: cutoff_horizon,
    4147              :             pitr_cutoff,
    4148              :         };
    4149              : 
    4150              :         Ok(())
    4151              :     }
    4152              : 
    4153              :     /// Garbage collect layer files on a timeline that are no longer needed.
    4154              :     ///
    4155              :     /// Currently, we don't make any attempt at removing unneeded page versions
    4156              :     /// within a layer file. We can only remove the whole file if it's fully
    4157              :     /// obsolete.
    4158          796 :     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
    4159          796 :         // this is most likely the background tasks, but it might be the spawned task from
    4160          796 :         // immediate_gc
    4161          796 :         let cancel = crate::task_mgr::shutdown_token();
    4162          796 :         let _g = tokio::select! {
    4163          795 :             guard = self.gc_lock.lock() => guard,
    4164              :             _ = self.cancel.cancelled() => return Ok(GcResult::default()),
    4165              :             _ = cancel.cancelled() => return Ok(GcResult::default()),
    4166              :         };
    4167          795 :         let timer = self.metrics.garbage_collect_histo.start_timer();
    4168              : 
    4169            0 :         fail_point!("before-timeline-gc");
    4170              : 
    4171              :         // Is the timeline being deleted?
    4172          795 :         if self.is_stopping() {
    4173            0 :             anyhow::bail!("timeline is Stopping");
    4174          795 :         }
    4175          795 : 
    4176          795 :         let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
    4177          795 :             let gc_info = self.gc_info.read().unwrap();
    4178          795 : 
    4179          795 :             let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
    4180          795 :             let pitr_cutoff = gc_info.pitr_cutoff;
    4181          795 :             let retain_lsns = gc_info.retain_lsns.clone();
    4182          795 :             (horizon_cutoff, pitr_cutoff, retain_lsns)
    4183          795 :         };
    4184          795 : 
    4185          795 :         let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
    4186              : 
    4187          795 :         let res = self
    4188          795 :             .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
    4189          795 :             .instrument(
    4190          795 :                 info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
    4191              :             )
    4192          232 :             .await?;
    4193              : 
    4194              :         // only record successes
    4195          795 :         timer.stop_and_record();
    4196          795 : 
    4197          795 :         Ok(res)
    4198          796 :     }
    4199              : 
    4200          795 :     async fn gc_timeline(
    4201          795 :         &self,
    4202          795 :         horizon_cutoff: Lsn,
    4203          795 :         pitr_cutoff: Lsn,
    4204          795 :         retain_lsns: Vec<Lsn>,
    4205          795 :         new_gc_cutoff: Lsn,
    4206          795 :     ) -> anyhow::Result<GcResult> {
    4207          795 :         let now = SystemTime::now();
    4208          795 :         let mut result: GcResult = GcResult::default();
    4209          795 : 
    4210          795 :         // Nothing to GC. Return early.
    4211          795 :         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    4212          795 :         if latest_gc_cutoff >= new_gc_cutoff {
    4213           84 :             info!(
    4214           84 :                 "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
    4215           84 :             );
    4216           84 :             return Ok(result);
    4217          711 :         }
    4218              : 
    4219              :         // We need to ensure that no one tries to read page versions or create
    4220              :         // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
    4221              :         // for details. This will block until the old value is no longer in use.
    4222              :         //
    4223              :         // The GC cutoff should only ever move forwards.
    4224          711 :         let waitlist = {
    4225          711 :             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
    4226          711 :             ensure!(
    4227          711 :                 *write_guard <= new_gc_cutoff,
    4228            0 :                 "Cannot move GC cutoff LSN backwards (was {}, new {})",
    4229            0 :                 *write_guard,
    4230              :                 new_gc_cutoff
    4231              :             );
    4232          711 :             write_guard.store_and_unlock(new_gc_cutoff)
    4233          711 :         };
    4234          711 :         waitlist.wait().await;
    4235              : 
    4236          711 :         info!("GC starting");
    4237              : 
    4238            0 :         debug!("retain_lsns: {:?}", retain_lsns);
    4239              : 
    4240          711 :         let mut layers_to_remove = Vec::new();
    4241          711 :         let mut wanted_image_layers = KeySpaceRandomAccum::default();
    4242              : 
    4243              :         // Scan all layers in the timeline (remote or on-disk).
    4244              :         //
    4245              :         // Garbage collect the layer if all conditions are satisfied:
    4246              :         // 1. it is older than cutoff LSN;
    4247              :         // 2. it is older than PITR interval;
    4248              :         // 3. it doesn't need to be retained for 'retain_lsns';
    4249              :         // 4. newer on-disk image layers cover the layer's whole key range
    4250              :         //
    4251              :         // TODO holding a write lock is too agressive and avoidable
    4252          711 :         let mut guard = self.layers.write().await;
    4253          711 :         let layers = guard.layer_map();
    4254        17631 :         'outer: for l in layers.iter_historic_layers() {
    4255        17631 :             result.layers_total += 1;
    4256        17631 : 
    4257        17631 :             // 1. Is it newer than GC horizon cutoff point?
    4258        17631 :             if l.get_lsn_range().end > horizon_cutoff {
    4259            0 :                 debug!(
    4260            0 :                     "keeping {} because it's newer than horizon_cutoff {}",
    4261            0 :                     l.filename(),
    4262            0 :                     horizon_cutoff,
    4263            0 :                 );
    4264         2187 :                 result.layers_needed_by_cutoff += 1;
    4265         2187 :                 continue 'outer;
    4266        15444 :             }
    4267        15444 : 
    4268        15444 :             // 2. It is newer than PiTR cutoff point?
    4269        15444 :             if l.get_lsn_range().end > pitr_cutoff {
    4270            0 :                 debug!(
    4271            0 :                     "keeping {} because it's newer than pitr_cutoff {}",
    4272            0 :                     l.filename(),
    4273            0 :                     pitr_cutoff,
    4274            0 :                 );
    4275           16 :                 result.layers_needed_by_pitr += 1;
    4276           16 :                 continue 'outer;
    4277        15428 :             }
    4278              : 
    4279              :             // 3. Is it needed by a child branch?
    4280              :             // NOTE With that we would keep data that
    4281              :             // might be referenced by child branches forever.
    4282              :             // We can track this in child timeline GC and delete parent layers when
    4283              :             // they are no longer needed. This might be complicated with long inheritance chains.
    4284              :             //
    4285              :             // TODO Vec is not a great choice for `retain_lsns`
    4286        15446 :             for retain_lsn in &retain_lsns {
    4287              :                 // start_lsn is inclusive
    4288          464 :                 if &l.get_lsn_range().start <= retain_lsn {
    4289            0 :                     debug!(
    4290            0 :                         "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
    4291            0 :                         l.filename(),
    4292            0 :                         retain_lsn,
    4293            0 :                         l.is_incremental(),
    4294            0 :                     );
    4295          446 :                     result.layers_needed_by_branches += 1;
    4296          446 :                     continue 'outer;
    4297           18 :                 }
    4298              :             }
    4299              : 
    4300              :             // 4. Is there a later on-disk layer for this relation?
    4301              :             //
    4302              :             // The end-LSN is exclusive, while disk_consistent_lsn is
    4303              :             // inclusive. For example, if disk_consistent_lsn is 100, it is
    4304              :             // OK for a delta layer to have end LSN 101, but if the end LSN
    4305              :             // is 102, then it might not have been fully flushed to disk
    4306              :             // before crash.
    4307              :             //
    4308              :             // For example, imagine that the following layers exist:
    4309              :             //
    4310              :             // 1000      - image (A)
    4311              :             // 1000-2000 - delta (B)
    4312              :             // 2000      - image (C)
    4313              :             // 2000-3000 - delta (D)
    4314              :             // 3000      - image (E)
    4315              :             //
    4316              :             // If GC horizon is at 2500, we can remove layers A and B, but
    4317              :             // we cannot remove C, even though it's older than 2500, because
    4318              :             // the delta layer 2000-3000 depends on it.
    4319        14982 :             if !layers
    4320        14982 :                 .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
    4321              :             {
    4322            0 :                 debug!("keeping {} because it is the latest layer", l.filename());
    4323              :                 // Collect delta key ranges that need image layers to allow garbage
    4324              :                 // collecting the layers.
    4325              :                 // It is not so obvious whether we need to propagate information only about
    4326              :                 // delta layers. Image layers can form "stairs" preventing old image from been deleted.
    4327              :                 // But image layers are in any case less sparse than delta layers. Also we need some
    4328              :                 // protection from replacing recent image layers with new one after each GC iteration.
    4329        13896 :                 if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
    4330            0 :                     wanted_image_layers.add_range(l.get_key_range());
    4331        13896 :                 }
    4332        13896 :                 result.layers_not_updated += 1;
    4333        13896 :                 continue 'outer;
    4334         1086 :             }
    4335              : 
    4336              :             // We didn't find any reason to keep this file, so remove it.
    4337            0 :             debug!(
    4338            0 :                 "garbage collecting {} is_dropped: xx is_incremental: {}",
    4339            0 :                 l.filename(),
    4340            0 :                 l.is_incremental(),
    4341            0 :             );
    4342         1086 :             layers_to_remove.push(l);
    4343              :         }
    4344          711 :         self.wanted_image_layers
    4345          711 :             .lock()
    4346          711 :             .unwrap()
    4347          711 :             .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
    4348          711 : 
    4349          711 :         if !layers_to_remove.is_empty() {
    4350              :             // Persist the new GC cutoff value in the metadata file, before
    4351              :             // we actually remove anything.
    4352              :             //
    4353              :             // This does not in fact have any effect as we no longer consider local metadata unless
    4354              :             // running without remote storage.
    4355              :             //
    4356              :             // This unconditionally schedules also an index_part.json update, even though, we will
    4357              :             // be doing one a bit later with the unlinked gc'd layers.
    4358              :             //
    4359              :             // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
    4360           19 :             self.update_metadata_file(self.disk_consistent_lsn.load(), None)
    4361            0 :                 .await?;
    4362              : 
    4363           19 :             let gc_layers = layers_to_remove
    4364           19 :                 .iter()
    4365         1086 :                 .map(|x| guard.get_from_desc(x))
    4366           19 :                 .collect::<Vec<Layer>>();
    4367           19 : 
    4368           19 :             result.layers_removed = gc_layers.len() as u64;
    4369              : 
    4370           19 :             if let Some(remote_client) = self.remote_client.as_ref() {
    4371           19 :                 remote_client.schedule_gc_update(&gc_layers)?;
    4372            0 :             }
    4373              : 
    4374           19 :             guard.finish_gc_timeline(&gc_layers);
    4375           19 : 
    4376           19 :             #[cfg(feature = "testing")]
    4377           19 :             {
    4378           19 :                 result.doomed_layers = gc_layers;
    4379           19 :             }
    4380          692 :         }
    4381              : 
    4382          711 :         info!(
    4383          711 :             "GC completed removing {} layers, cutoff {}",
    4384          711 :             result.layers_removed, new_gc_cutoff
    4385          711 :         );
    4386              : 
    4387          711 :         result.elapsed = now.elapsed()?;
    4388          711 :         Ok(result)
    4389          795 :     }
    4390              : 
    4391              :     /// Reconstruct a value, using the given base image and WAL records in 'data'.
    4392      7349656 :     async fn reconstruct_value(
    4393      7349656 :         &self,
    4394      7349656 :         key: Key,
    4395      7349656 :         request_lsn: Lsn,
    4396      7349656 :         mut data: ValueReconstructState,
    4397      7349680 :     ) -> Result<Bytes, PageReconstructError> {
    4398      7349680 :         // Perform WAL redo if needed
    4399      7349680 :         data.records.reverse();
    4400      7349680 : 
    4401      7349680 :         // If we have a page image, and no WAL, we're all set
    4402      7349680 :         if data.records.is_empty() {
    4403      5003053 :             if let Some((img_lsn, img)) = &data.img {
    4404            0 :                 trace!(
    4405            0 :                     "found page image for key {} at {}, no WAL redo required, req LSN {}",
    4406            0 :                     key,
    4407            0 :                     img_lsn,
    4408            0 :                     request_lsn,
    4409            0 :                 );
    4410      5003053 :                 Ok(img.clone())
    4411              :             } else {
    4412            0 :                 Err(PageReconstructError::from(anyhow!(
    4413            0 :                     "base image for {key} at {request_lsn} not found"
    4414            0 :                 )))
    4415              :             }
    4416              :         } else {
    4417              :             // We need to do WAL redo.
    4418              :             //
    4419              :             // If we don't have a base image, then the oldest WAL record better initialize
    4420              :             // the page
    4421      2346627 :             if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
    4422            0 :                 Err(PageReconstructError::from(anyhow!(
    4423            0 :                     "Base image for {} at {} not found, but got {} WAL records",
    4424            0 :                     key,
    4425            0 :                     request_lsn,
    4426            0 :                     data.records.len()
    4427            0 :                 )))
    4428              :             } else {
    4429      2346627 :                 if data.img.is_some() {
    4430            0 :                     trace!(
    4431            0 :                         "found {} WAL records and a base image for {} at {}, performing WAL redo",
    4432            0 :                         data.records.len(),
    4433            0 :                         key,
    4434            0 :                         request_lsn
    4435            0 :                     );
    4436              :                 } else {
    4437            0 :                     trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
    4438              :                 };
    4439              : 
    4440      2346627 :                 let last_rec_lsn = data.records.last().unwrap().0;
    4441              : 
    4442      2346627 :                 let img = match self
    4443      2346627 :                     .walredo_mgr
    4444      2346627 :                     .as_ref()
    4445      2346627 :                     .context("timeline has no walredo manager")
    4446      2346627 :                     .map_err(PageReconstructError::WalRedo)?
    4447      2346627 :                     .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
    4448            0 :                     .await
    4449      2346627 :                     .context("reconstruct a page image")
    4450              :                 {
    4451      2346627 :                     Ok(img) => img,
    4452            0 :                     Err(e) => return Err(PageReconstructError::WalRedo(e)),
    4453              :                 };
    4454              : 
    4455      2346627 :                 if img.len() == page_cache::PAGE_SZ {
    4456      2343539 :                     let cache = page_cache::get();
    4457      2343539 :                     if let Err(e) = cache
    4458      2343539 :                         .memorize_materialized_page(
    4459      2343539 :                             self.tenant_shard_id,
    4460      2343539 :                             self.timeline_id,
    4461      2343539 :                             key,
    4462      2343539 :                             last_rec_lsn,
    4463      2343539 :                             &img,
    4464      2343539 :                         )
    4465         8303 :                         .await
    4466      2343539 :                         .context("Materialized page memoization failed")
    4467              :                     {
    4468            0 :                         return Err(PageReconstructError::from(e));
    4469      2343539 :                     }
    4470         3088 :                 }
    4471              : 
    4472      2346627 :                 Ok(img)
    4473              :             }
    4474              :         }
    4475      7349680 :     }
    4476              : 
    4477            2 :     pub(crate) async fn spawn_download_all_remote_layers(
    4478            2 :         self: Arc<Self>,
    4479            2 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4480            2 :     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
    4481            2 :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4482            2 : 
    4483            2 :         // this is not really needed anymore; it has tests which really check the return value from
    4484            2 :         // http api. it would be better not to maintain this anymore.
    4485            2 : 
    4486            2 :         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
    4487            2 :         if let Some(st) = &*status_guard {
    4488            1 :             match &st.state {
    4489              :                 DownloadRemoteLayersTaskState::Running => {
    4490            0 :                     return Err(st.clone());
    4491              :                 }
    4492              :                 DownloadRemoteLayersTaskState::ShutDown
    4493            1 :                 | DownloadRemoteLayersTaskState::Completed => {
    4494            1 :                     *status_guard = None;
    4495            1 :                 }
    4496              :             }
    4497            1 :         }
    4498              : 
    4499            2 :         let self_clone = Arc::clone(&self);
    4500            2 :         let task_id = task_mgr::spawn(
    4501            2 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    4502            2 :             task_mgr::TaskKind::DownloadAllRemoteLayers,
    4503            2 :             Some(self.tenant_shard_id),
    4504            2 :             Some(self.timeline_id),
    4505            2 :             "download all remote layers task",
    4506              :             false,
    4507            2 :             async move {
    4508            9 :                 self_clone.download_all_remote_layers(request).await;
    4509            2 :                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
    4510            2 :                  match &mut *status_guard {
    4511              :                     None => {
    4512            0 :                         warn!("tasks status is supposed to be Some(), since we are running");
    4513              :                     }
    4514            2 :                     Some(st) => {
    4515            2 :                         let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
    4516            2 :                         if st.task_id != exp_task_id {
    4517            0 :                             warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
    4518            2 :                         } else {
    4519            2 :                             st.state = DownloadRemoteLayersTaskState::Completed;
    4520            2 :                         }
    4521              :                     }
    4522              :                 };
    4523            2 :                 Ok(())
    4524            2 :             }
    4525            2 :             .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    4526              :         );
    4527              : 
    4528            2 :         let initial_info = DownloadRemoteLayersTaskInfo {
    4529            2 :             task_id: format!("{task_id}"),
    4530            2 :             state: DownloadRemoteLayersTaskState::Running,
    4531            2 :             total_layer_count: 0,
    4532            2 :             successful_download_count: 0,
    4533            2 :             failed_download_count: 0,
    4534            2 :         };
    4535            2 :         *status_guard = Some(initial_info.clone());
    4536            2 : 
    4537            2 :         Ok(initial_info)
    4538            2 :     }
    4539              : 
    4540            2 :     async fn download_all_remote_layers(
    4541            2 :         self: &Arc<Self>,
    4542            2 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4543            2 :     ) {
    4544              :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4545              : 
    4546            2 :         let remaining = {
    4547            2 :             let guard = self.layers.read().await;
    4548            2 :             guard
    4549            2 :                 .layer_map()
    4550            2 :                 .iter_historic_layers()
    4551           10 :                 .map(|desc| guard.get_from_desc(&desc))
    4552            2 :                 .collect::<Vec<_>>()
    4553            2 :         };
    4554            2 :         let total_layer_count = remaining.len();
    4555            2 : 
    4556            2 :         macro_rules! lock_status {
    4557            2 :             ($st:ident) => {
    4558            2 :                 let mut st = self.download_all_remote_layers_task_info.write().unwrap();
    4559            2 :                 let st = st
    4560            2 :                     .as_mut()
    4561            2 :                     .expect("this function is only called after the task has been spawned");
    4562            2 :                 assert_eq!(
    4563            2 :                     st.task_id,
    4564            2 :                     format!(
    4565            2 :                         "{}",
    4566            2 :                         task_mgr::current_task_id().expect("we run inside a task_mgr task")
    4567            2 :                     )
    4568            2 :                 );
    4569            2 :                 let $st = st;
    4570            2 :             };
    4571            2 :         }
    4572            2 : 
    4573            2 :         {
    4574            2 :             lock_status!(st);
    4575            2 :             st.total_layer_count = total_layer_count as u64;
    4576            2 :         }
    4577            2 : 
    4578            2 :         let mut remaining = remaining.into_iter();
    4579            2 :         let mut have_remaining = true;
    4580            2 :         let mut js = tokio::task::JoinSet::new();
    4581            2 : 
    4582            2 :         let cancel = task_mgr::shutdown_token();
    4583            2 : 
    4584            2 :         let limit = request.max_concurrent_downloads;
    4585              : 
    4586              :         loop {
    4587           12 :             while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
    4588           12 :                 let Some(next) = remaining.next() else {
    4589            2 :                     have_remaining = false;
    4590            2 :                     break;
    4591              :                 };
    4592              : 
    4593           10 :                 let span = tracing::info_span!("download", layer = %next);
    4594              : 
    4595           10 :                 js.spawn(
    4596           10 :                     async move {
    4597           26 :                         let res = next.download().await;
    4598           10 :                         (next, res)
    4599           10 :                     }
    4600           10 :                     .instrument(span),
    4601           10 :                 );
    4602              :             }
    4603              : 
    4604           12 :             while let Some(res) = js.join_next().await {
    4605           10 :                 match res {
    4606              :                     Ok((_, Ok(_))) => {
    4607            5 :                         lock_status!(st);
    4608            5 :                         st.successful_download_count += 1;
    4609              :                     }
    4610            5 :                     Ok((layer, Err(e))) => {
    4611            5 :                         tracing::error!(%layer, "download failed: {e:#}");
    4612            5 :                         lock_status!(st);
    4613            5 :                         st.failed_download_count += 1;
    4614              :                     }
    4615            0 :                     Err(je) if je.is_cancelled() => unreachable!("not used here"),
    4616            0 :                     Err(je) if je.is_panic() => {
    4617            0 :                         lock_status!(st);
    4618            0 :                         st.failed_download_count += 1;
    4619              :                     }
    4620            0 :                     Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
    4621              :                 }
    4622              :             }
    4623              : 
    4624            2 :             if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
    4625            2 :                 break;
    4626            0 :             }
    4627              :         }
    4628              : 
    4629              :         {
    4630            2 :             lock_status!(st);
    4631            2 :             st.state = DownloadRemoteLayersTaskState::Completed;
    4632            2 :         }
    4633            2 :     }
    4634              : 
    4635           49 :     pub(crate) fn get_download_all_remote_layers_task_info(
    4636           49 :         &self,
    4637           49 :     ) -> Option<DownloadRemoteLayersTaskInfo> {
    4638           49 :         self.download_all_remote_layers_task_info
    4639           49 :             .read()
    4640           49 :             .unwrap()
    4641           49 :             .clone()
    4642           49 :     }
    4643              : }
    4644              : 
    4645              : impl Timeline {
    4646              :     /// Returns non-remote layers for eviction.
    4647           37 :     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
    4648           37 :         let guard = self.layers.read().await;
    4649           37 :         let mut max_layer_size: Option<u64> = None;
    4650              : 
    4651           37 :         let resident_layers = guard
    4652           37 :             .resident_layers()
    4653          539 :             .map(|layer| {
    4654          539 :                 let file_size = layer.layer_desc().file_size;
    4655          539 :                 max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
    4656          539 : 
    4657          539 :                 let last_activity_ts = layer.access_stats().latest_activity_or_now();
    4658          539 : 
    4659          539 :                 EvictionCandidate {
    4660          539 :                     layer: layer.into(),
    4661          539 :                     last_activity_ts,
    4662          539 :                     relative_last_activity: finite_f32::FiniteF32::ZERO,
    4663          539 :                 }
    4664          539 :             })
    4665           37 :             .collect()
    4666           23 :             .await;
    4667              : 
    4668           37 :         DiskUsageEvictionInfo {
    4669           37 :             max_layer_size,
    4670           37 :             resident_layers,
    4671           37 :         }
    4672           37 :     }
    4673              : 
    4674        22798 :     pub(crate) fn get_shard_index(&self) -> ShardIndex {
    4675        22798 :         ShardIndex {
    4676        22798 :             shard_number: self.tenant_shard_id.shard_number,
    4677        22798 :             shard_count: self.tenant_shard_id.shard_count,
    4678        22798 :         }
    4679        22798 :     }
    4680              : }
    4681              : 
    4682              : type TraversalPathItem = (
    4683              :     ValueReconstructResult,
    4684              :     Lsn,
    4685              :     Box<dyn Send + FnOnce() -> TraversalId>,
    4686              : );
    4687              : 
    4688              : /// Helper function for get_reconstruct_data() to add the path of layers traversed
    4689              : /// to an error, as anyhow context information.
    4690         1269 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
    4691         1269 :     // We want the original 'msg' to be the outermost context. The outermost context
    4692         1269 :     // is the most high-level information, which also gets propagated to the client.
    4693         1269 :     let mut msg_iter = path
    4694         1269 :         .into_iter()
    4695         1784 :         .map(|(r, c, l)| {
    4696         1784 :             format!(
    4697         1784 :                 "layer traversal: result {:?}, cont_lsn {}, layer: {}",
    4698         1784 :                 r,
    4699         1784 :                 c,
    4700         1784 :                 l(),
    4701         1784 :             )
    4702         1784 :         })
    4703         1269 :         .chain(std::iter::once(msg));
    4704         1269 :     // Construct initial message from the first traversed layer
    4705         1269 :     let err = anyhow!(msg_iter.next().unwrap());
    4706         1269 : 
    4707         1269 :     // Append all subsequent traversals, and the error message 'msg', as contexts.
    4708         1784 :     let msg = msg_iter.fold(err, |err, msg| err.context(msg));
    4709         1269 :     PageReconstructError::from(msg)
    4710         1269 : }
    4711              : 
    4712              : /// Various functions to mutate the timeline.
    4713              : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
    4714              : // This is probably considered a bad practice in Rust and should be fixed eventually,
    4715              : // but will cause large code changes.
    4716              : pub(crate) struct TimelineWriter<'a> {
    4717              :     tl: &'a Timeline,
    4718              :     _write_guard: tokio::sync::MutexGuard<'a, ()>,
    4719              : }
    4720              : 
    4721              : impl Deref for TimelineWriter<'_> {
    4722              :     type Target = Timeline;
    4723              : 
    4724            0 :     fn deref(&self) -> &Self::Target {
    4725            0 :         self.tl
    4726            0 :     }
    4727              : }
    4728              : 
    4729              : impl<'a> TimelineWriter<'a> {
    4730              :     /// Put a new page version that can be constructed from a WAL record
    4731              :     ///
    4732              :     /// This will implicitly extend the relation, if the page is beyond the
    4733              :     /// current end-of-file.
    4734      1214102 :     pub(crate) async fn put(
    4735      1214102 :         &self,
    4736      1214102 :         key: Key,
    4737      1214102 :         lsn: Lsn,
    4738      1214102 :         value: &Value,
    4739      1214102 :         ctx: &RequestContext,
    4740      1214102 :     ) -> anyhow::Result<()> {
    4741      1214102 :         self.tl.put_value(key, lsn, value, ctx).await
    4742      1214102 :     }
    4743              : 
    4744      1713238 :     pub(crate) async fn put_batch(
    4745      1713238 :         &self,
    4746      1713238 :         batch: &HashMap<Key, Vec<(Lsn, Value)>>,
    4747      1713238 :         ctx: &RequestContext,
    4748      1713239 :     ) -> anyhow::Result<()> {
    4749      1713239 :         self.tl.put_values(batch, ctx).await
    4750      1713238 :     }
    4751              : 
    4752        19251 :     pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    4753        19251 :         self.tl.put_tombstones(batch).await
    4754        19251 :     }
    4755              : 
    4756              :     /// Track the end of the latest digested WAL record.
    4757              :     /// Remember the (end of) last valid WAL record remembered in the timeline.
    4758              :     ///
    4759              :     /// Call this after you have finished writing all the WAL up to 'lsn'.
    4760              :     ///
    4761              :     /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
    4762              :     /// the 'lsn' or anything older. The previous last record LSN is stored alongside
    4763              :     /// the latest and can be read.
    4764     76398101 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    4765     76398101 :         self.tl.finish_write(new_lsn);
    4766     76398101 :     }
    4767              : 
    4768       638850 :     pub(crate) fn update_current_logical_size(&self, delta: i64) {
    4769       638850 :         self.tl.update_current_logical_size(delta)
    4770       638850 :     }
    4771              : }
    4772              : 
    4773              : // We need TimelineWriter to be send in upcoming conversion of
    4774              : // Timeline::layers to tokio::sync::RwLock.
    4775            2 : #[test]
    4776            2 : fn is_send() {
    4777            2 :     fn _assert_send<T: Send>() {}
    4778            2 :     _assert_send::<TimelineWriter<'_>>();
    4779            2 : }
    4780              : 
    4781              : /// Add a suffix to a layer file's name: .{num}.old
    4782              : /// Uses the first available num (starts at 0)
    4783            1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
    4784            1 :     let filename = path
    4785            1 :         .file_name()
    4786            1 :         .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
    4787            1 :     let mut new_path = path.to_owned();
    4788              : 
    4789            1 :     for i in 0u32.. {
    4790            1 :         new_path.set_file_name(format!("{filename}.{i}.old"));
    4791            1 :         if !new_path.exists() {
    4792            1 :             std::fs::rename(path, &new_path)
    4793            1 :                 .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
    4794            1 :             return Ok(());
    4795            0 :         }
    4796              :     }
    4797              : 
    4798            0 :     bail!("couldn't find an unused backup number for {:?}", path)
    4799            1 : }
    4800              : 
    4801              : #[cfg(test)]
    4802              : mod tests {
    4803              :     use utils::{id::TimelineId, lsn::Lsn};
    4804              : 
    4805              :     use crate::tenant::{
    4806              :         harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
    4807              :     };
    4808              : 
    4809            2 :     #[tokio::test]
    4810            2 :     async fn two_layer_eviction_attempts_at_the_same_time() {
    4811            2 :         let harness =
    4812            2 :             TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
    4813            2 : 
    4814            2 :         let ctx = any_context();
    4815            2 :         let tenant = harness.try_load(&ctx).await.unwrap();
    4816            2 :         let timeline = tenant
    4817            2 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    4818            8 :             .await
    4819            2 :             .unwrap();
    4820            2 : 
    4821            2 :         let layer = find_some_layer(&timeline).await;
    4822            2 :         let layer = layer
    4823            2 :             .keep_resident()
    4824            2 :             .await
    4825            2 :             .expect("no download => no downloading errors")
    4826            2 :             .expect("should had been resident")
    4827            2 :             .drop_eviction_guard();
    4828            2 : 
    4829            2 :         let first = async { layer.evict_and_wait().await };
    4830            2 :         let second = async { layer.evict_and_wait().await };
    4831            2 : 
    4832            4 :         let (first, second) = tokio::join!(first, second);
    4833            2 : 
    4834            5 :         let res = layer.keep_resident().await;
    4835            2 :         assert!(matches!(res, Ok(None)), "{res:?}");
    4836            2 : 
    4837            2 :         match (first, second) {
    4838            2 :             (Ok(()), Ok(())) => {
    4839            1 :                 // because there are no more timeline locks being taken on eviction path, we can
    4840            1 :                 // witness all three outcomes here.
    4841            1 :             }
    4842            2 :             (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
    4843            1 :                 // if one completes before the other, this is fine just as well.
    4844            1 :             }
    4845            2 :             other => unreachable!("unexpected {:?}", other),
    4846            2 :         }
    4847            2 :     }
    4848              : 
    4849            2 :     fn any_context() -> crate::context::RequestContext {
    4850            2 :         use crate::context::*;
    4851            2 :         use crate::task_mgr::*;
    4852            2 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    4853            2 :     }
    4854              : 
    4855            2 :     async fn find_some_layer(timeline: &Timeline) -> Layer {
    4856            2 :         let layers = timeline.layers.read().await;
    4857            2 :         let desc = layers
    4858            2 :             .layer_map()
    4859            2 :             .iter_historic_layers()
    4860            2 :             .next()
    4861            2 :             .expect("must find one layer to evict");
    4862            2 : 
    4863            2 :         layers.get_from_desc(&desc)
    4864            2 :     }
    4865              : }
        

Generated by: LCOV version 2.1-beta