LCOV - code coverage report
Current view: top level - pageserver/src/tenant - timeline.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 89.2 % 2871 2562
Test Date: 2024-02-14 18:05:35 Functions: 67.2 % 412 277

            Line data    Source code
       1              : pub mod delete;
       2              : mod eviction_task;
       3              : mod init;
       4              : pub mod layer_manager;
       5              : pub(crate) mod logical_size;
       6              : pub mod span;
       7              : pub mod uninit;
       8              : mod walreceiver;
       9              : 
      10              : use anyhow::{anyhow, bail, ensure, Context, Result};
      11              : use bytes::Bytes;
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use enumset::EnumSet;
      14              : use fail::fail_point;
      15              : use futures::stream::StreamExt;
      16              : use itertools::Itertools;
      17              : use once_cell::sync::Lazy;
      18              : use pageserver_api::{
      19              :     keyspace::{key_range_size, KeySpaceAccum},
      20              :     models::{
      21              :         DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
      22              :         LayerMapInfo, TimelineState,
      23              :     },
      24              :     reltag::BlockNumber,
      25              :     shard::{ShardIdentity, TenantShardId},
      26              : };
      27              : use rand::Rng;
      28              : use serde_with::serde_as;
      29              : use storage_broker::BrokerClientChannel;
      30              : use tokio::{
      31              :     runtime::Handle,
      32              :     sync::{oneshot, watch},
      33              : };
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::*;
      36              : use utils::sync::gate::Gate;
      37              : 
      38              : use std::ops::{Deref, Range};
      39              : use std::pin::pin;
      40              : use std::sync::atomic::Ordering as AtomicOrdering;
      41              : use std::sync::{Arc, Mutex, RwLock, Weak};
      42              : use std::time::{Duration, Instant, SystemTime};
      43              : use std::{
      44              :     array,
      45              :     collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
      46              :     sync::atomic::AtomicU64,
      47              : };
      48              : use std::{
      49              :     cmp::{max, min, Ordering},
      50              :     ops::ControlFlow,
      51              : };
      52              : 
      53              : use crate::pgdatadir_mapping::DirectoryKind;
      54              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      55              : use crate::tenant::{
      56              :     layer_map::{LayerMap, SearchResult},
      57              :     metadata::{save_metadata, TimelineMetadata},
      58              :     par_fsync,
      59              : };
      60              : use crate::{
      61              :     context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
      62              :     disk_usage_eviction_task::DiskUsageEvictionInfo,
      63              : };
      64              : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
      65              : use crate::{
      66              :     disk_usage_eviction_task::finite_f32,
      67              :     tenant::storage_layer::{
      68              :         AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
      69              :         LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
      70              :         ValueReconstructState,
      71              :     },
      72              : };
      73              : use crate::{
      74              :     disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
      75              : };
      76              : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
      77              : 
      78              : use crate::config::PageServerConf;
      79              : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
      80              : use crate::metrics::{
      81              :     TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
      82              : };
      83              : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
      84              : use crate::tenant::config::TenantConfOpt;
      85              : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
      86              : use pageserver_api::reltag::RelTag;
      87              : use pageserver_api::shard::ShardIndex;
      88              : 
      89              : use postgres_connection::PgConnectionConfig;
      90              : use postgres_ffi::to_pg_timestamp;
      91              : use utils::{
      92              :     completion,
      93              :     generation::Generation,
      94              :     id::TimelineId,
      95              :     lsn::{AtomicLsn, Lsn, RecordLsn},
      96              :     seqwait::SeqWait,
      97              :     simple_rcu::{Rcu, RcuReadGuard},
      98              : };
      99              : 
     100              : use crate::page_cache;
     101              : use crate::repository::GcResult;
     102              : use crate::repository::{Key, Value};
     103              : use crate::task_mgr;
     104              : use crate::task_mgr::TaskKind;
     105              : use crate::ZERO_PAGE;
     106              : 
     107              : use self::delete::DeleteTimelineFlow;
     108              : pub(super) use self::eviction_task::EvictionTaskTenantState;
     109              : use self::eviction_task::EvictionTaskTimelineState;
     110              : use self::layer_manager::LayerManager;
     111              : use self::logical_size::LogicalSize;
     112              : use self::walreceiver::{WalReceiver, WalReceiverConf};
     113              : 
     114              : use super::config::TenantConf;
     115              : use super::remote_timeline_client::index::IndexPart;
     116              : use super::remote_timeline_client::RemoteTimelineClient;
     117              : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
     118              : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
     119              : 
     120           45 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     121              : pub(super) enum FlushLoopState {
     122              :     NotStarted,
     123              :     Running {
     124              :         #[cfg(test)]
     125              :         expect_initdb_optimization: bool,
     126              :         #[cfg(test)]
     127              :         initdb_optimization_count: usize,
     128              :     },
     129              :     Exited,
     130              : }
     131              : 
     132              : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
     133            0 : #[derive(Debug, Clone, PartialEq, Eq)]
     134              : pub(crate) struct Hole {
     135              :     key_range: Range<Key>,
     136              :     coverage_size: usize,
     137              : }
     138              : 
     139              : impl Ord for Hole {
     140          286 :     fn cmp(&self, other: &Self) -> Ordering {
     141          286 :         other.coverage_size.cmp(&self.coverage_size) // inverse order
     142          286 :     }
     143              : }
     144              : 
     145              : impl PartialOrd for Hole {
     146          286 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     147          286 :         Some(self.cmp(other))
     148          286 :     }
     149              : }
     150              : 
     151              : /// Temporary function for immutable storage state refactor, ensures we are dropping an owned `RwLock` read guard instead of other things.
     152              : /// Can be removed after all refactors are done.
     153          295 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
     154          295 :     drop(rlock)
     155          295 : }
     156              : 
     157              : /// Temporary function for immutable storage state refactor, ensures we are dropping an `RwLock` write guard (the parameter is named `rlock`, but it is a write guard) instead of other things.
     158              : /// Can be removed after all refactors are done.
     159         1994 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
     160         1994 :     drop(rlock)
     161         1994 : }
     162              : 
     163              : /// The outward-facing resources required to build a Timeline
     164              : pub struct TimelineResources {
     165              :     pub remote_client: Option<RemoteTimelineClient>,
     166              :     pub deletion_queue_client: DeletionQueueClient,
     167              : }
     168              : 
     169              : pub struct Timeline {
     170              :     conf: &'static PageServerConf,
     171              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     172              : 
     173              :     myself: Weak<Self>,
     174              : 
     175              :     pub(crate) tenant_shard_id: TenantShardId,
     176              :     pub timeline_id: TimelineId,
     177              : 
     178              :     /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
     179              :     /// Never changes for the lifetime of this [`Timeline`] object.
     180              :     ///
     181              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     182              :     /// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
     183              :     pub(crate) generation: Generation,
     184              : 
     185              :     /// The detailed sharding information from our parent Tenant.  This enables us to map keys
     186              :     /// to shards, and is constant through the lifetime of this Timeline.
     187              :     shard_identity: ShardIdentity,
     188              : 
     189              :     pub pg_version: u32,
     190              : 
     191              :     /// The tuple has two elements.
     192              :     /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
     193              :     /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
     194              :     ///
     195              :     /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
     196              :     /// We describe these rectangles through the `PersistentLayerDesc` struct.
     197              :     ///
     198              :     /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
     199              :     /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
     200              :     /// `PersistentLayerDesc`'s.
     201              :     ///
     202              :     /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
     203              :     /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
     204              :     /// runtime, e.g., during page reconstruction.
     205              :     ///
     206              :     /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
     207              :     /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
     208              :     pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
     209              : 
     210              :     /// Set of key ranges which should be covered by image layers to
     211              :     /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
     212              :     /// It is used by compaction task when it checks if new image layer should be created.
     213              :     /// Newly created image layer doesn't help to remove the delta layer, until the
     214              :     /// newly created image layer falls off the PITR horizon. So on next GC cycle,
     215              :     /// gc_timeline may still want the new image layer to be created. To avoid redundant
     216              :     /// image layer creation we should check if an image layer exists but is beyond the PITR horizon.
     217              :     /// This is why we need to remember the GC cutoff LSN.
     218              :     ///
     219              :     wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
     220              : 
     221              :     last_freeze_at: AtomicLsn,
     222              :     // Atomic would be more appropriate here.
     223              :     last_freeze_ts: RwLock<Instant>,
     224              : 
     225              :     // WAL redo manager. `None` only for broken tenants.
     226              :     walredo_mgr: Option<Arc<super::WalRedoManager>>,
     227              : 
     228              :     /// Remote storage client.
     229              :     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
     230              :     pub remote_client: Option<Arc<RemoteTimelineClient>>,
     231              : 
     232              :     // What page versions do we hold in the repository? If we get a
     233              :     // request > last_record_lsn, we need to wait until we receive all
     234              :     // the WAL up to the request. The SeqWait provides functions for
     235              :     // that. TODO: If we get a request for an old LSN, such that the
     236              :     // versions have already been garbage collected away, we should
     237              :     // throw an error, but we don't track that currently.
     238              :     //
     239              :     // last_record_lsn.load().last points to the end of last processed WAL record.
     240              :     //
     241              :     // We also remember the starting point of the previous record in
     242              :     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     243              :     // first WAL record when the node is started up. But here, we just
     244              :     // keep track of it.
     245              :     last_record_lsn: SeqWait<RecordLsn, Lsn>,
     246              : 
     247              :     // All WAL records have been processed and stored durably on files on
     248              :     // local disk, up to this LSN. On crash and restart, we need to re-process
     249              :     // the WAL starting from this point.
     250              :     //
     251              :     // Some later WAL records might have been processed and also flushed to disk
     252              :     // already, so don't be surprised to see some, but there's no guarantee on
     253              :     // them yet.
     254              :     disk_consistent_lsn: AtomicLsn,
     255              : 
     256              :     // Parent timeline that this timeline was branched from, and the LSN
     257              :     // of the branch point.
     258              :     ancestor_timeline: Option<Arc<Timeline>>,
     259              :     ancestor_lsn: Lsn,
     260              : 
     261              :     pub(super) metrics: TimelineMetrics,
     262              : 
     263              :     // `Timeline` doesn't write these metrics itself, but it manages the lifetime.  Code
     264              :     // in `crate::page_service` writes these metrics.
     265              :     pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
     266              : 
     267              :     directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
     268              : 
     269              :     /// Ensures layers aren't frozen by checkpointer between
     270              :     /// [`Timeline::get_layer_for_write`] and layer reads.
     271              :     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     272              :     /// Must always be acquired before the layer map/individual layer lock
     273              :     /// to avoid deadlock.
     274              :     write_lock: tokio::sync::Mutex<()>,
     275              : 
     276              :     /// Used to avoid multiple `flush_loop` tasks running
     277              :     pub(super) flush_loop_state: Mutex<FlushLoopState>,
     278              : 
     279              :     /// layer_flush_start_tx can be used to wake up the layer-flushing task.
     280              :     /// The value is a counter, incremented every time a new flush cycle is requested.
     281              :     /// The flush cycle counter is sent back on the layer_flush_done channel when
     282              :     /// the flush finishes. You can use that to wait for the flush to finish.
     283              :     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     284              :     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
     285              :     layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
     286              : 
     287              :     // Needed to ensure that we can't create a branch at a point that was already garbage collected
     288              :     pub latest_gc_cutoff_lsn: Rcu<Lsn>,
     289              : 
     290              :     // List of child timelines and their branch points. This is needed to avoid
     291              :     // garbage collecting data that is still needed by the child timelines.
     292              :     pub gc_info: std::sync::RwLock<GcInfo>,
     293              : 
     294              :     // It may change across major versions so for simplicity
     295              :     // keep it after running initdb for a timeline.
     296              :     // It is needed in checks when we want to error on some operations
     297              :     // when they are requested for pre-initdb lsn.
     298              :     // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
     299              :     // though let's keep them both for better error visibility.
     300              :     pub initdb_lsn: Lsn,
     301              : 
     302              :     /// When did we last calculate the partitioning?
     303              :     partitioning: Mutex<(KeyPartitioning, Lsn)>,
     304              : 
     305              :     /// Configuration: how often should the partitioning be recalculated.
     306              :     repartition_threshold: u64,
     307              : 
     308              :     /// Current logical size of the "datadir", at the last LSN.
     309              :     current_logical_size: LogicalSize,
     310              : 
     311              :     /// Information about the last processed message by the WAL receiver,
     312              :     /// or None if WAL receiver has not received anything for this timeline
     313              :     /// yet.
     314              :     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
     315              :     pub walreceiver: Mutex<Option<WalReceiver>>,
     316              : 
     317              :     /// Relation size cache
     318              :     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
     319              : 
     320              :     download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
     321              : 
     322              :     state: watch::Sender<TimelineState>,
     323              : 
     324              :     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     325              :     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     326              :     pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
     327              : 
     328              :     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
     329              : 
     330              :     /// Load or creation time information about the disk_consistent_lsn and when the loading
     331              :     /// happened. Used for consumption metrics.
     332              :     pub(crate) loaded_at: (Lsn, SystemTime),
     333              : 
     334              :     /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
     335              :     pub(crate) gate: Gate,
     336              : 
     337              :     /// Cancellation token scoped to this timeline: anything doing long-running work relating
     338              :     /// to the timeline should drop out when this token fires.
     339              :     pub(crate) cancel: CancellationToken,
     340              : 
     341              :     /// Make sure we only have one running compaction at a time in tests.
     342              :     ///
     343              :     /// Must only be taken in two places:
     344              :     /// - [`Timeline::compact`] (this file)
     345              :     /// - [`delete::delete_local_layer_files`]
     346              :     ///
     347              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     348              :     compaction_lock: tokio::sync::Mutex<()>,
     349              : 
     350              :     /// Make sure we only have one running gc at a time.
     351              :     ///
     352              :     /// Must only be taken in two places:
     353              :     /// - [`Timeline::gc`] (this file)
     354              :     /// - [`delete::delete_local_layer_files`]
     355              :     ///
     356              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     357              :     gc_lock: tokio::sync::Mutex<()>,
     358              : }
     359              : 
     360              : pub struct WalReceiverInfo {
     361              :     pub wal_source_connconf: PgConnectionConfig,
     362              :     pub last_received_msg_lsn: Lsn,
     363              :     pub last_received_msg_ts: u128,
     364              : }
     365              : 
     366              : ///
     367              : /// Information about how much history needs to be retained, needed by
     368              : /// Garbage Collection.
     369              : ///
     370              : pub struct GcInfo {
     371              :     /// Specific LSNs that are needed.
     372              :     ///
     373              :     /// Currently, this includes all points where child branches have
     374              :     /// been forked off from. In the future, could also include
     375              :     /// explicit user-defined snapshot points.
     376              :     pub retain_lsns: Vec<Lsn>,
     377              : 
     378              :     /// In addition to 'retain_lsns', keep everything newer than this
     379              :     /// point.
     380              :     ///
     381              :     /// This is calculated by subtracting 'gc_horizon' setting from
     382              :     /// last-record LSN
     383              :     ///
     384              :     /// FIXME: is this inclusive or exclusive?
     385              :     pub horizon_cutoff: Lsn,
     386              : 
     387              :     /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
     388              :     /// point.
     389              :     ///
     390              :     /// This is calculated by finding a number such that a record is needed for PITR
     391              :     /// if and only if its LSN is larger than 'pitr_cutoff'.
     392              :     pub pitr_cutoff: Lsn,
     393              : }
     394              : 
     395              : /// An error happened in a get() operation.
     396           70 : #[derive(thiserror::Error, Debug)]
     397              : pub(crate) enum PageReconstructError {
     398              :     #[error(transparent)]
     399              :     Other(#[from] anyhow::Error),
     400              : 
     401              :     #[error("Ancestor LSN wait error: {0}")]
     402              :     AncestorLsnTimeout(#[from] WaitLsnError),
     403              : 
     404              :     #[error("timeline shutting down")]
     405              :     Cancelled,
     406              : 
     407              :     /// The ancestor of this is being stopped
     408              :     #[error("ancestor timeline {0} is being stopped")]
     409              :     AncestorStopping(TimelineId),
     410              : 
     411              :     /// An error happened replaying WAL records
     412              :     #[error(transparent)]
     413              :     WalRedo(anyhow::Error),
     414              : }
     415              : 
     416              : impl PageReconstructError {
     417              :     /// Returns true if this error indicates a tenant/timeline shutdown-like situation, i.e. it is `Cancelled` or `AncestorStopping`; every other variant returns false.
     418            0 :     pub(crate) fn is_stopping(&self) -> bool {
     419            0 :         use PageReconstructError::*;
     420            0 :         match self {
     421            0 :             Other(_) => false,
     422            0 :             AncestorLsnTimeout(_) => false,
     423            0 :             Cancelled | AncestorStopping(_) => true,
     424            0 :             WalRedo(_) => false,
     425              :         }
     426            0 :     }
     427              : }
     428              : 
     429            0 : #[derive(thiserror::Error, Debug)]
     430              : enum CreateImageLayersError {
     431              :     #[error("timeline shutting down")]
     432              :     Cancelled,
     433              : 
     434              :     #[error(transparent)]
     435              :     GetVectoredError(GetVectoredError),
     436              : 
     437              :     #[error(transparent)]
     438              :     PageReconstructError(PageReconstructError),
     439              : 
     440              :     #[error(transparent)]
     441              :     Other(#[from] anyhow::Error),
     442              : }
     443              : 
     444            0 : #[derive(thiserror::Error, Debug)]
     445              : enum FlushLayerError {
     446              :     /// Timeline cancellation token was cancelled
     447              :     #[error("timeline shutting down")]
     448              :     Cancelled,
     449              : 
     450              :     #[error(transparent)]
     451              :     CreateImageLayersError(CreateImageLayersError),
     452              : 
     453              :     #[error(transparent)]
     454              :     Other(#[from] anyhow::Error),
     455              : }
     456              : 
     457            0 : #[derive(thiserror::Error, Debug)]
     458              : pub(crate) enum GetVectoredError {
     459              :     #[error("timeline shutting down")]
     460              :     Cancelled,
     461              : 
     462              :     #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
     463              :     Oversized(u64),
     464              : 
     465              :     #[error("Requested at invalid LSN: {0}")]
     466              :     InvalidLsn(Lsn),
     467              : }
     468              : 
     469            0 : #[derive(thiserror::Error, Debug)]
     470              : pub(crate) enum GetReadyAncestorError {
     471              :     #[error("ancestor timeline {0} is being stopped")]
     472              :     AncestorStopping(TimelineId),
     473              : 
     474              :     #[error("Ancestor LSN wait error: {0}")]
     475              :     AncestorLsnTimeout(#[from] WaitLsnError),
     476              : 
     477              :     #[error("Cancelled")]
     478              :     Cancelled,
     479              : 
     480              :     #[error(transparent)]
     481              :     Other(#[from] anyhow::Error),
     482              : }
     483              : 
     484            0 : #[derive(Clone, Copy)]
     485              : pub enum LogicalSizeCalculationCause {
     486              :     Initial,
     487              :     ConsumptionMetricsSyntheticSize,
     488              :     EvictionTaskImitation,
     489              :     TenantSizeHandler,
     490              : }
     491              : 
     492              : pub enum GetLogicalSizePriority {
     493              :     User,
     494              :     Background,
     495              : }
     496              : 
     497          813 : #[derive(enumset::EnumSetType)]
     498              : pub(crate) enum CompactFlags {
     499              :     ForceRepartition,
     500              : }
     501              : 
     502              : impl std::fmt::Debug for Timeline {
     503            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     504            0 :         write!(f, "Timeline<{}>", self.timeline_id)
     505            0 :     }
     506              : }
     507              : 
     508           48 : #[derive(thiserror::Error, Debug)]
     509              : pub(crate) enum WaitLsnError {
     510              :     // Called on a timeline which is shutting down
     511              :     #[error("Shutdown")]
     512              :     Shutdown,
     513              : 
     514              :     // Called on a timeline not in active state or shutting down
     515              :     #[error("Bad state (not active)")]
     516              :     BadState,
     517              : 
     518              :     // Timeout expired while waiting for LSN to catch up with goal.
     519              :     #[error("{0}")]
     520              :     Timeout(String),
     521              : }
     522              : 
     523              : // The impls below achieve cancellation mapping for errors.
     524              : // Perhaps there's a way of achieving this with less cruft.
     525              : 
     526              : impl From<CreateImageLayersError> for CompactionError {
     527            0 :     fn from(e: CreateImageLayersError) -> Self {
     528            0 :         match e {
     529            0 :             CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
     530            0 :             _ => CompactionError::Other(e.into()),
     531              :         }
     532            0 :     }
     533              : }
     534              : 
     535              : impl From<CreateImageLayersError> for FlushLayerError {
     536            0 :     fn from(e: CreateImageLayersError) -> Self {
     537            0 :         match e {
     538            0 :             CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
     539            0 :             any => FlushLayerError::CreateImageLayersError(any),
     540              :         }
     541            0 :     }
     542              : }
     543              : 
     544              : impl From<PageReconstructError> for CreateImageLayersError {
     545            0 :     fn from(e: PageReconstructError) -> Self {
     546            0 :         match e {
     547            0 :             PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
     548            0 :             _ => CreateImageLayersError::PageReconstructError(e),
     549              :         }
     550            0 :     }
     551              : }
     552              : 
     553              : impl From<GetVectoredError> for CreateImageLayersError {
     554            0 :     fn from(e: GetVectoredError) -> Self {
     555            0 :         match e {
     556            0 :             GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
     557            0 :             _ => CreateImageLayersError::GetVectoredError(e),
     558              :         }
     559            0 :     }
     560              : }
     561              : 
     562              : impl From<GetReadyAncestorError> for PageReconstructError {
     563            4 :     fn from(e: GetReadyAncestorError) -> Self {
     564            4 :         use GetReadyAncestorError::*;
     565            4 :         match e {
     566            2 :             AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
     567            0 :             AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
     568            0 :             Cancelled => PageReconstructError::Cancelled,
     569            2 :             Other(other) => PageReconstructError::Other(other),
     570              :         }
     571            4 :     }
     572              : }
     573              : 
     574              : /// Public interface functions
     575              : impl Timeline {
     576              :     /// Get the LSN where this branch was created
     577         3803 :     pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
     578         3803 :         self.ancestor_lsn
     579         3803 :     }
     580              : 
     581              :     /// Get the ancestor's timeline id
     582         5285 :     pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
     583         5285 :         self.ancestor_timeline
     584         5285 :             .as_ref()
     585         5285 :             .map(|ancestor| ancestor.timeline_id)
     586         5285 :     }
     587              : 
     588              :     /// Lock and get timeline's GC cutoff
     589      4472625 :     pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
     590      4472625 :         self.latest_gc_cutoff_lsn.read()
     591      4472625 :     }
     592              : 
     593              :     /// Look up given page version.
     594              :     ///
     595              :     /// If a remote layer file is needed, it is downloaded as part of this
     596              :     /// call.
     597              :     ///
     598              :     /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
     599              :     /// abstraction above this needs to store suitable metadata to track what
     600              :     /// data exists with what keys, in separate metadata entries. If a
     601              :     /// non-existent key is requested, we may incorrectly return a value from
     602              :     /// an ancestor branch, for example, or waste a lot of cycles chasing the
     603              :     /// non-existing key.
     604              :     ///
     605              :     /// # Cancel-Safety
     606              :     ///
     607              :     /// This method is cancellation-safe.
     608      7398627 :     pub(crate) async fn get(
     609      7398627 :         &self,
     610      7398627 :         key: Key,
     611      7398627 :         lsn: Lsn,
     612      7398627 :         ctx: &RequestContext,
     613      7398634 :     ) -> Result<Bytes, PageReconstructError> {
     614      7398634 :         if !lsn.is_valid() {
     615            1 :             return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
     616      7398633 :         }
     617              : 
     618              :         // This check is debug-only because of the cost of hashing, and because it's a double-check: we
     619              :         // already checked the key against the shard_identity when looking up the Timeline from
     620              :         // page_service.
     621      7398633 :         debug_assert!(!self.shard_identity.is_key_disposable(&key));
     622              : 
     623              :         // XXX: structured stats collection for layer eviction here.
     624            0 :         trace!(
     625            0 :             "get page request for {}@{} from task kind {:?}",
     626            0 :             key,
     627            0 :             lsn,
     628            0 :             ctx.task_kind()
     629            0 :         );
     630              : 
     631              :         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
     632              :         // The cached image can be returned directly if there is no WAL between the cached image
     633              :         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
     634              :         // for redo.
     635      7398633 :         let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
     636      1787176 :             Some((cached_lsn, cached_img)) => {
     637      1787176 :                 match cached_lsn.cmp(&lsn) {
     638      1710871 :                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
     639              :                     Ordering::Equal => {
     640        76305 :                         MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
     641        76305 :                         return Ok(cached_img); // exact LSN match, return the image
     642              :                     }
     643              :                     Ordering::Greater => {
     644            0 :                         unreachable!("the returned lsn should never be after the requested lsn")
     645              :                     }
     646              :                 }
     647      1710871 :                 Some((cached_lsn, cached_img))
     648              :             }
     649      5611457 :             None => None,
     650              :         };
     651              : 
     652      7322328 :         let mut reconstruct_state = ValueReconstructState {
     653      7322328 :             records: Vec::new(),
     654      7322328 :             img: cached_page_img,
     655      7322328 :         };
     656      7322328 : 
     657      7322328 :         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
     658      7322328 :         let path = self
     659      7322328 :             .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
     660      2279470 :             .await?;
     661      7320977 :         timer.stop_and_record();
     662      7320977 : 
     663      7320977 :         let start = Instant::now();
     664      7320977 :         let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
     665      7320975 :         let elapsed = start.elapsed();
     666      7320975 :         crate::metrics::RECONSTRUCT_TIME
     667      7320975 :             .for_result(&res)
     668      7320975 :             .observe(elapsed.as_secs_f64());
     669      7320975 : 
     670      7320975 :         if cfg!(feature = "testing") && res.is_err() {
     671              :             // it can only be walredo issue
     672              :             use std::fmt::Write;
     673              : 
     674            0 :             let mut msg = String::new();
     675            0 : 
     676            0 :             path.into_iter().for_each(|(res, cont_lsn, layer)| {
     677            0 :                 writeln!(
     678            0 :                     msg,
     679            0 :                     "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
     680            0 :                     layer(),
     681            0 :                 )
     682            0 :                 .expect("string grows")
     683            0 :             });
     684              : 
     685              :             // this is to rule out or provide evidence that we could in some cases read a duplicate
     686              :             // walrecord
     687            0 :             tracing::info!("walredo failed, path:\n{msg}");
     688      7320975 :         }
     689              : 
     690      7320975 :         res
     691      7398620 :     }
     692              : 
     693              :     pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
     694              : 
     695              :     /// Look up multiple page versions at a given LSN
     696              :     ///
     697              :     /// This naive implementation will be replaced with a more efficient one
     698              :     /// which actually vectorizes the read path.
     699        78555 :     pub(crate) async fn get_vectored(
     700        78555 :         &self,
     701        78555 :         key_ranges: &[Range<Key>],
     702        78555 :         lsn: Lsn,
     703        78555 :         ctx: &RequestContext,
     704        78555 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     705        78555 :         if !lsn.is_valid() {
     706            0 :             return Err(GetVectoredError::InvalidLsn(lsn));
     707        78555 :         }
     708        78555 : 
     709        78555 :         let key_count = key_ranges
     710        78555 :             .iter()
     711        79754 :             .map(|range| key_range_size(range) as u64)
     712        78555 :             .sum();
     713        78555 :         if key_count > Timeline::MAX_GET_VECTORED_KEYS {
     714            0 :             return Err(GetVectoredError::Oversized(key_count));
     715        78555 :         }
     716        78555 : 
     717        78555 :         let _timer = crate::metrics::GET_VECTORED_LATENCY
     718        78555 :             .for_task_kind(ctx.task_kind())
     719        78555 :             .map(|t| t.start_timer());
     720        78555 : 
     721        78555 :         let mut values = BTreeMap::new();
     722       158309 :         for range in key_ranges {
     723        79754 :             let mut key = range.start;
     724       311977 :             while key != range.end {
     725       232223 :                 assert!(!self.shard_identity.is_key_disposable(&key));
     726              : 
     727       232223 :                 let block = self.get(key, lsn, ctx).await;
     728              : 
     729              :                 if matches!(
     730            0 :                     block,
     731              :                     Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
     732              :                 ) {
     733            0 :                     return Err(GetVectoredError::Cancelled);
     734       232223 :                 }
     735       232223 : 
     736       232223 :                 values.insert(key, block);
     737       232223 :                 key = key.next();
     738              :             }
     739              :         }
     740              : 
     741        78555 :         Ok(values)
     742        78555 :     }
     743              : 
     744              :     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     745      9450148 :     pub(crate) fn get_last_record_lsn(&self) -> Lsn {
     746      9450148 :         self.last_record_lsn.load().last
     747      9450148 :     }
     748              : 
     749         3185 :     pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
     750         3185 :         self.last_record_lsn.load().prev
     751         3185 :     }
     752              : 
     753              :     /// Atomically get both last and prev.
     754         1080 :     pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
     755         1080 :         self.last_record_lsn.load()
     756         1080 :     }
     757              : 
     758       757672 :     pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
     759       757672 :         self.disk_consistent_lsn.load()
     760       757672 :     }
     761              : 
     762              :     /// remote_consistent_lsn from the perspective of the tenant's current generation,
     763              :     /// not validated with control plane yet.
     764              :     /// See [`Self::get_remote_consistent_lsn_visible`].
     765         3185 :     pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     766         3185 :         if let Some(remote_client) = &self.remote_client {
     767         3185 :             remote_client.remote_consistent_lsn_projected()
     768              :         } else {
     769            0 :             None
     770              :         }
     771         3185 :     }
     772              : 
     773              :     /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
     774              :     /// i.e. a value of remote_consistent_lsn_projected which has undergone
     775              :     /// generation validation in the deletion queue.
     776       755304 :     pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
     777       755304 :         if let Some(remote_client) = &self.remote_client {
     778       755304 :             remote_client.remote_consistent_lsn_visible()
     779              :         } else {
     780            0 :             None
     781              :         }
     782       755304 :     }
     783              : 
     784              :     /// The sum of the file size of all historic layers in the layer map.
     785              :     /// This method makes no distinction between local and remote layers.
     786              :     /// Hence, the result **does not represent local filesystem usage**.
     787         3661 :     pub(crate) async fn layer_size_sum(&self) -> u64 {
     788         3661 :         let guard = self.layers.read().await;
     789         3661 :         let layer_map = guard.layer_map();
     790         3661 :         let mut size = 0;
     791       325484 :         for l in layer_map.iter_historic_layers() {
     792       325484 :             size += l.file_size();
     793       325484 :         }
     794         3661 :         size
     795         3661 :     }
     796              : 
     797           17 :     pub(crate) fn resident_physical_size(&self) -> u64 {
     798           17 :         self.metrics.resident_physical_size_get()
     799           17 :     }
     800              : 
     801         3185 :     pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
     802        22295 :         array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
     803         3185 :     }
     804              : 
     805              :     ///
     806              :     /// Wait until WAL has been received and processed up to this LSN.
     807              :     ///
     808              :     /// You should call this before any of the other get_* or list_* functions. Calling
     809              :     /// those functions with an LSN that has not been processed yet is an error.
     810              :     ///
     811      1539795 :     pub(crate) async fn wait_lsn(
     812      1539795 :         &self,
     813      1539795 :         lsn: Lsn,
     814      1539795 :         _ctx: &RequestContext, /* Prepare for use by cancellation */
     815      1539796 :     ) -> Result<(), WaitLsnError> {
     816      1539796 :         if self.cancel.is_cancelled() {
     817            0 :             return Err(WaitLsnError::Shutdown);
     818      1539796 :         } else if !self.is_active() {
     819            0 :             return Err(WaitLsnError::BadState);
     820      1539796 :         }
     821              : 
     822              :         // This should never be called from the WAL receiver, because that could lead
     823              :         // to a deadlock.
     824              :         debug_assert!(
     825      1539796 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
     826            0 :             "wait_lsn cannot be called in WAL receiver"
     827              :         );
     828              :         debug_assert!(
     829      1539796 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
     830            0 :             "wait_lsn cannot be called in WAL receiver"
     831              :         );
     832              :         debug_assert!(
     833      1539796 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
     834            0 :             "wait_lsn cannot be called in WAL receiver"
     835              :         );
     836              : 
     837      1539796 :         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
     838      1539796 : 
     839      1539796 :         match self
     840      1539796 :             .last_record_lsn
     841      1539796 :             .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
     842       108459 :             .await
     843              :         {
     844      1539772 :             Ok(()) => Ok(()),
     845           24 :             Err(e) => {
     846           24 :                 use utils::seqwait::SeqWaitError::*;
     847           24 :                 match e {
     848            0 :                     Shutdown => Err(WaitLsnError::Shutdown),
     849              :                     Timeout => {
     850              :                         // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
     851           24 :                         drop(_timer);
     852           24 :                         let walreceiver_status = self.walreceiver_status();
     853           24 :                         Err(WaitLsnError::Timeout(format!(
     854           24 :                         "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
     855           24 :                         lsn,
     856           24 :                         self.get_last_record_lsn(),
     857           24 :                         self.get_disk_consistent_lsn(),
     858           24 :                         walreceiver_status,
     859           24 :                     )))
     860              :                     }
     861              :                 }
     862              :             }
     863              :         }
     864      1539796 :     }
     865              : 
     866         3209 :     pub(crate) fn walreceiver_status(&self) -> String {
     867         3209 :         match &*self.walreceiver.lock().unwrap() {
     868          112 :             None => "stopping or stopped".to_string(),
     869         3097 :             Some(walreceiver) => match walreceiver.status() {
     870         1920 :                 Some(status) => status.to_human_readable_string(),
     871         1177 :                 None => "Not active".to_string(),
     872              :             },
     873              :         }
     874         3209 :     }
     875              : 
     876              :     /// Check that it is valid to request operations with that lsn.
     877          670 :     pub(crate) fn check_lsn_is_in_scope(
     878          670 :         &self,
     879          670 :         lsn: Lsn,
     880          670 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
     881          670 :     ) -> anyhow::Result<()> {
     882          670 :         ensure!(
     883          670 :             lsn >= **latest_gc_cutoff_lsn,
     884           16 :             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
     885           16 :             lsn,
     886           16 :             **latest_gc_cutoff_lsn,
     887              :         );
     888          654 :         Ok(())
     889          670 :     }
     890              : 
     891              :     /// Flush to disk all data that was written with the put_* functions
     892         3812 :     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
     893              :     pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
     894              :         self.freeze_inmem_layer(false).await;
     895              :         self.flush_frozen_layers_and_wait().await
     896              :     }
     897              : 
     898              :     /// Outermost timeline compaction operation; downloads needed layers.
     899         1623 :     pub(crate) async fn compact(
     900         1623 :         self: &Arc<Self>,
     901         1623 :         cancel: &CancellationToken,
     902         1623 :         flags: EnumSet<CompactFlags>,
     903         1623 :         ctx: &RequestContext,
     904         1623 :     ) -> Result<(), CompactionError> {
     905         1623 :         // most likely the cancellation token is from background task, but in tests it could be the
     906         1623 :         // request task as well.
     907         1623 : 
     908         1623 :         let prepare = async move {
     909         1623 :             let guard = self.compaction_lock.lock().await;
     910              : 
     911         1623 :             let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
     912         1623 :                 BackgroundLoopKind::Compaction,
     913         1623 :                 ctx,
     914         1623 :             )
     915            0 :             .await;
     916              : 
     917         1623 :             (guard, permit)
     918         1623 :         };
     919              : 
     920              :         // this wait probably never needs any "long time spent" logging, because we already nag if
     921              :         // compaction task goes over its period (20s) which is quite often in production.
     922         1623 :         let (_guard, _permit) = tokio::select! {
     923         1623 :             tuple = prepare => { tuple },
     924              :             _ = self.cancel.cancelled() => return Ok(()),
     925              :             _ = cancel.cancelled() => return Ok(()),
     926              :         };
     927              : 
     928         1623 :         let last_record_lsn = self.get_last_record_lsn();
     929         1623 : 
     930         1623 :         // Last record Lsn could be zero in case the timeline was just created
     931         1623 :         if !last_record_lsn.is_valid() {
     932            0 :             warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
     933            0 :             return Ok(());
     934         1623 :         }
     935         1623 : 
     936         1623 :         // High level strategy for compaction / image creation:
     937         1623 :         //
     938         1623 :         // 1. First, calculate the desired "partitioning" of the
     939         1623 :         // currently in-use key space. The goal is to partition the
     940         1623 :         // key space into roughly fixed-size chunks, but also take into
     941         1623 :         // account any existing image layers, and try to align the
     942         1623 :         // chunk boundaries with the existing image layers to avoid
     943         1623 :         // too much churn. Also try to align chunk boundaries with
     944         1623 :         // relation boundaries.  In principle, we don't know about
     945         1623 :         // relation boundaries here, we just deal with key-value
     946         1623 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
     947         1623 :         // map relations into key-value pairs. But in practice we know
     948         1623 :         // that 'field6' is the block number, and the fields 1-5
     949         1623 :         // identify a relation. This is just an optimization,
     950         1623 :         // though.
     951         1623 :         //
     952         1623 :         // 2. Once we know the partitioning, for each partition,
     953         1623 :         // decide if it's time to create a new image layer. The
     954         1623 :         // criteria is: there has been too much "churn" since the last
     955         1623 :         // image layer? The "churn" is a fuzzy concept, it's a
     956         1623 :         // combination of too many delta files, or too much WAL in
     957         1623 :         // total in the delta file. Or perhaps: if creating an image
     958         1623 :         // file would allow to delete some older files.
     959         1623 :         //
     960         1623 :         // 3. After that, we compact all level0 delta files if there
     961         1623 :         // are too many of them.  While compacting, we also garbage
     962         1623 :         // collect any page versions that are no longer needed because
     963         1623 :         // of the new image layers we created in step 2.
     964         1623 :         //
     965         1623 :         // TODO: This high level strategy hasn't been implemented yet.
     966         1623 :         // Below are functions compact_level0() and create_image_layers()
     967         1623 :         // but they are a bit ad hoc and don't quite work like it's explained
     968         1623 :         // above. Rewrite it.
     969         1623 : 
     970         1623 :         // Is the timeline being deleted?
     971         1623 :         if self.is_stopping() {
     972            0 :             trace!("Dropping out of compaction on timeline shutdown");
     973            0 :             return Err(CompactionError::ShuttingDown);
     974         1623 :         }
     975         1623 : 
     976         1623 :         let target_file_size = self.get_checkpoint_distance();
     977         1623 : 
     978         1623 :         // Define partitioning schema if needed
     979         1623 : 
     980         1623 :         // FIXME: the match should only cover repartitioning, not the next steps
     981         1623 :         match self
     982         1623 :             .repartition(
     983         1623 :                 self.get_last_record_lsn(),
     984         1623 :                 self.get_compaction_target_size(),
     985         1623 :                 flags,
     986         1623 :                 ctx,
     987         1623 :             )
     988       176612 :             .await
     989              :         {
     990         1621 :             Ok((partitioning, lsn)) => {
     991         1621 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
     992         1621 :                 let image_ctx = RequestContextBuilder::extend(ctx)
     993         1621 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
     994         1621 :                     .build();
     995         1621 : 
     996         1621 :                 // 2. Compact
     997         1621 :                 let timer = self.metrics.compact_time_histo.start_timer();
     998       314622 :                 self.compact_level0(target_file_size, ctx).await?;
     999         1619 :                 timer.stop_and_record();
    1000              : 
    1001              :                 // 3. Create new image layers for partitions that have been modified
    1002              :                 // "enough".
    1003         1619 :                 let layers = self
    1004         1619 :                     .create_image_layers(&partitioning, lsn, false, &image_ctx)
    1005        56836 :                     .await
    1006         1619 :                     .map_err(anyhow::Error::from)?;
    1007         1619 :                 if let Some(remote_client) = &self.remote_client {
    1008         8014 :                     for layer in layers {
    1009         6395 :                         remote_client.schedule_layer_file_upload(layer)?;
    1010              :                     }
    1011            0 :                 }
    1012              : 
    1013         1619 :                 if let Some(remote_client) = &self.remote_client {
    1014              :                     // should any new image layer have been created, not uploading index_part will
    1015              :                     // result in a mismatch between remote_physical_size and layermap calculated
    1016              :                     // size, which will fail some tests, but should not be an issue otherwise.
    1017         1619 :                     remote_client.schedule_index_upload_for_file_changes()?;
    1018            0 :                 }
    1019              :             }
    1020            1 :             Err(err) => {
    1021            1 :                 // no partitioning? This is normal, if the timeline was just created
    1022            1 :                 // as an empty timeline. Also in unit tests, when we use the timeline
    1023            1 :                 // as a simple key-value store, ignoring the datadir layout. Log the
    1024            1 :                 // error but continue.
    1025            1 :                 //
    1026            1 :                 // Suppress error when it's due to cancellation
    1027            1 :                 if !self.cancel.is_cancelled() {
    1028            1 :                     error!("could not compact, repartitioning keyspace failed: {err:?}");
    1029            0 :                 }
    1030              :             }
    1031              :         };
    1032              : 
    1033         1620 :         Ok(())
    1034         1620 :     }
    1035              : 
    1036              :     /// Mutate the timeline with a [`TimelineWriter`].
    1037      3310571 :     pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
    1038      3310571 :         TimelineWriter {
    1039      3310571 :             tl: self,
    1040      3310571 :             _write_guard: self.write_lock.lock().await,
    1041              :         }
    1042      3310571 :     }
    1043              : 
    1044              :     /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
    1045              :     /// the in-memory layer, and initiate flushing it if so.
    1046              :     ///
    1047              :     /// Also flush after a period of time without new data -- it helps
    1048              :     /// safekeepers to regard pageserver as caught up and suspend activity.
    1049      1356943 :     pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
    1050      1356943 :         let last_lsn = self.get_last_record_lsn();
    1051      1356123 :         let open_layer_size = {
    1052      1356943 :             let guard = self.layers.read().await;
    1053      1356943 :             let layers = guard.layer_map();
    1054      1356943 :             let Some(open_layer) = layers.open_layer.as_ref() else {
    1055          820 :                 return Ok(());
    1056              :             };
    1057      1356123 :             open_layer.size().await?
    1058              :         };
    1059      1356123 :         let last_freeze_at = self.last_freeze_at.load();
    1060      1356123 :         let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
    1061      1356123 :         let distance = last_lsn.widening_sub(last_freeze_at);
    1062      1356123 :         // Checkpointing the open layer can be triggered by layer size or LSN range.
    1063      1356123 :         // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
    1064      1356123 :         // we want to stay below that with a big margin.  The LSN distance determines how
    1065      1356123 :         // much WAL the safekeepers need to store.
    1066      1356123 :         if distance >= self.get_checkpoint_distance().into()
    1067      1355570 :             || open_layer_size > self.get_checkpoint_distance()
    1068      1352619 :             || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
    1069              :         {
    1070         3504 :             info!(
    1071         3504 :                 "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
    1072         3504 :                 distance,
    1073         3504 :                 open_layer_size,
    1074         3504 :                 last_freeze_ts.elapsed()
    1075         3504 :             );
    1076              : 
    1077         3504 :             self.freeze_inmem_layer(true).await;
    1078         3504 :             self.last_freeze_at.store(last_lsn);
    1079         3504 :             *(self.last_freeze_ts.write().unwrap()) = Instant::now();
    1080         3504 : 
    1081         3504 :             // Wake up the layer flusher
    1082         3504 :             self.flush_frozen_layers();
    1083      1352619 :         }
    1084      1356123 :         Ok(())
    1085      1356943 :     }
    1086              : 
    1087         1257 :     pub(crate) fn activate(
    1088         1257 :         self: &Arc<Self>,
    1089         1257 :         broker_client: BrokerClientChannel,
    1090         1257 :         background_jobs_can_start: Option<&completion::Barrier>,
    1091         1257 :         ctx: &RequestContext,
    1092         1257 :     ) {
    1093         1257 :         if self.tenant_shard_id.is_zero() {
    1094         1206 :             // Logical size is only maintained accurately on shard zero.
    1095         1206 :             self.spawn_initial_logical_size_computation_task(ctx);
    1096         1206 :         }
    1097         1257 :         self.launch_wal_receiver(ctx, broker_client);
    1098         1257 :         self.set_state(TimelineState::Active);
    1099         1257 :         self.launch_eviction_task(background_jobs_can_start);
    1100         1257 :     }
    1101              : 
    1102              :     /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
    1103              :     /// also to remote storage.  This method can easily take multiple seconds for a busy timeline.
    1104              :     ///
    1105              :     /// While we are flushing, we continue to accept read I/O.
    1106          262 :     pub(crate) async fn flush_and_shutdown(&self) {
    1107          262 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1108              : 
    1109              :         // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
    1110              :         // trying to flush
    1111            0 :         tracing::debug!("Waiting for WalReceiverManager...");
    1112          262 :         task_mgr::shutdown_tasks(
    1113          262 :             Some(TaskKind::WalReceiverManager),
    1114          262 :             Some(self.tenant_shard_id),
    1115          262 :             Some(self.timeline_id),
    1116          262 :         )
    1117          145 :         .await;
    1118              : 
    1119              :         // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
    1120          262 :         self.last_record_lsn.shutdown();
    1121          262 : 
    1122          262 :         // now all writers to InMemory layer are gone, do the final flush if requested
    1123          262 :         match self.freeze_and_flush().await {
    1124              :             Ok(_) => {
    1125              :                 // drain the upload queue
    1126          217 :                 if let Some(client) = self.remote_client.as_ref() {
    1127              :                     // if we did not wait for completion here, it might be our shutdown process
    1128              :                     // didn't wait for remote uploads to complete at all, as new tasks can forever
    1129              :                     // be spawned.
    1130              :                     //
    1131              :                     // what is problematic is the shutting down of RemoteTimelineClient, because
    1132              :                     // obviously it does not make sense to stop while we wait for it, but what
    1133              :                     // about corner cases like s3 suddenly hanging up?
    1134          217 :                     if let Err(e) = client.shutdown().await {
    1135              :                         // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1136              :                         // we have some extra WAL replay to do next time the timeline starts.
    1137            0 :                         warn!("failed to flush to remote storage: {e:#}");
    1138          217 :                     }
    1139            0 :                 }
    1140              :             }
    1141           45 :             Err(e) => {
    1142              :                 // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1143              :                 // we have some extra WAL replay to do next time the timeline starts.
    1144           45 :                 warn!("failed to freeze and flush: {e:#}");
    1145              :             }
    1146              :         }
    1147              : 
    1148          438 :         self.shutdown().await;
    1149          262 :     }
    1150              : 
    1151              :     /// Shut down immediately, without waiting for any open layers to flush to disk.  This is a subset of
    1152              :     /// the graceful [`Timeline::flush_and_shutdown`] function.
    1153          593 :     pub(crate) async fn shutdown(&self) {
    1154          593 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1155              : 
    1156              :         // Signal any subscribers to our cancellation token to drop out
    1157            0 :         tracing::debug!("Cancelling CancellationToken");
    1158          593 :         self.cancel.cancel();
    1159          593 : 
    1160          593 :         // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
    1161          593 :         // while doing so.
    1162          593 :         self.last_record_lsn.shutdown();
    1163          593 : 
    1164          593 :         // Shut down the layer flush task before the remote client, as one depends on the other
    1165          593 :         task_mgr::shutdown_tasks(
    1166          593 :             Some(TaskKind::LayerFlushTask),
    1167          593 :             Some(self.tenant_shard_id),
    1168          593 :             Some(self.timeline_id),
    1169          593 :         )
    1170          502 :         .await;
    1171              : 
    1172              :         // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
    1173              :         // case our caller wants to use that for a deletion
    1174          593 :         if let Some(remote_client) = self.remote_client.as_ref() {
    1175          593 :             match remote_client.stop() {
    1176          593 :                 Ok(()) => {}
    1177            0 :                 Err(StopError::QueueUninitialized) => {
    1178            0 :                     // Shutting down during initialization is legal
    1179            0 :                 }
    1180              :             }
    1181            0 :         }
    1182              : 
    1183            0 :         tracing::debug!("Waiting for tasks...");
    1184              : 
    1185          658 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
    1186              : 
    1187              :         // Finally wait until any gate-holders are complete
    1188          593 :         self.gate.close().await;
    1189          593 :     }
    1190              : 
    1191         2281 :     pub(crate) fn set_state(&self, new_state: TimelineState) {
    1192         2281 :         match (self.current_state(), new_state) {
    1193         2281 :             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
    1194          126 :                 info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
    1195              :             }
    1196            0 :             (st, TimelineState::Loading) => {
    1197            0 :                 error!("ignoring transition from {st:?} into Loading state");
    1198              :             }
    1199            7 :             (TimelineState::Broken { .. }, new_state) => {
    1200            7 :                 error!("Ignoring state update {new_state:?} for broken timeline");
    1201              :             }
    1202              :             (TimelineState::Stopping, TimelineState::Active) => {
    1203            0 :                 error!("Not activating a Stopping timeline");
    1204              :             }
    1205         2148 :             (_, new_state) => {
    1206         2148 :                 self.state.send_replace(new_state);
    1207         2148 :             }
    1208              :         }
    1209         2281 :     }
    1210              : 
    1211           19 :     pub(crate) fn set_broken(&self, reason: String) {
    1212           19 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
    1213           19 :         let broken_state = TimelineState::Broken {
    1214           19 :             reason,
    1215           19 :             backtrace: backtrace_str,
    1216           19 :         };
    1217           19 :         self.set_state(broken_state);
    1218           19 : 
    1219           19 :         // Although the Broken state is not equivalent to shutdown() (shutdown will be called
    1220           19 :         // later when this tenant is detach or the process shuts down), firing the cancellation token
    1221           19 :         // here avoids the need for other tasks to watch for the Broken state explicitly.
    1222           19 :         self.cancel.cancel();
    1223           19 :     }
    1224              : 
    1225      2250837 :     pub(crate) fn current_state(&self) -> TimelineState {
    1226      2250837 :         self.state.borrow().clone()
    1227      2250837 :     }
    1228              : 
    1229          943 :     pub(crate) fn is_broken(&self) -> bool {
    1230          943 :         matches!(&*self.state.borrow(), TimelineState::Broken { .. })
    1231          943 :     }
    1232              : 
    1233      1554712 :     pub(crate) fn is_active(&self) -> bool {
    1234      1554712 :         self.current_state() == TimelineState::Active
    1235      1554712 :     }
    1236              : 
    1237         2813 :     pub(crate) fn is_stopping(&self) -> bool {
    1238         2813 :         self.current_state() == TimelineState::Stopping
    1239         2813 :     }
    1240              : 
    1241         1253 :     pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
    1242         1253 :         self.state.subscribe()
    1243         1253 :     }
    1244              : 
    1245      1313705 :     pub(crate) async fn wait_to_become_active(
    1246      1313705 :         &self,
    1247      1313705 :         _ctx: &RequestContext, // Prepare for use by cancellation
    1248      1313706 :     ) -> Result<(), TimelineState> {
    1249      1313706 :         let mut receiver = self.state.subscribe();
    1250      1313735 :         loop {
    1251      1313735 :             let current_state = receiver.borrow().clone();
    1252      1313735 :             match current_state {
    1253              :                 TimelineState::Loading => {
    1254           29 :                     receiver
    1255           29 :                         .changed()
    1256           29 :                         .await
    1257           29 :                         .expect("holding a reference to self");
    1258              :                 }
    1259              :                 TimelineState::Active { .. } => {
    1260      1313699 :                     return Ok(());
    1261              :                 }
    1262              :                 TimelineState::Broken { .. } | TimelineState::Stopping => {
    1263              :                     // There's no chance the timeline can transition back into ::Active
    1264            7 :                     return Err(current_state);
    1265              :                 }
    1266              :             }
    1267              :         }
    1268      1313706 :     }
    1269              : 
    1270           97 :     pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
    1271           97 :         let guard = self.layers.read().await;
    1272           97 :         let layer_map = guard.layer_map();
    1273           97 :         let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
    1274           97 :         if let Some(open_layer) = &layer_map.open_layer {
    1275            3 :             in_memory_layers.push(open_layer.info());
    1276           94 :         }
    1277           97 :         for frozen_layer in &layer_map.frozen_layers {
    1278            0 :             in_memory_layers.push(frozen_layer.info());
    1279            0 :         }
    1280              : 
    1281           97 :         let mut historic_layers = Vec::new();
    1282         3050 :         for historic_layer in layer_map.iter_historic_layers() {
    1283         3050 :             let historic_layer = guard.get_from_desc(&historic_layer);
    1284         3050 :             historic_layers.push(historic_layer.info(reset));
    1285         3050 :         }
    1286              : 
    1287           97 :         LayerMapInfo {
    1288           97 :             in_memory_layers,
    1289           97 :             historic_layers,
    1290           97 :         }
    1291           97 :     }
    1292              : 
    1293            4 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    1294              :     pub(crate) async fn download_layer(
    1295              :         &self,
    1296              :         layer_file_name: &str,
    1297              :     ) -> anyhow::Result<Option<bool>> {
    1298              :         let Some(layer) = self.find_layer(layer_file_name).await else {
    1299              :             return Ok(None);
    1300              :         };
    1301              : 
    1302              :         if self.remote_client.is_none() {
    1303              :             return Ok(Some(false));
    1304              :         }
    1305              : 
    1306              :         layer.download().await?;
    1307              : 
    1308              :         Ok(Some(true))
    1309              :     }
    1310              : 
    1311              :     /// Evict just one layer.
    1312              :     ///
    1313              :     /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
    1314         2206 :     pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1315         2206 :         let _gate = self
    1316         2206 :             .gate
    1317         2206 :             .enter()
    1318         2206 :             .map_err(|_| anyhow::anyhow!("Shutting down"))?;
    1319              : 
    1320         2206 :         let Some(local_layer) = self.find_layer(layer_file_name).await else {
    1321            0 :             return Ok(None);
    1322              :         };
    1323              : 
    1324         2206 :         match local_layer.evict_and_wait().await {
    1325         2206 :             Ok(()) => Ok(Some(true)),
    1326            0 :             Err(EvictionError::NotFound) => Ok(Some(false)),
    1327            0 :             Err(EvictionError::Downloaded) => Ok(Some(false)),
    1328              :         }
    1329         2206 :     }
    1330              : }
    1331              : 
    1332              : /// Divisor applied to the checkpoint distance to obtain a timeline's `repartition_threshold`, i.e. the number of times the partitioning is computed within one checkpoint distance.
    1333              : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
    1334              : 
    1335              : // Private functions
    1336              : impl Timeline {
    1337          607 :     pub(crate) fn get_lazy_slru_download(&self) -> bool {
    1338          607 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1339          607 :         tenant_conf
    1340          607 :             .lazy_slru_download
    1341          607 :             .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
    1342          607 :     }
    1343              : 
    1344      2714908 :     fn get_checkpoint_distance(&self) -> u64 {
    1345      2714908 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1346      2714908 :         tenant_conf
    1347      2714908 :             .checkpoint_distance
    1348      2714908 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    1349      2714908 :     }
    1350              : 
    1351      1352619 :     fn get_checkpoint_timeout(&self) -> Duration {
    1352      1352619 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1353      1352619 :         tenant_conf
    1354      1352619 :             .checkpoint_timeout
    1355      1352619 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    1356      1352619 :     }
    1357              : 
    1358         1703 :     fn get_compaction_target_size(&self) -> u64 {
    1359         1703 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1360         1703 :         tenant_conf
    1361         1703 :             .compaction_target_size
    1362         1703 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    1363         1703 :     }
    1364              : 
    1365         1621 :     fn get_compaction_threshold(&self) -> usize {
    1366         1621 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1367         1621 :         tenant_conf
    1368         1621 :             .compaction_threshold
    1369         1621 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    1370         1621 :     }
    1371              : 
    1372        36833 :     fn get_image_creation_threshold(&self) -> usize {
    1373        36833 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1374        36833 :         tenant_conf
    1375        36833 :             .image_creation_threshold
    1376        36833 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    1377        36833 :     }
    1378              : 
    1379         2664 :     fn get_eviction_policy(&self) -> EvictionPolicy {
    1380         2664 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
    1381         2664 :         tenant_conf
    1382         2664 :             .eviction_policy
    1383         2664 :             .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
    1384         2664 :     }
    1385              : 
    1386         1654 :     fn get_evictions_low_residence_duration_metric_threshold(
    1387         1654 :         tenant_conf: &TenantConfOpt,
    1388         1654 :         default_tenant_conf: &TenantConf,
    1389         1654 :     ) -> Duration {
    1390         1654 :         tenant_conf
    1391         1654 :             .evictions_low_residence_duration_metric_threshold
    1392         1654 :             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
    1393         1654 :     }
    1394              : 
    1395        12981 :     fn get_gc_feedback(&self) -> bool {
    1396        12981 :         let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
    1397        12981 :         tenant_conf
    1398        12981 :             .gc_feedback
    1399        12981 :             .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
    1400        12981 :     }
    1401              : 
    1402           62 :     pub(super) fn tenant_conf_updated(&self) {
    1403           62 :         // NB: Most tenant conf options are read by background loops, so,
    1404           62 :         // changes will automatically be picked up.
    1405           62 : 
    1406           62 :         // The threshold is embedded in the metric. So, we need to update it.
    1407           62 :         {
    1408           62 :             let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
    1409           62 :                 &self.tenant_conf.read().unwrap().tenant_conf,
    1410           62 :                 &self.conf.default_tenant_conf,
    1411           62 :             );
    1412           62 : 
    1413           62 :             let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
    1414           62 :             let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
    1415           62 : 
    1416           62 :             let timeline_id_str = self.timeline_id.to_string();
    1417           62 :             self.metrics
    1418           62 :                 .evictions_with_low_residence_duration
    1419           62 :                 .write()
    1420           62 :                 .unwrap()
    1421           62 :                 .change_threshold(
    1422           62 :                     &tenant_id_str,
    1423           62 :                     &shard_id_str,
    1424           62 :                     &timeline_id_str,
    1425           62 :                     new_threshold,
    1426           62 :                 );
    1427           62 :         }
    1428           62 :     }
    1429              : 
    1430              :     /// Open a Timeline handle.
    1431              :     ///
    1432              :     /// Loads the metadata for the timeline into memory, but not the layer map.
    1433              :     #[allow(clippy::too_many_arguments)]
    1434         1592 :     pub(super) fn new(
    1435         1592 :         conf: &'static PageServerConf,
    1436         1592 :         tenant_conf: Arc<RwLock<AttachedTenantConf>>,
    1437         1592 :         metadata: &TimelineMetadata,
    1438         1592 :         ancestor: Option<Arc<Timeline>>,
    1439         1592 :         timeline_id: TimelineId,
    1440         1592 :         tenant_shard_id: TenantShardId,
    1441         1592 :         generation: Generation,
    1442         1592 :         shard_identity: ShardIdentity,
    1443         1592 :         walredo_mgr: Option<Arc<super::WalRedoManager>>,
    1444         1592 :         resources: TimelineResources,
    1445         1592 :         pg_version: u32,
    1446         1592 :         state: TimelineState,
    1447         1592 :         cancel: CancellationToken,
    1448         1592 :     ) -> Arc<Self> {
    1449         1592 :         let disk_consistent_lsn = metadata.disk_consistent_lsn();
    1450         1592 :         let (state, _) = watch::channel(state);
    1451         1592 : 
    1452         1592 :         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
    1453         1592 :         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
    1454         1592 : 
    1455         1592 :         let tenant_conf_guard = tenant_conf.read().unwrap();
    1456         1592 : 
    1457         1592 :         let evictions_low_residence_duration_metric_threshold =
    1458         1592 :             Self::get_evictions_low_residence_duration_metric_threshold(
    1459         1592 :                 &tenant_conf_guard.tenant_conf,
    1460         1592 :                 &conf.default_tenant_conf,
    1461         1592 :             );
    1462         1592 :         drop(tenant_conf_guard);
    1463         1592 : 
    1464         1592 :         Arc::new_cyclic(|myself| {
    1465         1592 :             let mut result = Timeline {
    1466         1592 :                 conf,
    1467         1592 :                 tenant_conf,
    1468         1592 :                 myself: myself.clone(),
    1469         1592 :                 timeline_id,
    1470         1592 :                 tenant_shard_id,
    1471         1592 :                 generation,
    1472         1592 :                 shard_identity,
    1473         1592 :                 pg_version,
    1474         1592 :                 layers: Default::default(),
    1475         1592 :                 wanted_image_layers: Mutex::new(None),
    1476         1592 : 
    1477         1592 :                 walredo_mgr,
    1478         1592 :                 walreceiver: Mutex::new(None),
    1479         1592 : 
    1480         1592 :                 remote_client: resources.remote_client.map(Arc::new),
    1481         1592 : 
    1482         1592 :                 // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
    1483         1592 :                 last_record_lsn: SeqWait::new(RecordLsn {
    1484         1592 :                     last: disk_consistent_lsn,
    1485         1592 :                     prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
    1486         1592 :                 }),
    1487         1592 :                 disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
    1488         1592 : 
    1489         1592 :                 last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
    1490         1592 :                 last_freeze_ts: RwLock::new(Instant::now()),
    1491         1592 : 
    1492         1592 :                 loaded_at: (disk_consistent_lsn, SystemTime::now()),
    1493         1592 : 
    1494         1592 :                 ancestor_timeline: ancestor,
    1495         1592 :                 ancestor_lsn: metadata.ancestor_lsn(),
    1496         1592 : 
    1497         1592 :                 metrics: TimelineMetrics::new(
    1498         1592 :                     &tenant_shard_id,
    1499         1592 :                     &timeline_id,
    1500         1592 :                     crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
    1501         1592 :                         "mtime",
    1502         1592 :                         evictions_low_residence_duration_metric_threshold,
    1503         1592 :                     ),
    1504         1592 :                 ),
    1505         1592 : 
    1506         1592 :                 query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
    1507         1592 :                     &tenant_shard_id,
    1508         1592 :                     &timeline_id,
    1509         1592 :                 ),
    1510         1592 : 
    1511        11144 :                 directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
    1512         1592 : 
    1513         1592 :                 flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
    1514         1592 : 
    1515         1592 :                 layer_flush_start_tx,
    1516         1592 :                 layer_flush_done_tx,
    1517         1592 : 
    1518         1592 :                 write_lock: tokio::sync::Mutex::new(()),
    1519         1592 : 
    1520         1592 :                 gc_info: std::sync::RwLock::new(GcInfo {
    1521         1592 :                     retain_lsns: Vec::new(),
    1522         1592 :                     horizon_cutoff: Lsn(0),
    1523         1592 :                     pitr_cutoff: Lsn(0),
    1524         1592 :                 }),
    1525         1592 : 
    1526         1592 :                 latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
    1527         1592 :                 initdb_lsn: metadata.initdb_lsn(),
    1528         1592 : 
    1529         1592 :                 current_logical_size: if disk_consistent_lsn.is_valid() {
    1530              :                     // we're creating timeline data with some layer files existing locally,
    1531              :                     // need to recalculate timeline's logical size based on data in the layers.
    1532          907 :                     LogicalSize::deferred_initial(disk_consistent_lsn)
    1533              :                 } else {
    1534              :                     // we're creating timeline data without any layers existing locally,
    1535              :                     // initial logical size is 0.
    1536          685 :                     LogicalSize::empty_initial()
    1537              :                 },
    1538         1592 :                 partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
    1539         1592 :                 repartition_threshold: 0,
    1540         1592 : 
    1541         1592 :                 last_received_wal: Mutex::new(None),
    1542         1592 :                 rel_size_cache: RwLock::new(HashMap::new()),
    1543         1592 : 
    1544         1592 :                 download_all_remote_layers_task_info: RwLock::new(None),
    1545         1592 : 
    1546         1592 :                 state,
    1547         1592 : 
    1548         1592 :                 eviction_task_timeline_state: tokio::sync::Mutex::new(
    1549         1592 :                     EvictionTaskTimelineState::default(),
    1550         1592 :                 ),
    1551         1592 :                 delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
    1552         1592 : 
    1553         1592 :                 cancel,
    1554         1592 :                 gate: Gate::default(),
    1555         1592 : 
    1556         1592 :                 compaction_lock: tokio::sync::Mutex::default(),
    1557         1592 :                 gc_lock: tokio::sync::Mutex::default(),
    1558         1592 :             };
    1559         1592 :             result.repartition_threshold =
    1560         1592 :                 result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
    1561         1592 :             result
    1562         1592 :                 .metrics
    1563         1592 :                 .last_record_gauge
    1564         1592 :                 .set(disk_consistent_lsn.0 as i64);
    1565         1592 :             result
    1566         1592 :         })
    1567         1592 :     }
    1568              : 
    1569         2245 :     pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
    1570         2245 :         let Ok(guard) = self.gate.enter() else {
    1571            0 :             info!("cannot start flush loop when the timeline gate has already been closed");
    1572            0 :             return;
    1573              :         };
    1574         2245 :         let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
    1575         2245 :         match *flush_loop_state {
    1576         1573 :             FlushLoopState::NotStarted => (),
    1577              :             FlushLoopState::Running { .. } => {
    1578          672 :                 info!(
    1579          672 :                     "skipping attempt to start flush_loop twice {}/{}",
    1580          672 :                     self.tenant_shard_id, self.timeline_id
    1581          672 :                 );
    1582          672 :                 return;
    1583              :             }
    1584              :             FlushLoopState::Exited => {
    1585            0 :                 warn!(
    1586            0 :                     "ignoring attempt to restart exited flush_loop {}/{}",
    1587            0 :                     self.tenant_shard_id, self.timeline_id
    1588            0 :                 );
    1589            0 :                 return;
    1590              :             }
    1591              :         }
    1592              : 
    1593         1573 :         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
    1594         1573 :         let self_clone = Arc::clone(self);
    1595         1573 : 
    1596         1573 :         debug!("spawning flush loop");
    1597         1573 :         *flush_loop_state = FlushLoopState::Running {
    1598         1573 :             #[cfg(test)]
    1599         1573 :             expect_initdb_optimization: false,
    1600         1573 :             #[cfg(test)]
    1601         1573 :             initdb_optimization_count: 0,
    1602         1573 :         };
    1603         1573 :         task_mgr::spawn(
    1604         1573 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1605         1573 :             task_mgr::TaskKind::LayerFlushTask,
    1606         1573 :             Some(self.tenant_shard_id),
    1607         1573 :             Some(self.timeline_id),
    1608         1573 :             "layer flush task",
    1609              :             false,
    1610         1573 :             async move {
    1611         1573 :                 let _guard = guard;
    1612         1573 :                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
    1613        31609 :                 self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
    1614          592 :                 let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
    1615          592 :                 assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
    1616          592 :                 *flush_loop_state  = FlushLoopState::Exited;
    1617          592 :                 Ok(())
    1618          592 :             }
    1619         1573 :             .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    1620              :         );
    1621         2245 :     }
    1622              : 
    1623              :     /// Creates and starts the wal receiver.
    1624              :     ///
    1625              :     /// This function is expected to be called at most once per Timeline's lifecycle
    1626              :     /// when the timeline is activated.
    1627         1257 :     fn launch_wal_receiver(
    1628         1257 :         self: &Arc<Self>,
    1629         1257 :         ctx: &RequestContext,
    1630         1257 :         broker_client: BrokerClientChannel,
    1631         1257 :     ) {
    1632         1257 :         info!(
    1633         1257 :             "launching WAL receiver for timeline {} of tenant {}",
    1634         1257 :             self.timeline_id, self.tenant_shard_id
    1635         1257 :         );
    1636              : 
    1637         1257 :         let tenant_conf_guard = self.tenant_conf.read().unwrap();
    1638         1257 :         let wal_connect_timeout = tenant_conf_guard
    1639         1257 :             .tenant_conf
    1640         1257 :             .walreceiver_connect_timeout
    1641         1257 :             .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
    1642         1257 :         let lagging_wal_timeout = tenant_conf_guard
    1643         1257 :             .tenant_conf
    1644         1257 :             .lagging_wal_timeout
    1645         1257 :             .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
    1646         1257 :         let max_lsn_wal_lag = tenant_conf_guard
    1647         1257 :             .tenant_conf
    1648         1257 :             .max_lsn_wal_lag
    1649         1257 :             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
    1650         1257 :         drop(tenant_conf_guard);
    1651         1257 : 
    1652         1257 :         let mut guard = self.walreceiver.lock().unwrap();
    1653         1257 :         assert!(
    1654         1257 :             guard.is_none(),
    1655            0 :             "multiple launches / re-launches of WAL receiver are not supported"
    1656              :         );
    1657         1257 :         *guard = Some(WalReceiver::start(
    1658         1257 :             Arc::clone(self),
    1659         1257 :             WalReceiverConf {
    1660         1257 :                 wal_connect_timeout,
    1661         1257 :                 lagging_wal_timeout,
    1662         1257 :                 max_lsn_wal_lag,
    1663         1257 :                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
    1664         1257 :                 availability_zone: self.conf.availability_zone.clone(),
    1665         1257 :                 ingest_batch_size: self.conf.ingest_batch_size,
    1666         1257 :             },
    1667         1257 :             broker_client,
    1668         1257 :             ctx,
    1669         1257 :         ));
    1670         1257 :     }
    1671              : 
    1672              :     /// Initialize with an empty layer map. Used when creating a new timeline.
    1673         1149 :     pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
    1674         1149 :         let mut layers = self.layers.try_write().expect(
    1675         1149 :             "in the context where we call this function, no other task has access to the object",
    1676         1149 :         );
    1677         1149 :         layers.initialize_empty(Lsn(start_lsn.0));
    1678         1149 :     }
    1679              : 
    1680              :     /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
    1681              :     /// files.
    1682          431 :     pub(super) async fn load_layer_map(
    1683          431 :         &self,
    1684          431 :         disk_consistent_lsn: Lsn,
    1685          431 :         index_part: Option<IndexPart>,
    1686          431 :     ) -> anyhow::Result<()> {
    1687              :         use init::{Decision::*, Discovered, DismissedLayer};
    1688              :         use LayerFileName::*;
    1689              : 
    1690          431 :         let mut guard = self.layers.write().await;
    1691              : 
    1692          431 :         let timer = self.metrics.load_layer_map_histo.start_timer();
    1693          431 : 
    1694          431 :         // Scan timeline directory and create ImageFileName and DeltaFilename
    1695          431 :         // structs representing all files on disk
    1696          431 :         let timeline_path = self
    1697          431 :             .conf
    1698          431 :             .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    1699          431 :         let conf = self.conf;
    1700          431 :         let span = tracing::Span::current();
    1701          431 : 
    1702          431 :         // Copy to move into the task we're about to spawn
    1703          431 :         let generation = self.generation;
    1704          431 :         let shard = self.get_shard_index();
    1705          431 :         let this = self.myself.upgrade().expect("&self method holds the arc");
    1706              : 
    1707          431 :         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
    1708          431 :             move || {
    1709          431 :                 let _g = span.entered();
    1710          431 :                 let discovered = init::scan_timeline_dir(&timeline_path)?;
    1711          431 :                 let mut discovered_layers = Vec::with_capacity(discovered.len());
    1712          431 :                 let mut unrecognized_files = Vec::new();
    1713          431 : 
    1714          431 :                 let mut path = timeline_path;
    1715              : 
    1716        13545 :                 for discovered in discovered {
    1717        13114 :                     let (name, kind) = match discovered {
    1718        12632 :                         Discovered::Layer(file_name, file_size) => {
    1719        12632 :                             discovered_layers.push((file_name, file_size));
    1720        12632 :                             continue;
    1721              :                         }
    1722              :                         Discovered::Metadata | Discovered::IgnoredBackup => {
    1723          431 :                             continue;
    1724              :                         }
    1725            0 :                         Discovered::Unknown(file_name) => {
    1726            0 :                             // we will later error if there are any
    1727            0 :                             unrecognized_files.push(file_name);
    1728            0 :                             continue;
    1729              :                         }
    1730           47 :                         Discovered::Ephemeral(name) => (name, "old ephemeral file"),
    1731            1 :                         Discovered::Temporary(name) => (name, "temporary timeline file"),
    1732            3 :                         Discovered::TemporaryDownload(name) => (name, "temporary download"),
    1733              :                     };
    1734           51 :                     path.push(Utf8Path::new(&name));
    1735           51 :                     init::cleanup(&path, kind)?;
    1736           51 :                     path.pop();
    1737              :                 }
    1738              : 
    1739          431 :                 if !unrecognized_files.is_empty() {
    1740              :                     // assume that if there are any there are many many.
    1741            0 :                     let n = unrecognized_files.len();
    1742            0 :                     let first = &unrecognized_files[..n.min(10)];
    1743            0 :                     anyhow::bail!(
    1744            0 :                         "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
    1745            0 :                     );
    1746          431 :                 }
    1747          431 : 
    1748          431 :                 let decided = init::reconcile(
    1749          431 :                     discovered_layers,
    1750          431 :                     index_part.as_ref(),
    1751          431 :                     disk_consistent_lsn,
    1752          431 :                     generation,
    1753          431 :                     shard,
    1754          431 :                 );
    1755          431 : 
    1756          431 :                 let mut loaded_layers = Vec::new();
    1757          431 :                 let mut needs_cleanup = Vec::new();
    1758          431 :                 let mut total_physical_size = 0;
    1759              : 
    1760        53963 :                 for (name, decision) in decided {
    1761        53532 :                     let decision = match decision {
    1762        11635 :                         Ok(UseRemote { local, remote }) => {
    1763        11635 :                             // Remote is authoritative, but we may still choose to retain
    1764        11635 :                             // the local file if the contents appear to match
    1765        11635 :                             if local.file_size() == remote.file_size() {
    1766              :                                 // Use the local file, but take the remote metadata so that we pick up
    1767              :                                 // the correct generation.
    1768        11634 :                                 UseLocal(remote)
    1769              :                             } else {
    1770            1 :                                 path.push(name.file_name());
    1771            1 :                                 init::cleanup_local_file_for_remote(&path, &local, &remote)?;
    1772            1 :                                 path.pop();
    1773            1 :                                 UseRemote { local, remote }
    1774              :                             }
    1775              :                         }
    1776        41409 :                         Ok(decision) => decision,
    1777            2 :                         Err(DismissedLayer::Future { local }) => {
    1778            2 :                             if local.is_some() {
    1779            0 :                                 path.push(name.file_name());
    1780            0 :                                 init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
    1781            0 :                                 path.pop();
    1782            2 :                             }
    1783            2 :                             needs_cleanup.push(name);
    1784            2 :                             continue;
    1785              :                         }
    1786          486 :                         Err(DismissedLayer::LocalOnly(local)) => {
    1787          486 :                             path.push(name.file_name());
    1788          486 :                             init::cleanup_local_only_file(&path, &name, &local)?;
    1789          486 :                             path.pop();
    1790          486 :                             // this file never existed remotely, we will have to do rework
    1791          486 :                             continue;
    1792              :                         }
    1793              :                     };
    1794              : 
    1795        53044 :                     match &name {
    1796        22921 :                         Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
    1797        30123 :                         Image(i) => assert!(i.lsn <= disk_consistent_lsn),
    1798              :                     }
    1799              : 
    1800        53044 :                     tracing::debug!(layer=%name, ?decision, "applied");
    1801              : 
    1802        53044 :                     let layer = match decision {
    1803        12145 :                         UseLocal(m) => {
    1804        12145 :                             total_physical_size += m.file_size();
    1805        12145 :                             Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
    1806              :                         }
    1807        40898 :                         Evicted(remote) | UseRemote { remote, .. } => {
    1808        40899 :                             Layer::for_evicted(conf, &this, name, remote)
    1809              :                         }
    1810              :                     };
    1811              : 
    1812        53044 :                     loaded_layers.push(layer);
    1813              :                 }
    1814          431 :                 Ok((loaded_layers, needs_cleanup, total_physical_size))
    1815          431 :             }
    1816          431 :         })
    1817          423 :         .await
    1818          431 :         .map_err(anyhow::Error::new)
    1819          431 :         .and_then(|x| x)?;
    1820              : 
    1821          431 :         let num_layers = loaded_layers.len();
    1822          431 : 
    1823          431 :         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
    1824              : 
    1825          431 :         if let Some(rtc) = self.remote_client.as_ref() {
    1826          431 :             rtc.schedule_layer_file_deletion(&needs_cleanup)?;
    1827          431 :             rtc.schedule_index_upload_for_file_changes()?;
    1828              :             // This barrier orders above DELETEs before any later operations.
    1829              :             // This is critical because code executing after the barrier might
    1830              :             // create again objects with the same key that we just scheduled for deletion.
    1831              :             // For example, if we just scheduled deletion of an image layer "from the future",
    1832              :             // later compaction might run again and re-create the same image layer.
    1833              :             // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
    1834              :             // "same" here means same key range and LSN.
    1835              :             //
    1836              :             // Without a barrier between above DELETEs and the re-creation's PUTs,
    1837              :             // the upload queue may execute the PUT first, then the DELETE.
    1838              :             // In our example, we will end up with an IndexPart referencing a non-existent object.
    1839              :             //
    1840              :             // 1. a future image layer is created and uploaded
    1841              :             // 2. ps restart
    1842              :             // 3. the future layer from (1) is deleted during load layer map
    1843              :             // 4. image layer is re-created and uploaded
    1844              :             // 5. deletion queue would like to delete (1) but actually deletes (4)
    1845              :             // 6. delete by name works as expected, but it now deletes the wrong (later) version
    1846              :             //
    1847              :             // See https://github.com/neondatabase/neon/issues/5878
    1848              :             //
    1849              :             // NB: generation numbers naturally protect against this because they disambiguate
    1850              :             //     (1) and (4)
    1851          431 :             rtc.schedule_barrier()?;
    1852              :             // Tenant::create_timeline will wait for these uploads to happen before returning, or
    1853              :             // on retry.
    1854            0 :         }
    1855              : 
    1856          431 :         info!(
    1857          431 :             "loaded layer map with {} layers at {}, total physical size: {}",
    1858          431 :             num_layers, disk_consistent_lsn, total_physical_size
    1859          431 :         );
    1860              : 
    1861          431 :         timer.stop_and_record();
    1862          431 :         Ok(())
    1863          431 :     }
    1864              : 
    1865              :     /// Retrieve current logical size of the timeline.
    1866              :     ///
    1867              :     /// The size could be lagging behind the actual number, in case
    1868              :     /// the initial size calculation has not been run (gets triggered on the first size access).
    1869              :     ///
    1870              :     /// return size and boolean flag that shows if the size is exact
    1871       665329 :     pub(crate) fn get_current_logical_size(
    1872       665329 :         self: &Arc<Self>,
    1873       665329 :         priority: GetLogicalSizePriority,
    1874       665329 :         ctx: &RequestContext,
    1875       665329 :     ) -> logical_size::CurrentLogicalSize {
    1876       665329 :         if !self.tenant_shard_id.is_zero() {
    1877              :             // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
    1878              :             // when HTTP API is serving a GET for timeline zero, return zero
    1879          189 :             return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
    1880       665140 :         }
    1881       665140 : 
    1882       665140 :         let current_size = self.current_logical_size.current_size();
    1883       665140 :         debug!("Current size: {current_size:?}");
    1884              : 
    1885       665140 :         match (current_size.accuracy(), priority) {
    1886       664356 :             (logical_size::Accuracy::Exact, _) => (), // nothing to do
    1887          159 :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
    1888          159 :                 // background task will eventually deliver an exact value, we're in no rush
    1889          159 :             }
    1890              :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
    1891              :                 // background task is not ready, but user is asking for it now;
    1892              :                 // => make the background task skip the line
    1893              :                 // (The alternative would be to calculate the size here, but,
    1894              :                 //  it can actually take a long time if the user has a lot of rels.
    1895              :                 //  And we'll inevitable need it again; So, let the background task do the work.)
    1896          625 :                 match self
    1897          625 :                     .current_logical_size
    1898          625 :                     .cancel_wait_for_background_loop_concurrency_limit_semaphore
    1899          625 :                     .get()
    1900              :                 {
    1901          625 :                     Some(cancel) => cancel.cancel(),
    1902              :                     None => {
    1903            0 :                         let state = self.current_state();
    1904            0 :                         if matches!(
    1905            0 :                             state,
    1906              :                             TimelineState::Broken { .. } | TimelineState::Stopping
    1907            0 :                         ) {
    1908            0 : 
    1909            0 :                             // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
    1910            0 :                             // Don't make noise.
    1911            0 :                         } else {
    1912            0 :                             warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
    1913              :                         }
    1914              :                     }
    1915              :                 };
    1916              :             }
    1917              :         }
    1918              : 
    1919       665140 :         if let CurrentLogicalSize::Approximate(_) = &current_size {
    1920          784 :             if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
    1921          353 :                 let first = self
    1922          353 :                     .current_logical_size
    1923          353 :                     .did_return_approximate_to_walreceiver
    1924          353 :                     .compare_exchange(
    1925          353 :                         false,
    1926          353 :                         true,
    1927          353 :                         AtomicOrdering::Relaxed,
    1928          353 :                         AtomicOrdering::Relaxed,
    1929          353 :                     )
    1930          353 :                     .is_ok();
    1931          353 :                 if first {
    1932           57 :                     crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
    1933          296 :                 }
    1934          431 :             }
    1935       664356 :         }
    1936              : 
    1937       665140 :         current_size
    1938       665329 :     }
    1939              : 
    1940         1206 :     fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
    1941         1206 :         let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
    1942              :             // nothing to do for freshly created timelines;
    1943          572 :             assert_eq!(
    1944          572 :                 self.current_logical_size.current_size().accuracy(),
    1945          572 :                 logical_size::Accuracy::Exact,
    1946          572 :             );
    1947          572 :             self.current_logical_size.initialized.add_permits(1);
    1948          572 :             return;
    1949              :         };
    1950              : 
    1951          634 :         let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
    1952          634 :         let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
    1953          634 :         self.current_logical_size
    1954          634 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
    1955          634 :             .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
    1956          634 : 
    1957          634 :         let self_clone = Arc::clone(self);
    1958          634 :         let background_ctx = ctx.detached_child(
    1959          634 :             TaskKind::InitialLogicalSizeCalculation,
    1960          634 :             DownloadBehavior::Download,
    1961          634 :         );
    1962          634 :         task_mgr::spawn(
    1963          634 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1964          634 :             task_mgr::TaskKind::InitialLogicalSizeCalculation,
    1965          634 :             Some(self.tenant_shard_id),
    1966          634 :             Some(self.timeline_id),
    1967          634 :             "initial size calculation",
    1968              :             false,
    1969              :             // NB: don't log errors here, task_mgr will do that.
    1970          634 :             async move {
    1971          634 :                 let cancel = task_mgr::shutdown_token();
    1972          634 :                 self_clone
    1973          634 :                     .initial_logical_size_calculation_task(
    1974          634 :                         initial_part_end,
    1975          634 :                         cancel_wait_for_background_loop_concurrency_limit_semaphore,
    1976          634 :                         cancel,
    1977          634 :                         background_ctx,
    1978          634 :                     )
    1979        73466 :                     .await;
    1980          626 :                 Ok(())
    1981          626 :             }
    1982          634 :             .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
    1983              :         );
    1984         1206 :     }
    1985              : 
    1986          634 :     async fn initial_logical_size_calculation_task(
    1987          634 :         self: Arc<Self>,
    1988          634 :         initial_part_end: Lsn,
    1989          634 :         skip_concurrency_limiter: CancellationToken,
    1990          634 :         cancel: CancellationToken,
    1991          634 :         background_ctx: RequestContext,
    1992          634 :     ) {
    1993          626 :         scopeguard::defer! {
    1994          626 :             // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
    1995          626 :             self.current_logical_size.initialized.add_permits(1);
    1996          626 :         }
    1997              : 
    1998              :         enum BackgroundCalculationError {
    1999              :             Cancelled,
    2000              :             Other(anyhow::Error),
    2001              :         }
    2002              : 
    2003          634 :         let try_once = |attempt: usize| {
    2004          634 :             let background_ctx = &background_ctx;
    2005          634 :             let self_ref = &self;
    2006          634 :             let skip_concurrency_limiter = &skip_concurrency_limiter;
    2007          634 :             async move {
    2008          634 :                 let cancel = task_mgr::shutdown_token();
    2009          634 :                 let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
    2010          634 :                     BackgroundLoopKind::InitialLogicalSizeCalculation,
    2011          634 :                     background_ctx,
    2012          634 :                 );
    2013              : 
    2014              :                 use crate::metrics::initial_logical_size::StartCircumstances;
    2015          634 :                 let (_maybe_permit, circumstances) = tokio::select! {
    2016          214 :                     permit = wait_for_permit => {
    2017              :                         (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
    2018              :                     }
    2019              :                     _ = self_ref.cancel.cancelled() => {
    2020              :                         return Err(BackgroundCalculationError::Cancelled);
    2021              :                     }
    2022              :                     _ = cancel.cancelled() => {
    2023              :                         return Err(BackgroundCalculationError::Cancelled);
    2024              :                     },
    2025              :                     () = skip_concurrency_limiter.cancelled() => {
    2026              :                         // Some action that is part of an end-user interaction requested logical size
    2027              :                         // => break out of the rate limit
    2028              :                         // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
    2029              :                         // but then again what happens if they cancel; also, we should just be using
    2030              :                         // one runtime across the entire process, so, let's leave this for now.
    2031              :                         (None, StartCircumstances::SkippedConcurrencyLimiter)
    2032              :                     }
    2033              :                 };
    2034              : 
    2035          617 :                 let metrics_guard = if attempt == 1 {
    2036          617 :                     crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
    2037              :                 } else {
    2038            0 :                     crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
    2039              :                 };
    2040              : 
    2041          617 :                 match self_ref
    2042          617 :                     .logical_size_calculation_task(
    2043          617 :                         initial_part_end,
    2044          617 :                         LogicalSizeCalculationCause::Initial,
    2045          617 :                         background_ctx,
    2046          617 :                     )
    2047        72975 :                     .await
    2048              :                 {
    2049          577 :                     Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
    2050              :                     Err(CalculateLogicalSizeError::Cancelled) => {
    2051           27 :                         Err(BackgroundCalculationError::Cancelled)
    2052              :                     }
    2053            2 :                     Err(CalculateLogicalSizeError::Other(err)) => {
    2054            1 :                         if let Some(PageReconstructError::AncestorStopping(_)) =
    2055            2 :                             err.root_cause().downcast_ref()
    2056              :                         {
    2057            0 :                             Err(BackgroundCalculationError::Cancelled)
    2058              :                         } else {
    2059            2 :                             Err(BackgroundCalculationError::Other(err))
    2060              :                         }
    2061              :                     }
    2062              :                 }
    2063          623 :             }
    2064          634 :         };
    2065              : 
    2066          634 :         let retrying = async {
    2067          634 :             let mut attempt = 0;
    2068          634 :             loop {
    2069          634 :                 attempt += 1;
    2070          634 : 
    2071        73463 :                 match try_once(attempt).await {
    2072          577 :                     Ok(res) => return ControlFlow::Continue(res),
    2073           44 :                     Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
    2074            2 :                     Err(BackgroundCalculationError::Other(e)) => {
    2075            2 :                         warn!(attempt, "initial size calculation failed: {e:?}");
    2076              :                         // exponential back-off doesn't make sense at these long intervals;
    2077              :                         // use fixed retry interval with generous jitter instead
    2078            2 :                         let sleep_duration = Duration::from_secs(
    2079            2 :                             u64::try_from(
    2080            2 :                                 // 1hour base
    2081            2 :                                 (60_i64 * 60_i64)
    2082            2 :                                     // 10min jitter
    2083            2 :                                     + rand::thread_rng().gen_range(-10 * 60..10 * 60),
    2084            2 :                             )
    2085            2 :                             .expect("10min < 1hour"),
    2086            2 :                         );
    2087            2 :                         tokio::time::sleep(sleep_duration).await;
    2088              :                     }
    2089              :                 }
    2090              :             }
    2091          621 :         };
    2092              : 
    2093          634 :         let (calculated_size, metrics_guard) = tokio::select! {
    2094          621 :             res = retrying  => {
    2095              :                 match res {
    2096              :                     ControlFlow::Continue(calculated_size) => calculated_size,
    2097              :                     ControlFlow::Break(()) => return,
    2098              :                 }
    2099              :             }
    2100              :             _ = cancel.cancelled() => {
    2101              :                 return;
    2102              :             }
    2103              :         };
    2104              : 
    2105              :         // we cannot query current_logical_size.current_size() to know the current
    2106              :         // *negative* value, only truncated to u64.
    2107          577 :         let added = self
    2108          577 :             .current_logical_size
    2109          577 :             .size_added_after_initial
    2110          577 :             .load(AtomicOrdering::Relaxed);
    2111          577 : 
    2112          577 :         let sum = calculated_size.saturating_add_signed(added);
    2113          577 : 
    2114          577 :         // set the gauge value before it can be set in `update_current_logical_size`.
    2115          577 :         self.metrics.current_logical_size_gauge.set(sum);
    2116          577 : 
    2117          577 :         self.current_logical_size
    2118          577 :             .initial_logical_size
    2119          577 :             .set((calculated_size, metrics_guard.calculation_result_saved()))
    2120          577 :             .ok()
    2121          577 :             .expect("only this task sets it");
    2122          626 :     }
    2123              : 
    2124           36 :     pub(crate) fn spawn_ondemand_logical_size_calculation(
    2125           36 :         self: &Arc<Self>,
    2126           36 :         lsn: Lsn,
    2127           36 :         cause: LogicalSizeCalculationCause,
    2128           36 :         ctx: RequestContext,
    2129           36 :     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
                                // Spawns a task on the background runtime that runs
                                // `logical_size_calculation_task(lsn, cause, ..)` and hands the outcome
                                // (success or error) to the caller through the returned oneshot receiver.
                                // The function itself returns immediately after spawning.
    2130           36 :         let (sender, receiver) = oneshot::channel();
    2131           36 :         let self_clone = Arc::clone(self);
    2132           36 :         // XXX if our caller loses interest, i.e., ctx is cancelled,
    2133           36 :         // we should stop the size calculation work and return an error.
    2134           36 :         // That would require restructuring this function's API to
    2135           36 :         // return the result directly, instead of a Receiver for the result.
    2136           36 :         let ctx = ctx.detached_child(
    2137           36 :             TaskKind::OndemandLogicalSizeCalculation,
    2138           36 :             DownloadBehavior::Download,
    2139           36 :         );
    2140           36 :         task_mgr::spawn(
    2141           36 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    2142           36 :             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
    2143           36 :             Some(self.tenant_shard_id),
    2144           36 :             Some(self.timeline_id),
    2145           36 :             "ondemand logical size calculation",
    2146           36 :             false,
    2147           36 :             async move {
    2148           36 :                 let res = self_clone
    2149           36 :                     .logical_size_calculation_task(lsn, cause, &ctx)
    2150         2106 :                     .await;
                                        // The send result is deliberately discarded: a oneshot send only
                                        // fails when the receiver is gone, i.e. nobody is waiting anymore.
    2151           36 :                 let _ = sender.send(res).ok();
    2152           36 :                 Ok(()) // Receiver is responsible for handling errors
    2153           36 :             }
    2154           36 :             .in_current_span(),
    2155           36 :         );
    2156           36 :         receiver
    2157           36 :     }
    2158              : 
    2159              :     /// Runs [`Self::calculate_logical_size`] for `lsn` with a child of `ctx` and returns its result.
                            ///
                            /// The value returned by `self.gate.enter()` is kept alive for the whole call.
                            ///
                            /// # Cancel-Safety
    2160              :     ///
    2161              :     /// This method is cancellation-safe.
    2162         1306 :     #[instrument(skip_all)]
    2163              :     async fn logical_size_calculation_task(
    2164              :         self: &Arc<Self>,
    2165              :         lsn: Lsn,
    2166              :         cause: LogicalSizeCalculationCause,
    2167              :         ctx: &RequestContext,
    2168              :     ) -> Result<u64, CalculateLogicalSizeError> {
    2169              :         crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
    2170              :         // We should never be calculating logical sizes on shard !=0, because these shards do not have
    2171              :         // accurate relation sizes, and they do not emit consumption metrics.
    2172              :         debug_assert!(self.tenant_shard_id.is_zero());
    2173              : 
    2174              :         let _guard = self.gate.enter();
    2175              : 
    2176              :         let self_calculation = Arc::clone(self);
    2177              : 
    2178          653 :         let mut calculation = pin!(async {
    2179          653 :             let ctx = ctx.attached_child();
    2180          653 :             self_calculation
    2181          653 :                 .calculate_logical_size(lsn, cause, &ctx)
    2182        75084 :                 .await
    2183          642 :         });
    2184              : 
                                // Note: the two shutdown branches below do not drop `calculation`. They only
                                // log and then keep awaiting it, so the returned value is always the
                                // calculation's own result.
                                // NOTE(review): presumably the calculation notices cancellation by itself and
                                // finishes promptly -- confirm.
    2185        75730 :         tokio::select! {
    2186          641 :             res = &mut calculation => { res }
    2187              :             _ = self.cancel.cancelled() => {
    2188            0 :                 debug!("cancelling logical size calculation for timeline shutdown");
    2189              :                 calculation.await
    2190              :             }
    2191              :             _ = task_mgr::shutdown_watcher() => {
    2192            0 :                 debug!("cancelling logical size calculation for task shutdown");
    2193              :                 calculation.await
    2194              :             }
    2195              :         }
    2196              :     }
    2197              : 
    2198              :     /// Calculate the logical size of the database as of `up_to_lsn`.
    2199              :     ///
    2200              :     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    2201              :     /// especially if we need to download remote layers.
                            ///
                            /// Returns early with the already-known value if
                            /// `current_logical_size.initialized_size(up_to_lsn)` yields one. `cause` only
                            /// selects which histogram the duration of the calculation is recorded in.
    2202              :     ///
    2203              :     /// # Cancel-Safety
    2204              :     ///
    2205              :     /// This method is cancellation-safe.
    2206          660 :     async fn calculate_logical_size(
    2207          660 :         &self,
    2208          660 :         up_to_lsn: Lsn,
    2209          660 :         cause: LogicalSizeCalculationCause,
    2210          660 :         ctx: &RequestContext,
    2211          660 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2212          660 :         info!(
    2213          660 :             "Calculating logical size for timeline {} at {}",
    2214          660 :             self.timeline_id, up_to_lsn
    2215          660 :         );
    2216              :         // These failpoints are used by python tests to ensure that we don't delete
    2217              :         // the timeline while the logical size computation is ongoing.
    2218              :         // The first failpoint is used to make this function pause.
    2219              :         // Then the python test initiates timeline delete operation in a thread.
    2220              :         // It waits for a few seconds, then arms the second failpoint and disables
    2221              :         // the first failpoint. The second failpoint prints an error if the timeline
    2222              :         // delete code has deleted the on-disk state while we're still running here.
    2223              :         // It shouldn't do that. If it does it anyway, the error will be caught
    2224              :         // by the test suite, highlighting the problem.
    2225            0 :         fail::fail_point!("timeline-calculate-logical-size-pause");
    2226          660 :         fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
    2227            2 :             if !self
    2228            2 :                 .conf
    2229            2 :                 .metadata_path(&self.tenant_shard_id, &self.timeline_id)
    2230            2 :                 .exists()
    2231              :             {
    2232            0 :                 error!("timeline-calculate-logical-size-pre metadata file does not exist")
    2233            2 :             }
    2234              :             // the failpoint closure must return a value; report a dummy size of 0
    2235            2 :             Ok(0)
    2236          660 :         });
    2237              :         // See if we've already done the work for initial size calculation.
    2238              :         // This is a short-cut for timelines that are mostly unused.
    2239          658 :         if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
    2240            6 :             return Ok(size);
    2241          652 :         }
                                // Eviction-task imitation runs are timed in their own histogram; every other
                                // cause shares `logical_size_histo`.
    2242          652 :         let storage_time_metrics = match cause {
    2243              :             LogicalSizeCalculationCause::Initial
    2244              :             | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
    2245          639 :             | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
    2246              :             LogicalSizeCalculationCause::EvictionTaskImitation => {
    2247           13 :                 &self.metrics.imitate_logical_size_histo
    2248              :             }
    2249              :         };
    2250          652 :         let timer = storage_time_metrics.start_timer();
                                // On error the `?` below returns before `stop_and_record()` is reached.
                                // NOTE(review): whether a dropped timer still records an observation depends
                                // on the timer type -- confirm.
    2251          652 :         let logical_size = self
    2252          652 :             .get_current_logical_size_non_incremental(up_to_lsn, ctx)
    2253        76220 :             .await?;
    2254            0 :         debug!("calculated logical size: {logical_size}");
    2255          611 :         timer.stop_and_record();
    2256          611 :         Ok(logical_size)
    2257          649 :     }
    2258              : 
    2259              :     /// Update the current logical size by adding `delta` (which may be negative) to the old value.
    2260       640015 :     fn update_current_logical_size(&self, delta: i64) {
    2261       640015 :         let logical_size = &self.current_logical_size;
    2262       640015 :         logical_size.increment_size(delta);
    2263       640015 : 
    2264       640015 :         // Also set the value in the prometheus gauge. Note that
    2265       640015 :         // there is a race condition here: if this is called by two
    2266       640015 :         // threads concurrently, the prometheus gauge might be set to
    2267       640015 :         // one value while current_logical_size is set to the
    2268       640015 :         // other.
    2269       640015 :         match logical_size.current_size() {
    2270       639929 :             CurrentLogicalSize::Exact(ref new_current_size) => self
    2271       639929 :                 .metrics
    2272       639929 :                 .current_logical_size_gauge
    2273       639929 :                 .set(new_current_size.into()),
    2274           86 :             CurrentLogicalSize::Approximate(_) => {
    2275           86 :                 // Don't update the gauge yet: while the size is only approximate, leaving the
    2276           86 :                 // gauge alone avoids flipping it back and forth with the initial size calculation task.
    2277           86 :             }
    2278              :         }
    2279       640015 :     }
    2280              : 
    2281       730583 :     pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
                                // Records `count` as the latest entry count for directory `kind`, then
                                // refreshes the `directory_entries_count_gauge` metric with the sum over all
                                // directory kinds (subject to the thresholds below).
    2282       730583 :         self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
    2283       730583 :         let aux_metric =
    2284       730583 :             self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
    2285       730583 : 
    2286       730583 :         let sum_of_entries = self
    2287       730583 :             .directory_metrics
    2288       730583 :             .iter()
    2289      5114081 :             .map(|v| v.load(AtomicOrdering::Relaxed))
    2290       730583 :             .sum();
    2291       730583 :         // Set a high general threshold and a lower threshold for the auxiliary files,
    2292       730583 :         // as we can have large numbers of relations in the db directory.
    2293       730583 :         const SUM_THRESHOLD: u64 = 5000;
    2294       730583 :         const AUX_THRESHOLD: u64 = 1000;
                                // The gauge is lazy: the first branch dereferences it (forcing creation) once
                                // a threshold is reached; the second branch only updates a gauge that already
                                // exists, so timelines that never cross a threshold never create one.
    2295       730583 :         if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
    2296            0 :             self.metrics
    2297            0 :                 .directory_entries_count_gauge
    2298            0 :                 .set(sum_of_entries);
    2299       730583 :         } else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
    2300            0 :             metric.set(sum_of_entries);
    2301       730583 :         }
    2302       730583 :     }
    2303              : 
    2304         2208 :     async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
                                // Looks up a historic layer by its file name. Takes the layer-map read lock
                                // and scans the historic layers linearly, returning the first one whose
                                // `filename().file_name()` equals `layer_file_name`, or `None` if none match.
    2305         2208 :         let guard = self.layers.read().await;
    2306       772744 :         for historic_layer in guard.layer_map().iter_historic_layers() {
    2307       772744 :             let historic_layer_name = historic_layer.filename().file_name();
    2308       772744 :             if layer_file_name == historic_layer_name {
    2309         2208 :                 return Some(guard.get_from_desc(&historic_layer));
    2310       770536 :             }
    2311              :         }
    2312              : 
    2313            0 :         None
    2314         2208 :     }
    2315              : 
    2316              :     /// The timeline heatmap is a hint to secondary locations from the primary location,
    2317              :     /// indicating which layers are currently on-disk on the primary.
    2318              :     ///
    2319              :     /// `None` is returned if the timeline has no remote client, or if it is not active,
    2320              :     /// i.e. in a state where uploading a heatmap doesn't make sense, such as shutting
    2321              :     /// down or initializing.  The caller should treat this as a cue to simply skip
    2322              :     /// doing any heatmap uploading for this timeline.
    2323           12 :     pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
    2324              :         // no point in heatmaps without remote client
    2325           12 :         let _remote_client = self.remote_client.as_ref()?;
    2326              : 
    2327           12 :         if !self.is_active() {
    2328            0 :             return None;
    2329           12 :         }
    2330              : 
                                // The layer-map read guard stays alive until the function returns.
    2331           12 :         let guard = self.layers.read().await;
    2332              : 
                                // One heatmap entry per resident layer: its file name, its metadata and the
                                // timestamp returned by `latest_activity_or_now()`.
    2333         2515 :         let resident = guard.resident_layers().map(|layer| {
    2334         2515 :             let last_activity_ts = layer.access_stats().latest_activity_or_now();
    2335         2515 : 
    2336         2515 :             HeatMapLayer::new(
    2337         2515 :                 layer.layer_desc().filename(),
    2338         2515 :                 layer.metadata().into(),
    2339         2515 :                 last_activity_ts,
    2340         2515 :             )
    2341         2515 :         });
    2342              : 
    2343           12 :         let layers = resident.collect().await;
    2344              : 
    2345           12 :         Some(HeatMapTimeline::new(self.timeline_id, layers))
    2346           12 :     }
    2347              : }
    2348              : 
    2349              : type TraversalId = String; // textual description of one layer visited during a read
    2350              : 
                        /// Implemented by layer handles that can describe themselves with a [`TraversalId`].
    2351              : trait TraversalLayerExt {
                            /// Returns the string identifying this layer.
    2352              :     fn traversal_id(&self) -> TraversalId;
    2353              : }
    2354              : 
    2355              : impl TraversalLayerExt for Layer {
                            // A `Layer` is identified by its local path, rendered as a string.
    2356         1429 :     fn traversal_id(&self) -> TraversalId {
    2357         1429 :         self.local_path().to_string()
    2358         1429 :     }
    2359              : }
    2360              : 
    2361              : impl TraversalLayerExt for Arc<InMemoryLayer> {
                            // An in-memory layer is identified by its timeline id followed by the
                            // layer's own `Display` output.
    2362          346 :     fn traversal_id(&self) -> TraversalId {
    2363          346 :         format!("timeline {} in-memory {self}", self.get_timeline_id())
    2364          346 :     }
    2365              : }
    2366              : 
    2367              : impl Timeline {
    2368              :     ///
    2369              :     /// Get a handle to a Layer for reading.
    2370              :     ///
    2371              :     /// The returned Layer might be from an ancestor timeline, if the
    2372              :     /// segment hasn't been updated on this timeline yet.
    2373              :     ///
    2374              :     /// This function takes the current timeline's locked LayerMap as an argument,
    2375              :     /// so callers can avoid potential race conditions.
    2376              :     ///
    2377              :     /// # Cancel-Safety
    2378              :     ///
    2379              :     /// This method is cancellation-safe.
    2380      7322321 :     async fn get_reconstruct_data(
    2381      7322321 :         &self,
    2382      7322321 :         key: Key,
    2383      7322321 :         request_lsn: Lsn,
    2384      7322321 :         reconstruct_state: &mut ValueReconstructState,
    2385      7322321 :         ctx: &RequestContext,
    2386      7322329 :     ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
    2387      7322329 :         // Start from the current timeline.
    2388      7322329 :         let mut timeline_owned;
    2389      7322329 :         let mut timeline = self;
    2390      7322329 : 
    2391      7322329 :         let mut read_count = scopeguard::guard(0, |cnt| {
    2392      7322321 :             crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
    2393      7322329 :         });
    2394      7322329 : 
    2395      7322329 :         // For debugging purposes, collect the path of layers that we traversed
    2396      7322329 :         // through. It's included in the error message if we fail to find the key.
    2397      7322329 :         let mut traversal_path = Vec::<TraversalPathItem>::new();
    2398              : 
    2399      7322329 :         let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
    2400      1710871 :             *cached_lsn
    2401              :         } else {
    2402      5611458 :             Lsn(0)
    2403              :         };
    2404              : 
    2405              :         // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
    2406              :         // to check that each iteration makes some progress, to break infinite
    2407              :         // looping if something goes wrong.
    2408      7322329 :         let mut prev_lsn = Lsn(u64::MAX);
    2409      7322329 : 
    2410      7322329 :         let mut result = ValueReconstructResult::Continue;
    2411      7322329 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2412              : 
    2413     31648875 :         'outer: loop {
    2414     31648875 :             if self.cancel.is_cancelled() {
    2415           39 :                 return Err(PageReconstructError::Cancelled);
    2416     31648836 :             }
    2417     31648836 : 
    2418     31648836 :             // The function should have updated 'state'
    2419     31648836 :             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
    2420     31648836 :             match result {
    2421      5643396 :                 ValueReconstructResult::Complete => return Ok(traversal_path),
    2422              :                 ValueReconstructResult::Continue => {
    2423              :                     // If we reached an earlier cached page image, we're done.
    2424     26005433 :                     if cont_lsn == cached_lsn + 1 {
    2425      1677581 :                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
    2426      1677581 :                         return Ok(traversal_path);
    2427     24327852 :                     }
    2428     24327852 :                     if prev_lsn <= cont_lsn {
    2429              :                         // Didn't make any progress in last iteration. Error out to avoid
    2430              :                         // getting stuck in the loop.
    2431         1268 :                         return Err(layer_traversal_error(format!(
    2432         1268 :                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
    2433         1268 :                             key,
    2434         1268 :                             Lsn(cont_lsn.0 - 1),
    2435         1268 :                             request_lsn,
    2436         1268 :                             timeline.ancestor_lsn
    2437         1268 :                         ), traversal_path));
    2438     24326584 :                     }
    2439     24326584 :                     prev_lsn = cont_lsn;
    2440              :                 }
    2441              :                 ValueReconstructResult::Missing => {
    2442              :                     return Err(layer_traversal_error(
    2443            7 :                         if cfg!(test) {
    2444            6 :                             format!(
    2445            6 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
    2446            6 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
    2447            6 :                             )
    2448              :                         } else {
    2449            1 :                             format!(
    2450            1 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
    2451            1 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
    2452            1 :                             )
    2453              :                         },
    2454            7 :                         traversal_path,
    2455              :                     ));
    2456              :                 }
    2457              :             }
    2458              : 
    2459              :             // Recurse into ancestor if needed
    2460     24326584 :             if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
    2461            0 :                 trace!(
    2462            0 :                     "going into ancestor {}, cont_lsn is {}",
    2463            0 :                     timeline.ancestor_lsn,
    2464            0 :                     cont_lsn
    2465            0 :                 );
    2466              : 
    2467      1312450 :                 timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
    2468      1312446 :                 timeline = &*timeline_owned;
    2469      1312446 :                 prev_lsn = Lsn(u64::MAX);
    2470      1312446 :                 continue 'outer;
    2471     23014133 :             }
    2472              : 
    2473     23014133 :             let guard = timeline.layers.read().await;
    2474     23014132 :             let layers = guard.layer_map();
    2475              : 
    2476              :             // Check the open and frozen in-memory layers first, in order from newest
    2477              :             // to oldest.
    2478     23014132 :             if let Some(open_layer) = &layers.open_layer {
    2479     17878268 :                 let start_lsn = open_layer.get_lsn_range().start;
    2480     17878268 :                 if cont_lsn > start_lsn {
    2481              :                     //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
    2482              :                     // Get all the data needed to reconstruct the page version from this layer.
    2483              :                     // But if we have an older cached page image, no need to go past that.
    2484      5733008 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2485      5733008 :                     result = match open_layer
    2486      5733008 :                         .get_value_reconstruct_data(
    2487      5733008 :                             key,
    2488      5733008 :                             lsn_floor..cont_lsn,
    2489      5733008 :                             reconstruct_state,
    2490      5733008 :                             ctx,
    2491      5733008 :                         )
    2492       667848 :                         .await
    2493              :                     {
    2494      5733008 :                         Ok(result) => result,
    2495            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2496              :                     };
    2497      5733008 :                     cont_lsn = lsn_floor;
    2498      5733008 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2499      5733008 :                     traversal_path.push((
    2500      5733008 :                         result,
    2501      5733008 :                         cont_lsn,
    2502      5733008 :                         Box::new({
    2503      5733008 :                             let open_layer = Arc::clone(open_layer);
    2504      5733008 :                             move || open_layer.traversal_id()
    2505      5733008 :                         }),
    2506      5733008 :                     ));
    2507      5733008 :                     continue 'outer;
    2508     12145260 :                 }
    2509      5135864 :             }
    2510     17281124 :             for frozen_layer in layers.frozen_layers.iter().rev() {
    2511      1281763 :                 let start_lsn = frozen_layer.get_lsn_range().start;
    2512      1281763 :                 if cont_lsn > start_lsn {
    2513              :                     //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
    2514       292140 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2515       292140 :                     result = match frozen_layer
    2516       292140 :                         .get_value_reconstruct_data(
    2517       292140 :                             key,
    2518       292140 :                             lsn_floor..cont_lsn,
    2519       292140 :                             reconstruct_state,
    2520       292140 :                             ctx,
    2521       292140 :                         )
    2522        23211 :                         .await
    2523              :                     {
    2524       292140 :                         Ok(result) => result,
    2525            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2526              :                     };
    2527       292140 :                     cont_lsn = lsn_floor;
    2528       292140 :                     // metrics: frozen_layer does not count as fs access, so we are not updating `read_count`
    2529       292140 :                     traversal_path.push((
    2530       292140 :                         result,
    2531       292140 :                         cont_lsn,
    2532       292140 :                         Box::new({
    2533       292140 :                             let frozen_layer = Arc::clone(frozen_layer);
    2534       292140 :                             move || frozen_layer.traversal_id()
    2535       292140 :                         }),
    2536       292140 :                     ));
    2537       292140 :                     continue 'outer;
    2538       989623 :                 }
    2539              :             }
    2540              : 
    2541     16988984 :             if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
    2542     16843804 :                 let layer = guard.get_from_desc(&layer);
    2543     16843804 :                 // Get all the data needed to reconstruct the page version from this layer.
    2544     16843804 :                 // But if we have an older cached page image, no need to go past that.
    2545     16843804 :                 let lsn_floor = max(cached_lsn + 1, lsn_floor);
    2546     16843804 :                 result = match layer
    2547     16843804 :                     .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
    2548      1021995 :                     .await
    2549              :                 {
    2550     16843772 :                     Ok(result) => result,
    2551           21 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2552              :                 };
    2553     16843772 :                 cont_lsn = lsn_floor;
    2554     16843772 :                 *read_count += 1;
    2555     16843772 :                 traversal_path.push((
    2556     16843772 :                     result,
    2557     16843772 :                     cont_lsn,
    2558     16843772 :                     Box::new({
    2559     16843772 :                         let layer = layer.to_owned();
    2560     16843772 :                         move || layer.traversal_id()
    2561     16843772 :                     }),
    2562     16843772 :                 ));
    2563     16843772 :                 continue 'outer;
    2564       145180 :             } else if timeline.ancestor_timeline.is_some() {
    2565              :                 // Nothing on this timeline. Traverse to parent
    2566       145177 :                 result = ValueReconstructResult::Continue;
    2567       145177 :                 cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2568       145177 :                 continue 'outer;
    2569              :             } else {
    2570              :                 // Nothing found
    2571            3 :                 result = ValueReconstructResult::Missing;
    2572            3 :                 continue 'outer;
    2573              :             }
    2574              :         }
    2575      7322316 :     }
    2576              : 
    2577              :     /// Look up a materialized image of `key` in the page cache returned by `page_cache::get()`.
                            ///
                            /// The lookup is scoped to this timeline's `tenant_shard_id` and `timeline_id`.
                            /// Returns `None` when the cache lookup yields nothing; otherwise returns the LSN
                            /// reported by the cache together with an owned copy of the page contents.
                            ///
                            /// NOTE(review): presumably the returned LSN is the LSN of the cached image, which
                            /// may be older than the requested `lsn` -- confirm against
                            /// `lookup_materialized_page`.
                            ///
                            /// # Cancel-safety
    2578              :     ///
    2579              :     /// This method is cancellation-safe.
    2580      7398626 :     async fn lookup_cached_page(
    2581      7398626 :         &self,
    2582      7398626 :         key: &Key,
    2583      7398626 :         lsn: Lsn,
    2584      7398626 :         ctx: &RequestContext,
    2585      7398634 :     ) -> Option<(Lsn, Bytes)> {
    2586      7398634 :         let cache = page_cache::get();
    2587              : 
    2588              :         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
    2589              :         // We should look at the key to determine if it's a cacheable object
    2590      7398634 :         let (lsn, read_guard) = cache
    2591      7398634 :             .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
    2592      5611458 :             .await?;
                                // Copy the page out of the cache read guard so the returned `Bytes` owns its data.
    2593      1787176 :         let img = Bytes::from(read_guard.to_vec());
    2594      1787176 :         Some((lsn, img))
    2595      7398634 :     }
    2596              : 
                            /// Return the ancestor timeline once it is active and has reached `self.ancestor_lsn`.
                            ///
                            /// Waits first for the ancestor to become active, then for its WAL position to
                            /// reach the branch point (`wait_lsn(self.ancestor_lsn, ..)`).
                            ///
                            /// # Errors
                            ///
                            /// - The error from `get_ancestor_timeline`, converted with
                            ///   `GetReadyAncestorError::from`, if this timeline has no ancestor.
                            /// - `GetReadyAncestorError::AncestorStopping` if the ancestor reports
                            ///   `TimelineState::Stopping` while we wait for it to become active.
                            /// - `GetReadyAncestorError::Other` if the ancestor ends up in any other
                            ///   non-active state, or if `wait_lsn` fails with `WaitLsnError::BadState`.
                            /// - `GetReadyAncestorError::AncestorLsnTimeout` if `wait_lsn` times out.
                            /// - `GetReadyAncestorError::Cancelled` if `wait_lsn` reports shutdown.
    2597      1312449 :     async fn get_ready_ancestor_timeline(
    2598      1312449 :         &self,
    2599      1312449 :         ctx: &RequestContext,
    2600      1312450 :     ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
    2601      1312450 :         let ancestor = match self.get_ancestor_timeline() {
    2602      1312450 :             Ok(timeline) => timeline,
    2603            0 :             Err(e) => return Err(GetReadyAncestorError::from(e)),
    2604              :         };
    2605              : 
    2606              :         // It's possible that the ancestor timeline isn't active yet, or
    2607              :         // is active but hasn't yet caught up to the branch point. Wait
    2608              :         // for it.
    2609              :         //
    2610              :         // This cannot happen while the pageserver is running normally,
    2611              :         // because you cannot create a branch from a point that isn't
    2612              :         // present in the pageserver yet. However, we don't wait for the
    2613              :         // branch point to be uploaded to cloud storage before creating
    2614              :         // a branch. I.e., the branch LSN need not be remote consistent
    2615              :         // for the branching operation to succeed.
    2616              :         //
    2617              :         // Hence, if we try to load a tenant in such a state where
    2618              :         // 1. the existence of the branch was persisted (in IndexPart and/or locally)
    2619              :         // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
    2620              :         // then we will need to wait for the ancestor timeline to
    2621              :         // re-stream WAL up to branch_lsn before we access it.
    2622              :         //
    2623              :         // How can a tenant get in such a state?
    2624              :         // - ungraceful pageserver process exit
    2625              :         // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
    2626              :         //
    2627              :         // NB: this could be avoided by requiring
    2628              :         //   branch_lsn >= remote_consistent_lsn
    2629              :         // during branch creation.
    2630      1312450 :         match ancestor.wait_to_become_active(ctx).await {
    2631      1312446 :             Ok(()) => {}
    2632              :             Err(TimelineState::Stopping) => {
    2633            2 :                 return Err(GetReadyAncestorError::AncestorStopping(
    2634            2 :                     ancestor.timeline_id,
    2635            2 :                 ));
    2636              :             }
    2637            2 :             Err(state) => {
    2638            2 :                 return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
    2639            2 :                     "Timeline {} will not become active. Current state: {:?}",
    2640            2 :                     ancestor.timeline_id,
    2641            2 :                     &state,
    2642            2 :                 )));
    2643              :             }
    2644              :         }
                                // The ancestor is active; now wait until it has caught up to the branch point,
                                // translating each `WaitLsnError` variant into the matching ancestor error.
    2645      1312446 :         ancestor
    2646      1312446 :             .wait_lsn(self.ancestor_lsn, ctx)
    2647            5 :             .await
    2648      1312446 :             .map_err(|e| match e {
    2649            0 :                 e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
    2650            0 :                 WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
    2651            0 :                 e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
    2652      1312446 :             })?;
    2653              : 
    2654      1312446 :         Ok(ancestor)
    2655      1312450 :     }
    2656              : 
                            /// Return a new `Arc` handle to this timeline's ancestor.
                            ///
                            /// # Errors
                            ///
                            /// Fails if `self.ancestor_timeline` is `None`; the error message carries this
                            /// timeline's id and the value of `get_ancestor_timeline_id()`.
    2657      1312449 :     fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
    2658      1312449 :         let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
    2659            0 :             format!(
    2660            0 :                 "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
    2661            0 :                 self.timeline_id,
    2662            0 :                 self.get_ancestor_timeline_id(),
    2663            0 :             )
    2664      1312449 :         })?;
    2665      1312449 :         Ok(Arc::clone(ancestor))
    2666      1312449 :     }
    2667              : 
                            /// Borrow this timeline's `shard_identity`.
    2668      1084703 :     pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
    2669      1084703 :         &self.shard_identity
    2670      1084703 :     }
    2671              : 
    2672              :     ///
    2673              :     /// Get a handle to the latest layer for appending.
                            ///
                            /// Takes the write lock on `self.layers` and delegates to the layer manager's
                            /// `get_layer_for_write`, passing `lsn` together with the current last record LSN,
                            /// the pageserver config, and this timeline's ids. Any error from the layer
                            /// manager is propagated to the caller.
    2674              :     ///
    2675      2907667 :     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    2676      2907667 :         let mut guard = self.layers.write().await;
    2677      2907667 :         let layer = guard
    2678      2907667 :             .get_layer_for_write(
    2679      2907667 :                 lsn,
    2680      2907667 :                 self.get_last_record_lsn(),
    2681      2907667 :                 self.conf,
    2682      2907667 :                 self.timeline_id,
    2683      2907667 :                 self.tenant_shard_id,
    2684      2907667 :             )
    2685          307 :             .await?;
    2686      2907667 :         Ok(layer)
    2687      2907667 :     }
    2688              : 
                            /// Write a single value `val` for `key` at `lsn`.
                            ///
                            /// The value goes into the in-memory layer returned by `get_layer_for_write(lsn)`.
                            /// Errors from obtaining the layer or from the layer's `put_value` are propagated.
    2689      1214102 :     async fn put_value(
    2690      1214102 :         &self,
    2691      1214102 :         key: Key,
    2692      1214102 :         lsn: Lsn,
    2693      1214102 :         val: &Value,
    2694      1214102 :         ctx: &RequestContext,
    2695      1214102 :     ) -> anyhow::Result<()> {
    2696              :         //info!("PUT: key {} at {}", key, lsn);
    2697      1214102 :         let layer = self.get_layer_for_write(lsn).await?;
    2698      1214102 :         layer.put_value(key, lsn, val, ctx).await?;
    2699      1214102 :         Ok(())
    2700      1214102 :     }
    2701              : 
                            /// Write a batch of values into a single in-memory layer.
                            ///
                            /// The target layer is chosen from one LSN only: the first `(lsn, _)` pair of the
                            /// first non-empty entry met while iterating `values`. The whole `values` map is
                            /// then handed to that layer in one `put_values` call. If the map is empty, or
                            /// every entry's vector is empty, nothing is written and `Ok(())` is returned.
                            ///
                            /// NOTE(review): `values` is a `HashMap`, so which entry is seen first is not a
                            /// stable order; presumably every LSN in a batch belongs to the same layer --
                            /// confirm with callers.
    2702      1674304 :     async fn put_values(
    2703      1674304 :         &self,
    2704      1674304 :         values: &HashMap<Key, Vec<(Lsn, Value)>>,
    2705      1674304 :         ctx: &RequestContext,
    2706      1674304 :     ) -> anyhow::Result<()> {
    2707              :         // Pick the first LSN in the batch to get the layer to write to.
    2708      1674304 :         for lsns in values.values() {
    2709      1674304 :             if let Some((lsn, _)) = lsns.first() {
    2710      1674304 :                 let layer = self.get_layer_for_write(*lsn).await?;
    2711      1674304 :                 layer.put_values(values, ctx).await?;
                                        // The entire batch has been written; do not look at further entries.
    2712      1674303 :                 break;
    2713            0 :             }
    2714              :         }
    2715      1674303 :         Ok(())
    2716      1674303 :     }
    2717              : 
                            /// Record key-range tombstones in an in-memory layer.
                            ///
                            /// The layer is selected by the LSN of the first tombstone, and all `tombstones`
                            /// are passed to it in one call. An empty slice is a no-op that returns `Ok(())`.
    2718        19261 :     async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    2719        19261 :         if let Some((_, lsn)) = tombstones.first() {
    2720        19261 :             let layer = self.get_layer_for_write(*lsn).await?;
    2721        19261 :             layer.put_tombstones(tombstones).await?;
    2722            0 :         }
    2723        19261 :         Ok(())
    2724        19261 :     }
    2725              : 
                            /// Publish `new_lsn` as the timeline's last record LSN.
                            ///
                            /// Updates the `last_record_gauge` metric and advances `self.last_record_lsn`.
                            ///
                            /// # Panics
                            ///
                            /// Panics if `new_lsn.is_aligned()` is false.
    2726     76004085 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    2727     76004085 :         assert!(new_lsn.is_aligned());
    2728              : 
    2729     76004085 :         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
    2730     76004085 :         self.last_record_lsn.advance(new_lsn);
    2731     76004085 :     }
    2732              : 
                            /// Ask the layer manager to freeze the current open in-memory layer.
                            ///
                            /// `write_lock_held`: when `false`, this function acquires `self.write_lock` and
                            /// holds it until it returns; when `true`, it does not touch `self.write_lock`
                            /// (presumably because the caller already holds it -- confirm at call sites).
                            ///
                            /// The freeze itself runs under the write lock on `self.layers` and is given the
                            /// current last record LSN and `self.last_freeze_at`.
    2733         5410 :     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
    2734              :         // Freeze the current open in-memory layer. It will be written to disk on next
    2735              :         // iteration.
    2736         5410 :         let _write_guard = if write_lock_held {
    2737         3504 :             None
    2738              :         } else {
    2739         1906 :             Some(self.write_lock.lock().await)
    2740              :         };
    2741         5410 :         let mut guard = self.layers.write().await;
    2742         5410 :         guard
    2743         5410 :             .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
    2744            6 :             .await;
    2745         5410 :     }
    2746              : 
    2747              :     /// Layer flusher task's main loop.
                            ///
                            /// Each outer iteration waits for one of three events: cancellation of
                            /// `self.cancel`, the `task_mgr` shutdown watcher, or a change on
                            /// `layer_flush_start_rx`. The first two end the loop.
                            ///
                            /// On a wake-up the current request counter is read from `layer_flush_start_rx`,
                            /// and frozen layers are flushed one at a time, taking the front of
                            /// `frozen_layers` each round, until none remain or a flush fails. The pair
                            /// `(flush_counter, result)` is then published through `layer_flush_done_tx` so
                            /// that waiters can tell which request has completed.
                            ///
                            /// If cancellation is observed while flushing, the function returns immediately
                            /// without publishing a result.
    2748         1573 :     async fn flush_loop(
    2749         1573 :         self: &Arc<Self>,
    2750         1573 :         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
    2751         1573 :         ctx: &RequestContext,
    2752         1573 :     ) {
    2753         1573 :         info!("started flush loop");
    2754              :         loop {
    2755         6110 :             tokio::select! {
    2756        10845 :                 _ = self.cancel.cancelled() => {
    2757        10845 :                     info!("shutting down layer flush task");
    2758        10845 :                     break;
    2759        10845 :                 },
    2760        10845 :                 _ = task_mgr::shutdown_watcher() => {
    2761        10845 :                     info!("shutting down layer flush task");
    2762        10845 :                     break;
    2763        10845 :                 },
    2764        10845 :                 _ = layer_flush_start_rx.changed() => {}
    2765        10845 :             }
    2766              : 
    2767            0 :             trace!("waking up");
    2768         4544 :             let timer = self.metrics.flush_time_histo.start_timer();
                                    // Remember which request we are serving; it is reported back with the result.
    2769         4544 :             let flush_counter = *layer_flush_start_rx.borrow();
    2770         9717 :             let result = loop {
    2771         9717 :                 if self.cancel.is_cancelled() {
    2772            0 :                     info!("dropping out of flush loop for timeline shutdown");
    2773              :                     // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
    2774              :                     // anyone waiting on that will respect self.cancel as well: they will stop
    2775              :                     // waiting at the same time as we drop out of this loop.
    2776            0 :                     return;
    2777         9717 :                 }
    2778              : 
    2779         9717 :                 let layer_to_flush = {
    2780         9717 :                     let guard = self.layers.read().await;
    2781         9717 :                     guard.layer_map().frozen_layers.front().cloned()
    2782              :                     // drop 'layers' lock to allow concurrent reads and writes
    2783              :                 };
                                        // No frozen layer left: this flush request is complete.
    2784         9717 :                 let Some(layer_to_flush) = layer_to_flush else {
    2785         4537 :                     break Ok(());
    2786              :                 };
    2787        26672 :                 match self.flush_frozen_layer(layer_to_flush, ctx).await {
    2788         5173 :                     Ok(()) => {}
    2789              :                     Err(FlushLayerError::Cancelled) => {
    2790            1 :                         info!("dropping out of flush loop for timeline shutdown");
    2791            1 :                         return;
    2792              :                     }
    2793            0 :                     err @ Err(
    2794              :                         FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
    2795              :                     ) => {
    2796            0 :                         error!("could not flush frozen layer: {err:?}");
    2797            0 :                         break err;
    2798              :                     }
    2799              :                 }
    2800              :             };
    2801              :             // Notify any listeners that we're done
    2802         4537 :             let _ = self
    2803         4537 :                 .layer_flush_done_tx
    2804         4537 :                 .send_replace((flush_counter, result));
    2805         4537 : 
    2806         4537 :             timer.stop_and_record();
    2807              :         }
    2808          592 :     }
    2809              : 
                            /// Request a flush of the frozen layers and wait for the flush loop to report it done.
                            ///
                            /// Bumps the counter in `layer_flush_start_tx` to wake the flush loop, then waits
                            /// on `layer_flush_done_tx` until the reported counter is at least the value this
                            /// call wrote.
                            ///
                            /// Returns `Ok(())` when that flush succeeded, and also when `self.cancel` fires
                            /// while waiting; in the latter case the flush may not have completed.
                            ///
                            /// # Errors
                            ///
                            /// - The flush loop is not in `FlushLoopState::Running`.
                            /// - The flush loop reported an error for this request (or a later one); the
                            ///   original error is not propagated, only the request id.
                            /// - `rx.changed()` failed while waiting for a result.
    2810         1906 :     async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
    2811         1906 :         let mut rx = self.layer_flush_done_tx.subscribe();
    2812         1906 : 
    2813         1906 :         // Increment the flush cycle counter and wake up the flush task.
    2814         1906 :         // Remember the new value, so that when we listen for the flush
    2815         1906 :         // to finish, we know when the flush that we initiated has
    2816         1906 :         // finished, instead of some other flush that was started earlier.
    2817         1906 :         let mut my_flush_request = 0;
    2818         1906 : 
    2819         1906 :         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
    2820         1906 :         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
    2821           45 :             anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
    2822         1861 :         }
    2823         1861 : 
    2824         1861 :         self.layer_flush_start_tx.send_modify(|counter| {
    2825         1861 :             my_flush_request = *counter + 1;
    2826         1861 :             *counter = my_flush_request;
    2827         1861 :         });
    2828              : 
    2829         3721 :         loop {
    2830         3721 :             {
                                        // Check the latest published result before waiting, so a flush that
                                        // has already completed is not missed.
    2831         3721 :                 let (last_result_counter, last_result) = &*rx.borrow();
    2832         3721 :                 if *last_result_counter >= my_flush_request {
    2833         1860 :                     if let Err(_err) = last_result {
    2834              :                         // We already logged the original error in
    2835              :                         // flush_loop. We cannot propagate it to the caller
    2836              :                         // here, because it might not be Cloneable
    2837            0 :                         anyhow::bail!(
    2838            0 :                             "Could not flush frozen layer. Request id: {}",
    2839            0 :                             my_flush_request
    2840            0 :                         );
    2841              :                     } else {
    2842         1860 :                         return Ok(());
    2843              :                     }
    2844         1861 :                 }
    2845              :             }
    2846            0 :             trace!("waiting for flush to complete");
    2847         1861 :             tokio::select! {
    2848         1860 :                 rx_e = rx.changed() => {
    2849              :                     rx_e?;
    2850              :                 },
    2851              :                 // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
    2852              :                 // the notification from [`flush_loop`] that it completed.
    2853              :                 _ = self.cancel.cancelled() => {
    2854            1 :                     tracing::info!("Cancelled layer flush due on timeline shutdown");
    2855              :                     return Ok(())
    2856              :                 }
    2857              :             };
    2858            0 :             trace!("done")
    2859              :         }
    2860         1906 :     }
    2861              : 
                            /// Wake the flush loop by incrementing the `layer_flush_start_tx` counter.
                            ///
                            /// Does not wait for the flush to finish and does not check whether the flush
                            /// loop is running.
    2862         3504 :     fn flush_frozen_layers(&self) {
    2863         3504 :         self.layer_flush_start_tx.send_modify(|val| *val += 1);
    2864         3504 :     }
    2865              : 
    2866              :     /// Flush one frozen in-memory layer to disk, as a new delta layer.
                            ///
                            /// Special case: when the layer's LSN range is exactly
                            /// `initdb_lsn..initdb_lsn + 1`, the data is repartitioned and written out as
                            /// image layers via `create_image_layers` instead of a delta layer.
                            ///
                            /// After the files are written, the frozen layer is retired through
                            /// `finish_flush_l0_layer` under the write lock on `self.layers`. If the resulting
                            /// `disk_consistent_lsn` differs from the stored one, it is stored, uploads are
                            /// scheduled via `schedule_uploads`, and the returned metadata is persisted with
                            /// `save_metadata`.
                            ///
                            /// # Errors
                            ///
                            /// Returns `FlushLayerError::Cancelled` if `self.cancel` is observed at any of the
                            /// cancellation checks. Failures from repartitioning, layer creation, upload
                            /// scheduling and `save_metadata` are propagated with `?`.
                            ///
                            /// # Panics
                            ///
                            /// Panics if the new `disk_consistent_lsn` is lower than the stored one. In test
                            /// builds it also panics if the flush loop is not in the `Running` state, or if
                            /// the initdb optimization was expected but the delta-layer path was taken.
    2867        10360 :     #[instrument(skip_all, fields(layer=%frozen_layer))]
    2868              :     async fn flush_frozen_layer(
    2869              :         self: &Arc<Self>,
    2870              :         frozen_layer: Arc<InMemoryLayer>,
    2871              :         ctx: &RequestContext,
    2872              :     ) -> Result<(), FlushLayerError> {
    2873              :         debug_assert_current_span_has_tenant_and_timeline_id();
    2874              :         // As a special case, when we have just imported an image into the repository,
    2875              :         // instead of writing out a L0 delta layer, we directly write out image layer
    2876              :         // files instead. This is possible as long as *all* the data imported into the
    2877              :         // repository have the same LSN.
    2878              :         let lsn_range = frozen_layer.get_lsn_range();
    2879              :         let (layers_to_upload, delta_layer_to_add) =
    2880              :             if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
                                        // Test-only bookkeeping: count how often the initdb optimization is taken.
    2881              :                 #[cfg(test)]
    2882              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2883              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2884              :                         panic!("flush loop not running")
    2885              :                     }
    2886              :                     FlushLoopState::Running {
    2887              :                         initdb_optimization_count,
    2888              :                         ..
    2889              :                     } => {
    2890              :                         *initdb_optimization_count += 1;
    2891              :                     }
    2892              :                 }
    2893              :                 // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
    2894              :                 // require downloading anything during initial import.
    2895              :                 let (partitioning, _lsn) = self
    2896              :                     .repartition(
    2897              :                         self.initdb_lsn,
    2898              :                         self.get_compaction_target_size(),
    2899              :                         EnumSet::empty(),
    2900              :                         ctx,
    2901              :                     )
    2902              :                     .await?;
    2903              : 
    2904              :                 if self.cancel.is_cancelled() {
    2905              :                     return Err(FlushLayerError::Cancelled);
    2906              :                 }
    2907              : 
    2908              :                 // For image layers, we add them immediately into the layer map.
    2909              :                 (
    2910              :                     self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
    2911              :                         .await?,
    2912              :                     None,
    2913              :                 )
    2914              :             } else {
                                        // Test-only check: a test that expects the initdb optimization must not
                                        // end up on the delta-layer path.
    2915              :                 #[cfg(test)]
    2916              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    2917              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    2918              :                         panic!("flush loop not running")
    2919              :                     }
    2920              :                     FlushLoopState::Running {
    2921              :                         expect_initdb_optimization,
    2922              :                         ..
    2923              :                     } => {
    2924              :                         assert!(!*expect_initdb_optimization, "expected initdb optimization");
    2925              :                     }
    2926              :                 }
    2927              :                 // Normal case, write out a L0 delta layer file.
    2928              :                 // `create_delta_layer` will not modify the layer map.
    2929              :                 // We will remove frozen layer and add delta layer in one atomic operation later.
    2930              :                 let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
    2931              :                 (
    2932              :                     // FIXME: even though we have a single image and single delta layer assumption
    2933              :                     // we push them to vec
    2934              :                     vec![layer.clone()],
    2935              :                     Some(layer),
    2936              :                 )
    2937              :             };
    2938              : 
    2939         5178 :         pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
    2940              : 
    2941              :         if self.cancel.is_cancelled() {
    2942              :             return Err(FlushLayerError::Cancelled);
    2943              :         }
    2944              : 
                                // The range's `end` is exclusive (a single-LSN layer has `end == start + 1`, as in
                                // the initdb check above), so the last LSN covered by this layer is `end - 1`.
    2945              :         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
    2946              :         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
    2947              : 
    2948              :         // The new on-disk layers are now in the layer map. We can remove the
    2949              :         // in-memory layer from the map now. The flushed layer is stored in
    2950              :         // the mapping in `create_delta_layer`.
                                // NOTE(review): the delta-layer branch above states that `create_delta_layer` does
                                // not modify the layer map, and the new delta layer is passed to
                                // `finish_flush_l0_layer` below together with the frozen layer. Confirm which of
                                // the two comments is current.
    2951              :         let metadata = {
    2952              :             let mut guard = self.layers.write().await;
    2953              : 
    2954              :             if self.cancel.is_cancelled() {
    2955              :                 return Err(FlushLayerError::Cancelled);
    2956              :             }
    2957              : 
    2958              :             guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
    2959              : 
    2960              :             if disk_consistent_lsn != old_disk_consistent_lsn {
    2961              :                 assert!(disk_consistent_lsn > old_disk_consistent_lsn);
    2962              :                 self.disk_consistent_lsn.store(disk_consistent_lsn);
    2963              : 
    2964              :                 // Schedule remote uploads that will reflect our new disk_consistent_lsn
    2965              :                 Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
    2966              :             } else {
    2967              :                 None
    2968              :             }
    2969              :             // release lock on 'layers'
    2970              :         };
    2971              : 
    2972              :         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
    2973              :         // a compaction can delete the file and then it won't be available for uploads any more.
    2974              :         // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
    2975              :         // race situation.
    2976              :         // See https://github.com/neondatabase/neon/issues/4526
    2977         5177 :         pausable_failpoint!("flush-frozen-pausable");
    2978              : 
    2979              :         // This failpoint is used by another test case `test_pageserver_recovery`.
    2980            0 :         fail_point!("flush-frozen-exit");
    2981              : 
    2982              :         // Update the metadata file, with new 'disk_consistent_lsn'
    2983              :         //
    2984              :         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
    2985              :         // *all* the layers, to avoid fsyncing the file multiple times.
    2986              : 
    2987              :         // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
    2988              :         if let Some(metadata) = metadata {
    2989              :             save_metadata(
    2990              :                 self.conf,
    2991              :                 &self.tenant_shard_id,
    2992              :                 &self.timeline_id,
    2993              :                 &metadata,
    2994              :             )
    2995              :             .await
    2996              :             .context("save_metadata")?;
    2997              :         }
    2998              :         Ok(())
    2999              :     }
    3000              : 
    3001              :     /// Update metadata file
    3002         5197 :     fn schedule_uploads(
    3003         5197 :         &self,
    3004         5197 :         disk_consistent_lsn: Lsn,
    3005         5197 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    3006         5197 :     ) -> anyhow::Result<TimelineMetadata> {
    3007         5197 :         // We can only save a valid 'prev_record_lsn' value on disk if we
    3008         5197 :         // flushed *all* in-memory changes to disk. We only track
    3009         5197 :         // 'prev_record_lsn' in memory for the latest processed record, so we
    3010         5197 :         // don't remember what the correct value that corresponds to some old
    3011         5197 :         // LSN is. But if we flush everything, then the value corresponding
    3012         5197 :         // to the current 'last_record_lsn' is correct and we can store it on disk.
    3013         5197 :         let RecordLsn {
    3014         5197 :             last: last_record_lsn,
    3015         5197 :             prev: prev_record_lsn,
    3016         5197 :         } = self.last_record_lsn.load();
    3017         5197 :         let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
    3018         1576 :             Some(prev_record_lsn)
    3019              :         } else {
    3020         3621 :             None
    3021              :         };
    3022              : 
    3023         5197 :         let ancestor_timeline_id = self
    3024         5197 :             .ancestor_timeline
    3025         5197 :             .as_ref()
    3026         5197 :             .map(|ancestor| ancestor.timeline_id);
    3027         5197 : 
    3028         5197 :         let metadata = TimelineMetadata::new(
    3029         5197 :             disk_consistent_lsn,
    3030         5197 :             ondisk_prev_record_lsn,
    3031         5197 :             ancestor_timeline_id,
    3032         5197 :             self.ancestor_lsn,
    3033         5197 :             *self.latest_gc_cutoff_lsn.read(),
    3034         5197 :             self.initdb_lsn,
    3035         5197 :             self.pg_version,
    3036         5197 :         );
    3037         5197 : 
    3038         5197 :         fail_point!("checkpoint-before-saving-metadata", |x| bail!(
    3039            0 :             "{}",
    3040            0 :             x.unwrap()
    3041         5197 :         ));
    3042              : 
    3043         5197 :         if let Some(remote_client) = &self.remote_client {
    3044        10374 :             for layer in layers_to_upload {
    3045         5177 :                 remote_client.schedule_layer_file_upload(layer)?;
    3046              :             }
    3047         5197 :             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
    3048            0 :         }
    3049              : 
    3050         5197 :         Ok(metadata)
    3051         5197 :     }
    3052              : 
    3053           20 :     async fn update_metadata_file(
    3054           20 :         &self,
    3055           20 :         disk_consistent_lsn: Lsn,
    3056           20 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    3057           20 :     ) -> anyhow::Result<()> {
    3058           20 :         let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
    3059              : 
    3060           20 :         save_metadata(
    3061           20 :             self.conf,
    3062           20 :             &self.tenant_shard_id,
    3063           20 :             &self.timeline_id,
    3064           20 :             &metadata,
    3065           20 :         )
    3066           20 :         .await
    3067           20 :         .context("save_metadata")?;
    3068              : 
    3069           20 :         Ok(())
    3070           20 :     }
    3071              : 
    3072            2 :     pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
    3073            2 :         if let Some(remote_client) = &self.remote_client {
    3074            2 :             remote_client
    3075            2 :                 .preserve_initdb_archive(
    3076            2 :                     &self.tenant_shard_id.tenant_id,
    3077            2 :                     &self.timeline_id,
    3078            2 :                     &self.cancel,
    3079            2 :                 )
    3080            2 :                 .await?;
    3081              :         } else {
    3082            0 :             bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
    3083              :         }
    3084            2 :         Ok(())
    3085            2 :     }
    3086              : 
    3087              :     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
    3088              :     // in layer map immediately. The caller is responsible to put it into the layer map.
    3089         5100 :     async fn create_delta_layer(
    3090         5100 :         self: &Arc<Self>,
    3091         5100 :         frozen_layer: &Arc<InMemoryLayer>,
    3092         5100 :         ctx: &RequestContext,
    3093         5100 :     ) -> anyhow::Result<ResidentLayer> {
    3094         5100 :         let span = tracing::info_span!("blocking");
    3095         5100 :         let new_delta: ResidentLayer = tokio::task::spawn_blocking({
    3096         5100 :             let self_clone = Arc::clone(self);
    3097         5100 :             let frozen_layer = Arc::clone(frozen_layer);
    3098         5100 :             let ctx = ctx.attached_child();
    3099         5100 :             move || {
    3100         5100 :                 // Write it out
    3101         5100 :                 // Keep this inside `spawn_blocking` and `Handle::current`
    3102         5100 :                 // as long as the write path is still sync and the read impl
    3103         5100 :                 // is still not fully async. Otherwise executor threads would
    3104         5100 :                 // be blocked.
    3105         5100 :                 let _g = span.entered();
    3106         5100 :                 let new_delta =
    3107         5100 :                     Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
    3108         5100 :                 let new_delta_path = new_delta.local_path().to_owned();
    3109         5100 : 
    3110         5100 :                 // Sync it to disk.
    3111         5100 :                 //
    3112         5100 :                 // We must also fsync the timeline dir to ensure the directory entries for
    3113         5100 :                 // new layer files are durable.
    3114         5100 :                 //
    3115         5100 :                 // NB: timeline dir must be synced _after_ the file contents are durable.
    3116         5100 :                 // So, two separate fsyncs are required, they mustn't be batched.
    3117         5100 :                 //
    3118         5100 :                 // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
    3119         5100 :                 // files to flush, the fsync overhead can be reduced as follows:
    3120         5100 :                 // 1. write them all to temporary file names
    3121         5100 :                 // 2. fsync them
    3122         5100 :                 // 3. rename to the final name
    3123         5100 :                 // 4. fsync the parent directory.
    3124         5100 :                 // Note that (1),(2),(3) today happen inside write_to_disk().
    3125         5100 :                 //
    3126         5100 :                 // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3127         5100 :                 par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
    3128         5100 :                 par_fsync::par_fsync(&[self_clone
    3129         5100 :                     .conf
    3130         5100 :                     .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
    3131         5100 :                 .context("fsync of timeline dir")?;
    3132              : 
    3133         5098 :                 anyhow::Ok(new_delta)
    3134         5100 :             }
    3135         5100 :         })
    3136         5098 :         .await
    3137         5098 :         .context("spawn_blocking")
    3138         5098 :         .and_then(|x| x)?;
    3139              : 
    3140         5098 :         Ok(new_delta)
    3141         5098 :     }
    3142              : 
    3143         1703 :     async fn repartition(
    3144         1703 :         &self,
    3145         1703 :         lsn: Lsn,
    3146         1703 :         partition_size: u64,
    3147         1703 :         flags: EnumSet<CompactFlags>,
    3148         1703 :         ctx: &RequestContext,
    3149         1703 :     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
    3150         1703 :         {
    3151         1703 :             let partitioning_guard = self.partitioning.lock().unwrap();
    3152         1703 :             let distance = lsn.0 - partitioning_guard.1 .0;
    3153         1703 :             if partitioning_guard.1 != Lsn(0)
    3154          979 :                 && distance <= self.repartition_threshold
    3155          805 :                 && !flags.contains(CompactFlags::ForceRepartition)
    3156              :             {
    3157            0 :                 debug!(
    3158            0 :                     distance,
    3159            0 :                     threshold = self.repartition_threshold,
    3160            0 :                     "no repartitioning needed"
    3161            0 :                 );
    3162          803 :                 return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
    3163          900 :             }
    3164              :         }
    3165       176985 :         let keyspace = self.collect_keyspace(lsn, ctx).await?;
    3166          898 :         let partitioning = keyspace.partition(partition_size);
    3167          898 : 
    3168          898 :         let mut partitioning_guard = self.partitioning.lock().unwrap();
    3169          898 :         if lsn > partitioning_guard.1 {
    3170          898 :             *partitioning_guard = (partitioning, lsn);
    3171          898 :         } else {
    3172            0 :             warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
    3173              :         }
    3174          898 :         Ok((partitioning_guard.0.clone(), partitioning_guard.1))
    3175         1702 :     }
    3176              : 
    3177              :     // Is it time to create a new image layer for the given partition?
    3178        36833 :     async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
    3179        36833 :         let threshold = self.get_image_creation_threshold();
    3180              : 
    3181        36833 :         let guard = self.layers.read().await;
    3182        36833 :         let layers = guard.layer_map();
    3183        36833 : 
    3184        36833 :         let mut max_deltas = 0;
    3185        36833 :         {
    3186        36833 :             let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
    3187        36833 :             if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
    3188         4387 :                 let img_range =
    3189         4387 :                     partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
    3190         4387 :                 if wanted.overlaps(&img_range) {
    3191              :                     //
    3192              :                     // gc_timeline only pays attention to image layers that are older than the GC cutoff,
    3193              :                     // but create_image_layers creates image layers at last-record-lsn.
    3194              :                     // So it's possible that gc_timeline wants a new image layer to be created for a key range,
    3195              :                     // but the range is already covered by image layers at more recent LSNs. Before we
    3196              :                     // create a new image layer, check if the range is already covered at more recent LSNs.
    3197            0 :                     if !layers
    3198            0 :                         .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
    3199              :                     {
    3200            0 :                         debug!(
    3201            0 :                             "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
    3202            0 :                             img_range.start, img_range.end, cutoff_lsn, lsn
    3203            0 :                         );
    3204            0 :                         return true;
    3205            0 :                     }
    3206         4387 :                 }
    3207        32446 :             }
    3208              :         }
    3209              : 
    3210      2230963 :         for part_range in &partition.ranges {
    3211      2200525 :             let image_coverage = layers.image_coverage(part_range, lsn);
    3212      4051749 :             for (img_range, last_img) in image_coverage {
    3213      1857619 :                 let img_lsn = if let Some(last_img) = last_img {
    3214       356586 :                     last_img.get_lsn_range().end
    3215              :                 } else {
    3216      1501033 :                     Lsn(0)
    3217              :                 };
    3218              :                 // Let's consider an example:
    3219              :                 //
    3220              :                 // delta layer with LSN range 71-81
    3221              :                 // delta layer with LSN range 81-91
    3222              :                 // delta layer with LSN range 91-101
    3223              :                 // image layer at LSN 100
    3224              :                 //
    3225              :                 // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
    3226              :                 // there's no need to create a new one. We check this case explicitly, to avoid passing
    3227              :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    3228              :                 // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
    3229              :                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
    3230      1857619 :                 if img_lsn < lsn {
    3231      1824740 :                     let num_deltas =
    3232      1824740 :                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
    3233      1824740 : 
    3234      1824740 :                     max_deltas = max_deltas.max(num_deltas);
    3235      1824740 :                     if num_deltas >= threshold {
    3236            0 :                         debug!(
    3237            0 :                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
    3238            0 :                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
    3239            0 :                         );
    3240         6395 :                         return true;
    3241      1818345 :                     }
    3242        32879 :                 }
    3243              :             }
    3244              :         }
    3245              : 
    3246            0 :         debug!(
    3247            0 :             max_deltas,
    3248            0 :             "none of the partitioned ranges had >= {threshold} deltas"
    3249            0 :         );
    3250        30438 :         false
    3251        36833 :     }
    3252              : 
    3253         3398 :     #[tracing::instrument(skip_all, fields(%lsn, %force))]
    3254              :     async fn create_image_layers(
    3255              :         self: &Arc<Timeline>,
    3256              :         partitioning: &KeyPartitioning,
    3257              :         lsn: Lsn,
    3258              :         force: bool,
    3259              :         ctx: &RequestContext,
    3260              :     ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
    3261              :         let timer = self.metrics.create_images_time_histo.start_timer();
    3262              :         let mut image_layers = Vec::new();
    3263              : 
    3264              :         // We need to avoid holes between generated image layers.
    3265              :         // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
    3266              :         // image layer with hole between them. In this case such layer can not be utilized by GC.
    3267              :         //
    3268              :         // How such hole between partitions can appear?
    3269              :         // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
    3270              :         // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
    3271              :         // If there is delta layer <100000000..300000000> then it will never be garbage collected because
    3272              :         // image layers  <100000000..100000099> and <200000000..200000199> are not completely covering it.
    3273              :         let mut start = Key::MIN;
    3274              : 
    3275              :         for partition in partitioning.parts.iter() {
    3276              :             let img_range = start..partition.ranges.last().unwrap().end;
    3277              :             start = img_range.end;
    3278              :             if force || self.time_for_new_image_layer(partition, lsn).await {
    3279              :                 let mut image_layer_writer = ImageLayerWriter::new(
    3280              :                     self.conf,
    3281              :                     self.timeline_id,
    3282              :                     self.tenant_shard_id,
    3283              :                     &img_range,
    3284              :                     lsn,
    3285              :                 )
    3286              :                 .await?;
    3287              : 
    3288            0 :                 fail_point!("image-layer-writer-fail-before-finish", |_| {
    3289            0 :                     Err(CreateImageLayersError::Other(anyhow::anyhow!(
    3290            0 :                         "failpoint image-layer-writer-fail-before-finish"
    3291            0 :                     )))
    3292            0 :                 });
    3293              : 
    3294              :                 let mut key_request_accum = KeySpaceAccum::new();
    3295              :                 for range in &partition.ranges {
    3296              :                     let mut key = range.start;
    3297              :                     while key < range.end {
    3298              :                         if self.shard_identity.is_key_disposable(&key) {
    3299            0 :                             debug!(
    3300            0 :                                 "Dropping key {} during compaction (it belongs on shard {:?})",
    3301            0 :                                 key,
    3302            0 :                                 self.shard_identity.get_shard_number(&key)
    3303            0 :                             );
    3304              :                             key = key.next();
    3305              :                             continue;
    3306              :                         }
    3307              : 
    3308              :                         key_request_accum.add_key(key);
    3309              :                         if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
    3310              :                             || key.next() == range.end
    3311              :                         {
    3312              :                             let results = self
    3313              :                                 .get_vectored(
    3314              :                                     &key_request_accum.consume_keyspace().ranges,
    3315              :                                     lsn,
    3316              :                                     ctx,
    3317              :                                 )
    3318              :                                 .await?;
    3319              : 
    3320              :                             for (img_key, img) in results {
    3321              :                                 let img = match img {
    3322              :                                     Ok(img) => img,
    3323              :                                     Err(err) => {
    3324              :                                         // If we fail to reconstruct a VM or FSM page, we can zero the
    3325              :                                         // page without losing any actual user data. That seems better
    3326              :                                         // than failing repeatedly and getting stuck.
    3327              :                                         //
    3328              :                                         // We had a bug at one point, where we truncated the FSM and VM
    3329              :                                         // in the pageserver, but the Postgres didn't know about that
    3330              :                                         // and continued to generate incremental WAL records for pages
    3331              :                                         // that didn't exist in the pageserver. Trying to replay those
    3332              :                                         // WAL records failed to find the previous image of the page.
    3333              :                                         // This special case allows us to recover from that situation.
    3334              :                                         // See https://github.com/neondatabase/neon/issues/2601.
    3335              :                                         //
    3336              :                                         // Unfortunately we cannot do this for the main fork, or for
    3337              :                                         // any metadata keys, as that would lead to actual data
    3338              :                                         // loss.
    3339              :                                         if is_rel_fsm_block_key(img_key)
    3340              :                                             || is_rel_vm_block_key(img_key)
    3341              :                                         {
    3342            0 :                                             warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
    3343              :                                             ZERO_PAGE.clone()
    3344              :                                         } else {
    3345              :                                             return Err(
    3346              :                                                 CreateImageLayersError::PageReconstructError(err),
    3347              :                                             );
    3348              :                                         }
    3349              :                                     }
    3350              :                                 };
    3351              : 
    3352              :                                 image_layer_writer.put_image(img_key, img).await?;
    3353              :                             }
    3354              :                         }
    3355              : 
    3356              :                         key = key.next();
    3357              :                     }
    3358              :                 }
    3359              :                 let image_layer = image_layer_writer.finish(self).await?;
    3360              :                 image_layers.push(image_layer);
    3361              :             }
    3362              :         }
    3363              :         // All layers that the GC wanted us to create have now been created.
    3364              :         //
    3365              :         // It's possible that another GC cycle happened while we were compacting, and added
    3366              :         // something new to wanted_image_layers, and we now clear that before processing it.
    3367              :         // That's OK, because the next GC iteration will put it back in.
    3368              :         *self.wanted_image_layers.lock().unwrap() = None;
    3369              : 
    3370              :         // Sync the new layer to disk before adding it to the layer map, to make sure
    3371              :         // we don't garbage collect something based on the new layer, before it has
    3372              :         // reached the disk.
    3373              :         //
    3374              :         // We must also fsync the timeline dir to ensure the directory entries for
    3375              :         // new layer files are durable
    3376              :         //
    3377              :         // Compaction creates multiple image layers. It would be better to create them all
    3378              :         // and fsync them all in parallel.
    3379              :         let all_paths = image_layers
    3380              :             .iter()
    3381         6475 :             .map(|layer| layer.local_path().to_owned())
    3382              :             .collect::<Vec<_>>();
    3383              : 
    3384              :         par_fsync::par_fsync_async(&all_paths)
    3385              :             .await
    3386              :             .context("fsync of newly created layer files")?;
    3387              : 
    3388              :         if !all_paths.is_empty() {
    3389              :             par_fsync::par_fsync_async(&[self
    3390              :                 .conf
    3391              :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
    3392              :             .await
    3393              :             .context("fsync of timeline dir")?;
    3394              :         }
    3395              : 
    3396              :         let mut guard = self.layers.write().await;
    3397              : 
    3398              :         // FIXME: we could add the images to be uploaded *before* returning from here, but right
    3399              :         // now they are being scheduled outside of write lock
    3400              :         guard.track_new_image_layers(&image_layers, &self.metrics);
    3401              :         drop_wlock(guard);
    3402              :         timer.stop_and_record();
    3403              : 
    3404              :         Ok(image_layers)
    3405              :     }
    3406              : 
    3407              :     /// Wait until the background initial logical size calculation is complete, or
    3408              :     /// this Timeline is shut down.  Calling this function will cause the initial
    3409              :     /// logical size calculation to skip waiting for the background jobs barrier.
    3410          257 :     pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
    3411          257 :         if let Some(await_bg_cancel) = self
    3412          257 :             .current_logical_size
    3413          257 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore
    3414          257 :             .get()
    3415          244 :         {
    3416          244 :             await_bg_cancel.cancel();
    3417          244 :         } else {
    3418              :             // We should not wait if we were not able to explicitly instruct
    3419              :             // the logical size calculation to skip the concurrency limit semaphore.
    3420              :             // TODO: this is an unexpected case.  We should restructure so that it
    3421              :             // can't happen.
    3422           13 :             tracing::info!(
    3423           13 :                 "await_initial_logical_size: can't get semaphore cancel token, skipping"
    3424           13 :             );
    3425              :         }
    3426              : 
    3427          257 :         tokio::select!(
    3428          495 :             _ = self.current_logical_size.initialized.acquire() => {},
    3429          495 :             _ = self.cancel.cancelled() => {}
    3430          495 :         )
    3431          243 :     }
    3432              : }
    3433              : 
    3434         1324 : #[derive(Default)]
    3435              : struct CompactLevel0Phase1Result {
    3436              :     new_layers: Vec<ResidentLayer>,
    3437              :     deltas_to_compact: Vec<Layer>,
    3438              : }
    3439              : 
    3440              : /// Top-level failure to compact.
    3441            0 : #[derive(Debug, thiserror::Error)]
    3442              : pub(crate) enum CompactionError {
    3443              :     #[error("The timeline or pageserver is shutting down")]
    3444              :     ShuttingDown,
    3445              :     /// Compaction cannot be done right now; page reconstruction and so on.
    3446              :     #[error(transparent)]
    3447              :     Other(#[from] anyhow::Error),
    3448              : }
    3449              : 
    3450              : #[serde_as]
    3451         4102 : #[derive(serde::Serialize)]
    3452              : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
    3453              : 
    3454        11347 : #[derive(Default)]
    3455              : enum DurationRecorder {
    3456              :     #[default]
    3457              :     NotStarted,
    3458              :     Recorded(RecordedDuration, tokio::time::Instant),
    3459              : }
    3460              : 
    3461              : impl DurationRecorder {
    3462         3094 :     fn till_now(&self) -> DurationRecorder {
    3463         3094 :         match self {
    3464              :             DurationRecorder::NotStarted => {
    3465            0 :                 panic!("must only call on recorded measurements")
    3466              :             }
    3467         3094 :             DurationRecorder::Recorded(_, ended) => {
    3468         3094 :                 let now = tokio::time::Instant::now();
    3469         3094 :                 DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
    3470         3094 :             }
    3471         3094 :         }
    3472         3094 :     }
    3473         2051 :     fn into_recorded(self) -> Option<RecordedDuration> {
    3474         2051 :         match self {
    3475            0 :             DurationRecorder::NotStarted => None,
    3476         2051 :             DurationRecorder::Recorded(recorded, _) => Some(recorded),
    3477              :         }
    3478         2051 :     }
    3479              : }
    3480              : 
    3481         1621 : #[derive(Default)]
    3482              : struct CompactLevel0Phase1StatsBuilder {
    3483              :     version: Option<u64>,
    3484              :     tenant_id: Option<TenantShardId>,
    3485              :     timeline_id: Option<TimelineId>,
    3486              :     read_lock_acquisition_micros: DurationRecorder,
    3487              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    3488              :     read_lock_held_key_sort_micros: DurationRecorder,
    3489              :     read_lock_held_prerequisites_micros: DurationRecorder,
    3490              :     read_lock_held_compute_holes_micros: DurationRecorder,
    3491              :     read_lock_drop_micros: DurationRecorder,
    3492              :     write_layer_files_micros: DurationRecorder,
    3493              :     level0_deltas_count: Option<usize>,
    3494              :     new_deltas_count: Option<usize>,
    3495              :     new_deltas_size: Option<u64>,
    3496              : }
    3497              : 
    3498          293 : #[derive(serde::Serialize)]
    3499              : struct CompactLevel0Phase1Stats {
    3500              :     version: u64,
    3501              :     tenant_id: TenantShardId,
    3502              :     timeline_id: TimelineId,
    3503              :     read_lock_acquisition_micros: RecordedDuration,
    3504              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    3505              :     read_lock_held_key_sort_micros: RecordedDuration,
    3506              :     read_lock_held_prerequisites_micros: RecordedDuration,
    3507              :     read_lock_held_compute_holes_micros: RecordedDuration,
    3508              :     read_lock_drop_micros: RecordedDuration,
    3509              :     write_layer_files_micros: RecordedDuration,
    3510              :     level0_deltas_count: usize,
    3511              :     new_deltas_count: usize,
    3512              :     new_deltas_size: u64,
    3513              : }
    3514              : 
    3515              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    3516              :     type Error = anyhow::Error;
    3517              : 
    3518          293 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    3519          293 :         Ok(Self {
    3520          293 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    3521          293 :             tenant_id: value
    3522          293 :                 .tenant_id
    3523          293 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    3524          293 :             timeline_id: value
    3525          293 :                 .timeline_id
    3526          293 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    3527          293 :             read_lock_acquisition_micros: value
    3528          293 :                 .read_lock_acquisition_micros
    3529          293 :                 .into_recorded()
    3530          293 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    3531          293 :             read_lock_held_spawn_blocking_startup_micros: value
    3532          293 :                 .read_lock_held_spawn_blocking_startup_micros
    3533          293 :                 .into_recorded()
    3534          293 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    3535          293 :             read_lock_held_key_sort_micros: value
    3536          293 :                 .read_lock_held_key_sort_micros
    3537          293 :                 .into_recorded()
    3538          293 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    3539          293 :             read_lock_held_prerequisites_micros: value
    3540          293 :                 .read_lock_held_prerequisites_micros
    3541          293 :                 .into_recorded()
    3542          293 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    3543          293 :             read_lock_held_compute_holes_micros: value
    3544          293 :                 .read_lock_held_compute_holes_micros
    3545          293 :                 .into_recorded()
    3546          293 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    3547          293 :             read_lock_drop_micros: value
    3548          293 :                 .read_lock_drop_micros
    3549          293 :                 .into_recorded()
    3550          293 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    3551          293 :             write_layer_files_micros: value
    3552          293 :                 .write_layer_files_micros
    3553          293 :                 .into_recorded()
    3554          293 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    3555          293 :             level0_deltas_count: value
    3556          293 :                 .level0_deltas_count
    3557          293 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    3558          293 :             new_deltas_count: value
    3559          293 :                 .new_deltas_count
    3560          293 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    3561          293 :             new_deltas_size: value
    3562          293 :                 .new_deltas_size
    3563          293 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    3564              :         })
    3565          293 :     }
    3566              : }
    3567              : 
    3568              : impl Timeline {
    3569              :     /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
    3570         1621 :     async fn compact_level0_phase1(
    3571         1621 :         self: &Arc<Self>,
    3572         1621 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
    3573         1621 :         mut stats: CompactLevel0Phase1StatsBuilder,
    3574         1621 :         target_file_size: u64,
    3575         1621 :         ctx: &RequestContext,
    3576         1621 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    3577         1621 :         stats.read_lock_held_spawn_blocking_startup_micros =
    3578         1621 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    3579         1621 :         let layers = guard.layer_map();
    3580         1621 :         let level0_deltas = layers.get_level0_deltas()?;
    3581         1621 :         let mut level0_deltas = level0_deltas
    3582         1621 :             .into_iter()
    3583         7263 :             .map(|x| guard.get_from_desc(&x))
    3584         1621 :             .collect_vec();
    3585         1621 :         stats.level0_deltas_count = Some(level0_deltas.len());
    3586         1621 :         // Only compact if enough layers have accumulated.
    3587         1621 :         let threshold = self.get_compaction_threshold();
    3588         1621 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    3589            0 :             debug!(
    3590            0 :                 level0_deltas = level0_deltas.len(),
    3591            0 :                 threshold, "too few deltas to compact"
    3592            0 :             );
    3593         1324 :             return Ok(CompactLevel0Phase1Result::default());
    3594          297 :         }
    3595          297 : 
    3596          297 :         // This failpoint is used together with `test_duplicate_layers` integration test.
    3597          297 :         // It returns, as the compaction result, exactly the same layers that were input to compaction.
    3598          297 :         // We want to ensure that this will not cause any problem when updating the layer map
    3599          297 :         // after the compaction is finished.
    3600          297 :         //
    3601          297 :         // Currently, there are two rare edge cases that will cause duplicated layers being
    3602          297 :         // inserted.
    3603          297 :         // 1. The compaction job is interrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
    3604          297 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
    3605          297 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compaction at this
    3606          297 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
    3607          297 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
    3608          297 :         //    layer replace instead of the normal remove / upload process.
    3609          297 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and are of target file
    3610          297 :         //    size length. Compaction will likely create the same set of n files afterwards.
    3611          297 :         //
    3612          297 :         // This failpoint is a superset of both of the cases.
    3613          297 :         if cfg!(feature = "testing") {
    3614          297 :             let active = (|| {
    3615          297 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
    3616          297 :                 false
    3617          297 :             })();
    3618          297 : 
    3619          297 :             if active {
    3620            2 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
    3621           30 :                 for delta in &level0_deltas {
    3622              :                     // we are just faking these layers as being produced again for this failpoint
    3623           28 :                     new_layers.push(
    3624           28 :                         delta
    3625           28 :                             .download_and_keep_resident()
    3626            0 :                             .await
    3627           28 :                             .context("download layer for failpoint")?,
    3628              :                     );
    3629              :                 }
    3630            2 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
    3631            2 :                 return Ok(CompactLevel0Phase1Result {
    3632            2 :                     new_layers,
    3633            2 :                     deltas_to_compact: level0_deltas,
    3634            2 :                 });
    3635          295 :             }
    3636            0 :         }
    3637              : 
    3638              :         // Gather the files to compact in this iteration.
    3639              :         //
    3640              :         // Start with the oldest Level 0 delta file, and collect any other
    3641              :         // level 0 files that form a contiguous sequence, such that the end
    3642              :         // LSN of previous file matches the start LSN of the next file.
    3643              :         //
    3644              :         // Note that if the files don't form such a sequence, we might
    3645              :         // "compact" just a single file. That's a bit pointless, but it allows
    3646              :         // us to get rid of the level 0 file, and compact the other files on
    3647              :         // the next iteration. This could probably be made smarter, but such
    3648              :         // "gaps" in the sequence of level 0 files should only happen in case
    3649              :         // of a crash, partial download from cloud storage, or something like
    3650              :         // that, so it's not a big deal in practice.
    3651         7924 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    3652          295 :         let mut level0_deltas_iter = level0_deltas.iter();
    3653          295 : 
    3654          295 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    3655          295 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    3656          295 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    3657          295 : 
    3658          295 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
    3659         4104 :         for l in level0_deltas_iter {
    3660         3809 :             let lsn_range = &l.layer_desc().lsn_range;
    3661         3809 : 
    3662         3809 :             if lsn_range.start != prev_lsn_end {
    3663            0 :                 break;
    3664         3809 :             }
    3665         3809 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
    3666         3809 :             prev_lsn_end = lsn_range.end;
    3667              :         }
    3668          295 :         let lsn_range = Range {
    3669          295 :             start: deltas_to_compact
    3670          295 :                 .first()
    3671          295 :                 .unwrap()
    3672          295 :                 .layer_desc()
    3673          295 :                 .lsn_range
    3674          295 :                 .start,
    3675          295 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    3676          295 :         };
    3677              : 
    3678          295 :         info!(
    3679          295 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    3680          295 :             lsn_range.start,
    3681          295 :             lsn_range.end,
    3682          295 :             deltas_to_compact.len(),
    3683          295 :             level0_deltas.len()
    3684          295 :         );
    3685              : 
    3686         4104 :         for l in deltas_to_compact.iter() {
    3687         4104 :             info!("compact includes {l}");
    3688              :         }
    3689              : 
    3690              :         // We don't need the original list of layers anymore. Drop it so that
    3691              :         // we don't accidentally use it later in the function.
    3692          295 :         drop(level0_deltas);
    3693          295 : 
    3694          295 :         stats.read_lock_held_prerequisites_micros = stats
    3695          295 :             .read_lock_held_spawn_blocking_startup_micros
    3696          295 :             .till_now();
    3697          295 : 
    3698          295 :         // Determine N largest holes where N is number of compacted layers.
    3699          295 :         let max_holes = deltas_to_compact.len();
    3700          295 :         let last_record_lsn = self.get_last_record_lsn();
    3701          295 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    3702          295 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
    3703          295 : 
    3704          295 :         // min-heap (reserve space for one more element added before eviction)
    3705          295 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    3706          295 :         let mut prev: Option<Key> = None;
    3707          295 : 
    3708          295 :         let mut all_keys = Vec::new();
    3709              : 
    3710         4104 :         for l in deltas_to_compact.iter() {
    3711         4104 :             all_keys.extend(l.load_keys(ctx).await?);
    3712              :         }
    3713              : 
    3714              :         // FIXME: should spawn_blocking the rest of this function
    3715              : 
    3716              :         // The current stdlib sorting implementation is designed in a way where it is
    3717              :         // particularly fast where the slice is made up of sorted sub-ranges.
    3718    155312530 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3719          295 : 
    3720          295 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    3721              : 
    3722     17300095 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    3723     17300095 :             if let Some(prev_key) = prev {
    3724              :                 // just first fast filter
    3725     17299800 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
    3726       201573 :                     let key_range = prev_key..next_key;
    3727       201573 :                     // Measuring hole by just subtraction of i128 representation of key range boundaries
    3728       201573 :                     // does not make much sense, because the largest holes will correspond to field1/field2 changes.
    3729       201573 :                     // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
    3730       201573 :                     // That is why it is better to measure size of hole as number of covering image layers.
    3731       201573 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
    3732       201573 :                     if coverage_size >= min_hole_coverage_size {
    3733          235 :                         heap.push(Hole {
    3734          235 :                             key_range,
    3735          235 :                             coverage_size,
    3736          235 :                         });
    3737          235 :                         if heap.len() > max_holes {
    3738          132 :                             heap.pop(); // remove smallest hole
    3739          132 :                         }
    3740       201338 :                     }
    3741     17098227 :                 }
    3742          295 :             }
    3743     17300095 :             prev = Some(next_key.next());
    3744              :         }
    3745          295 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    3746          295 :         drop_rlock(guard);
    3747          295 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    3748          295 :         let mut holes = heap.into_vec();
    3749          295 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
    3750          295 :         let mut next_hole = 0; // index of next hole in holes vector
    3751          295 : 
    3752          295 :         // This iterator walks through all key-value pairs from all the layers
    3753          295 :         // we're compacting, in key, LSN order.
    3754          295 :         let all_values_iter = all_keys.iter();
    3755          295 : 
    3756          295 :         // This iterator walks through all keys and is needed to calculate size used by each key
    3757          295 :         let mut all_keys_iter = all_keys
    3758          295 :             .iter()
    3759     16960274 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    3760     16959979 :             .coalesce(|mut prev, cur| {
    3761     16959979 :                 // Coalesce keys that belong to the same key pair.
    3762     16959979 :                 // This ensures that compaction doesn't put them
    3763     16959979 :                 // into different layer files.
    3764     16959979 :                 // Still limit this by the target file size,
    3765     16959979 :                 // so that we keep the size of the files in
    3766     16959979 :                 // check.
    3767     16959979 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    3768     14778956 :                     prev.2 += cur.2;
    3769     14778956 :                     Ok(prev)
    3770              :                 } else {
    3771      2181023 :                     Err((prev, cur))
    3772              :                 }
    3773     16959979 :             });
    3774          295 : 
    3775          295 :         // Merge the contents of all the input delta layers into a new set
    3776          295 :         // of delta layers, based on the current partitioning.
    3777          295 :         //
    3778          295 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    3779          295 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    3780          295 :         // would be too large. In that case, we also split on the LSN dimension.
    3781          295 :         //
    3782          295 :         // LSN
    3783          295 :         //  ^
    3784          295 :         //  |
    3785          295 :         //  | +-----------+            +--+--+--+--+
    3786          295 :         //  | |           |            |  |  |  |  |
    3787          295 :         //  | +-----------+            |  |  |  |  |
    3788          295 :         //  | |           |            |  |  |  |  |
    3789          295 :         //  | +-----------+     ==>    |  |  |  |  |
    3790          295 :         //  | |           |            |  |  |  |  |
    3791          295 :         //  | +-----------+            |  |  |  |  |
    3792          295 :         //  | |           |            |  |  |  |  |
    3793          295 :         //  | +-----------+            +--+--+--+--+
    3794          295 :         //  |
    3795          295 :         //  +--------------> key
    3796          295 :         //
    3797          295 :         //
    3798          295 :         // If one key (X) has a lot of page versions:
    3799          295 :         //
    3800          295 :         // LSN
    3801          295 :         //  ^
    3802          295 :         //  |                                 (X)
    3803          295 :         //  | +-----------+            +--+--+--+--+
    3804          295 :         //  | |           |            |  |  |  |  |
    3805          295 :         //  | +-----------+            |  |  +--+  |
    3806          295 :         //  | |           |            |  |  |  |  |
    3807          295 :         //  | +-----------+     ==>    |  |  |  |  |
    3808          295 :         //  | |           |            |  |  +--+  |
    3809          295 :         //  | +-----------+            |  |  |  |  |
    3810          295 :         //  | |           |            |  |  |  |  |
    3811          295 :         //  | +-----------+            +--+--+--+--+
    3812          295 :         //  |
    3813          295 :         //  +--------------> key
    3814          295 :         // TODO: this actually divides the layers into fixed-size chunks, not
    3815          295 :         // based on the partitioning.
    3816          295 :         //
    3817          295 :         // TODO: we should also opportunistically materialize and
    3818          295 :         // garbage collect what we can.
    3819          295 :         let mut new_layers = Vec::new();
    3820          295 :         let mut prev_key: Option<Key> = None;
    3821          295 :         let mut writer: Option<DeltaLayerWriter> = None;
    3822          295 :         let mut key_values_total_size = 0u64;
    3823          295 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    3824          295 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    3825              : 
    3826              :         for &DeltaEntry {
    3827     16960269 :             key, lsn, ref val, ..
    3828     16960562 :         } in all_values_iter
    3829              :         {
    3830     16960269 :             let value = val.load(ctx).await?;
    3831     16960268 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    3832     16960268 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    3833     16960268 :             if !same_key || lsn == dup_end_lsn {
    3834      2181314 :                 let mut next_key_size = 0u64;
    3835      2181314 :                 let is_dup_layer = dup_end_lsn.is_valid();
    3836      2181314 :                 dup_start_lsn = Lsn::INVALID;
    3837      2181314 :                 if !same_key {
    3838      2181277 :                     dup_end_lsn = Lsn::INVALID;
    3839      2181277 :                 }
    3840              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    3841      2181316 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    3842      2181316 :                     next_key_size = next_size;
    3843      2181316 :                     if key != next_key {
    3844      2180984 :                         if dup_end_lsn.is_valid() {
    3845           12 :                             // We are writing a segment with duplicates:
    3846           12 :                             // place all remaining values of this key in separate segment
    3847           12 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    3848           12 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    3849      2180972 :                         }
    3850      2180984 :                         break;
    3851          332 :                     }
    3852          332 :                     key_values_total_size += next_size;
    3853          332 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    3854          332 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    3855          332 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    3856              :                         // Split key between multiple layers: such layer can contain only single key
    3857           37 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    3858           25 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    3859              :                         } else {
    3860           12 :                             lsn // start with the first LSN for this key
    3861              :                         };
    3862           37 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    3863           37 :                         break;
    3864          295 :                     }
    3865              :                 }
    3866              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    3867      2181314 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    3868            0 :                     dup_start_lsn = dup_end_lsn;
    3869            0 :                     dup_end_lsn = lsn_range.end;
    3870      2181314 :                 }
    3871      2181314 :                 if writer.is_some() {
    3872      2181019 :                     let written_size = writer.as_mut().unwrap().size();
    3873      2181019 :                     let contains_hole =
    3874      2181019 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    3875              :                     // check if key causes layer overflow or contains hole...
    3876      2181019 :                     if is_dup_layer
    3877      2180970 :                         || dup_end_lsn.is_valid()
    3878      2180960 :                         || written_size + key_values_total_size > target_file_size
    3879      2170682 :                         || contains_hole
    3880              :                     {
    3881              :                         // ... if so, flush previous layer and prepare to write new one
    3882        10438 :                         new_layers.push(
    3883        10438 :                             writer
    3884        10438 :                                 .take()
    3885        10438 :                                 .unwrap()
    3886        10438 :                                 .finish(prev_key.unwrap().next(), self)
    3887         1284 :                                 .await?,
    3888              :                         );
    3889        10438 :                         writer = None;
    3890        10438 : 
    3891        10438 :                         if contains_hole {
    3892          103 :                             // skip hole
    3893          103 :                             next_hole += 1;
    3894        10335 :                         }
    3895      2170581 :                     }
    3896          295 :                 }
    3897              :                 // Remember size of key value because at next iteration we will access next item
    3898      2181314 :                 key_values_total_size = next_key_size;
    3899     14778954 :             }
    3900     16960268 :             if writer.is_none() {
    3901        10733 :                 // Create writer if not initialized yet
    3902        10733 :                 writer = Some(
    3903              :                     DeltaLayerWriter::new(
    3904        10733 :                         self.conf,
    3905        10733 :                         self.timeline_id,
    3906        10733 :                         self.tenant_shard_id,
    3907        10733 :                         key,
    3908        10733 :                         if dup_end_lsn.is_valid() {
    3909              :                             // this is a layer containing slice of values of the same key
    3910            0 :                             debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    3911           49 :                             dup_start_lsn..dup_end_lsn
    3912              :                         } else {
    3913            0 :                             debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    3914        10684 :                             lsn_range.clone()
    3915              :                         },
    3916              :                     )
    3917           10 :                     .await?,
    3918              :                 );
    3919     16949535 :             }
    3920              : 
    3921     16960268 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    3922            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    3923            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    3924            0 :                 )))
    3925     16960268 :             });
    3926              : 
    3927     16960268 :             if !self.shard_identity.is_key_disposable(&key) {
    3928     16960268 :                 writer.as_mut().unwrap().put_value(key, lsn, value).await?;
    3929              :             } else {
    3930            0 :                 debug!(
    3931            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    3932            0 :                     key,
    3933            0 :                     self.shard_identity.get_shard_number(&key)
    3934            0 :                 );
    3935              :             }
    3936              : 
    3937     16960267 :             if !new_layers.is_empty() {
    3938     13666937 :                 fail_point!("after-timeline-compacted-first-L1");
    3939     13666937 :             }
    3940              : 
    3941     16960267 :             prev_key = Some(key);
    3942              :         }
    3943          293 :         if let Some(writer) = writer {
    3944          293 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
    3945            0 :         }
    3946              : 
    3947              :         // Sync layers
    3948          293 :         if !new_layers.is_empty() {
    3949              :             // Print a warning if the created layer is larger than double the target size
    3950              :             // Add two pages for potential overhead. This should in theory be already
    3951              :             // accounted for in the target calculation, but for very small targets,
    3952              :             // we still might easily hit the limit otherwise.
    3953          293 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    3954        10726 :             for layer in new_layers.iter() {
    3955        10726 :                 if layer.layer_desc().file_size > warn_limit {
    3956            0 :                     warn!(
    3957            0 :                         %layer,
    3958            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    3959            0 :                     );
    3960        10726 :                 }
    3961              :             }
    3962              : 
    3963              :             // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3964          293 :             let layer_paths: Vec<Utf8PathBuf> = new_layers
    3965          293 :                 .iter()
    3966        10726 :                 .map(|l| l.local_path().to_owned())
    3967          293 :                 .collect();
    3968          293 : 
    3969          293 :             // Fsync all the layer files and directory using multiple threads to
    3970          293 :             // minimize latency.
    3971          293 :             par_fsync::par_fsync_async(&layer_paths)
    3972          516 :                 .await
    3973          293 :                 .context("fsync all new layers")?;
    3974              : 
    3975          293 :             let timeline_dir = self
    3976          293 :                 .conf
    3977          293 :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    3978          293 : 
    3979          293 :             par_fsync::par_fsync_async(&[timeline_dir])
    3980          321 :                 .await
    3981          293 :                 .context("fsync of timeline dir")?;
    3982            0 :         }
    3983              : 
    3984          293 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    3985          293 :         stats.new_deltas_count = Some(new_layers.len());
    3986        10726 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    3987          293 : 
    3988          293 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    3989          293 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    3990              :         {
    3991          293 :             Ok(stats_json) => {
    3992          293 :                 info!(
    3993          293 :                     stats_json = stats_json.as_str(),
    3994          293 :                     "compact_level0_phase1 stats available"
    3995          293 :                 )
    3996              :             }
    3997            0 :             Err(e) => {
    3998            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    3999              :             }
    4000              :         }
    4001              : 
    4002          293 :         Ok(CompactLevel0Phase1Result {
    4003          293 :             new_layers,
    4004          293 :             deltas_to_compact: deltas_to_compact
    4005          293 :                 .into_iter()
    4006         4077 :                 .map(|x| x.drop_eviction_guard())
    4007          293 :                 .collect::<Vec<_>>(),
    4008          293 :         })
    4009         1619 :     }
    4010              : 
    4011              :     ///
    4012              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
    4013              :     /// Level 1 files.
    4014              :     ///
    4015         1621 :     async fn compact_level0(
    4016         1621 :         self: &Arc<Self>,
    4017         1621 :         target_file_size: u64,
    4018         1621 :         ctx: &RequestContext,
    4019         1621 :     ) -> Result<(), CompactionError> {
    4020              :         let CompactLevel0Phase1Result {
    4021         1619 :             new_layers,
    4022         1619 :             deltas_to_compact,
    4023              :         } = {
    4024         1621 :             let phase1_span = info_span!("compact_level0_phase1");
    4025         1621 :             let ctx = ctx.attached_child();
    4026         1621 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    4027         1621 :                 version: Some(2),
    4028         1621 :                 tenant_id: Some(self.tenant_shard_id),
    4029         1621 :                 timeline_id: Some(self.timeline_id),
    4030         1621 :                 ..Default::default()
    4031         1621 :             };
    4032         1621 : 
    4033         1621 :             let begin = tokio::time::Instant::now();
    4034         1621 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
    4035         1621 :             let now = tokio::time::Instant::now();
    4036         1621 :             stats.read_lock_acquisition_micros =
    4037         1621 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    4038         1621 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
    4039         1621 :                 .instrument(phase1_span)
    4040       314557 :                 .await?
    4041              :         };
    4042              : 
    4043         1619 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    4044              :             // nothing to do
    4045         1324 :             return Ok(());
    4046          295 :         }
    4047              : 
    4048          295 :         let mut guard = self.layers.write().await;
    4049              : 
    4050          295 :         let mut duplicated_layers = HashSet::new();
    4051          295 : 
    4052          295 :         let mut insert_layers = Vec::with_capacity(new_layers.len());
    4053              : 
    4054        11049 :         for l in &new_layers {
    4055        10754 :             if guard.contains(l.as_ref()) {
    4056              :                 // expected in tests
    4057           28 :                 tracing::error!(layer=%l, "duplicated L1 layer");
    4058              : 
    4059              :                 // good ways to cause a duplicate: we repeatedly error after taking the writelock
    4060              :                 // `guard` on self.layers. As of writing this, there are no error returns except
    4061              :                 // for compact_level0_phase1 creating an L0, which does not happen in practice
    4062              :                 // because we have not implemented L0 => L0 compaction.
    4063           28 :                 duplicated_layers.insert(l.layer_desc().key());
    4064        10726 :             } else if LayerMap::is_l0(l.layer_desc()) {
    4065            0 :                 return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
    4066        10726 :             } else {
    4067        10726 :                 insert_layers.push(l.clone());
    4068        10726 :             }
    4069              :         }
    4070              : 
    4071          295 :         let remove_layers = {
    4072          295 :             let mut deltas_to_compact = deltas_to_compact;
    4073          295 :             // only remove those inputs which were not outputs
    4074         4105 :             deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
    4075          295 :             deltas_to_compact
    4076          295 :         };
    4077          295 : 
    4078          295 :         // deletion will happen later, the layer file manager calls garbage_collect_on_drop
    4079          295 :         guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
    4080              : 
    4081          295 :         if let Some(remote_client) = self.remote_client.as_ref() {
    4082          295 :             remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
    4083            0 :         }
    4084              : 
    4085          295 :         drop_wlock(guard);
    4086          295 : 
    4087          295 :         Ok(())
    4088         1619 :     }
    4089              : 
    4090              :     /// Update information about which layer files need to be retained on
    4091              :     /// garbage collection. This is separate from actually performing the GC,
    4092              :     /// and is updated more frequently, so that compaction can remove obsolete
    4093              :     /// page versions more aggressively.
    4094              :     ///
    4095              :     /// TODO: that's wishful thinking, compaction doesn't actually do that
    4096              :     /// currently.
    4097              :     ///
    4098              :     /// The caller specifies how much history is needed with the 3 arguments:
    4099              :     ///
    4100              :     /// retain_lsns: keep a version of each page at these LSNs
    4101              :     /// cutoff_horizon: also keep everything newer than this LSN
    4102              :     /// pitr: the time duration required to keep data for PITR
    4103              :     ///
    4104              :     /// The 'retain_lsns' list is currently used to prevent removing files that
    4105              :     /// are needed by child timelines. In the future, the user might be able to
    4106              :     /// name additional points in time to retain. The caller is responsible for
    4107              :     /// collecting that information.
    4108              :     ///
    4109              :     /// The 'cutoff_horizon' point is used to retain recent versions that might still be
    4110              :     /// needed by read-only nodes. (As of this writing, the caller just passes
    4111              :     /// the latest LSN minus a constant, and doesn't do anything smart
    4112              :     /// to figure out what read-only nodes might actually need.)
    4113              :     ///
    4114              :     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
    4115              :     /// whether a record is needed for PITR.
    4116              :     ///
    4117              :     /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
    4118              :     /// field, so that the three values passed as argument are stored
    4119              :     /// atomically. But the caller is responsible for ensuring that no new
    4120              :     /// branches are created that would need to be included in 'retain_lsns',
    4121              :     /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
    4122              :     /// that.
    4123              :     ///
    4124         1678 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
    4125              :     pub(super) async fn update_gc_info(
    4126              :         &self,
    4127              :         retain_lsns: Vec<Lsn>,
    4128              :         cutoff_horizon: Lsn,
    4129              :         pitr: Duration,
    4130              :         cancel: &CancellationToken,
    4131              :         ctx: &RequestContext,
    4132              :     ) -> anyhow::Result<()> {
    4133              :         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
    4134              :         //
    4135              :         // Some unit tests depend on garbage-collection working even when
    4136              :         // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
    4137              :         // work, so avoid calling it altogether if time-based retention is not
    4138              :         // configured. It would be pointless anyway.
    4139              :         let pitr_cutoff = if pitr != Duration::ZERO {
    4140              :             let now = SystemTime::now();
    4141              :             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
    4142              :                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
    4143              : 
    4144              :                 match self
    4145              :                     .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
    4146              :                     .await?
    4147              :                 {
    4148              :                     LsnForTimestamp::Present(lsn) => lsn,
    4149              :                     LsnForTimestamp::Future(lsn) => {
    4150              :                         // The timestamp is in the future. That sounds impossible,
    4151              :                         // but what it really means is that there hasn't been
    4152              :                         // any commits since the cutoff timestamp.
    4153            0 :                         debug!("future({})", lsn);
    4154              :                         cutoff_horizon
    4155              :                     }
    4156              :                     LsnForTimestamp::Past(lsn) => {
    4157            0 :                         debug!("past({})", lsn);
    4158              :                         // conservative, safe default is to remove nothing, when we
    4159              :                         // have no commit timestamp data available
    4160              :                         *self.get_latest_gc_cutoff_lsn()
    4161              :                     }
    4162              :                     LsnForTimestamp::NoData(lsn) => {
    4163            0 :                         debug!("nodata({})", lsn);
    4164              :                         // conservative, safe default is to remove nothing, when we
    4165              :                         // have no commit timestamp data available
    4166              :                         *self.get_latest_gc_cutoff_lsn()
    4167              :                     }
    4168              :                 }
    4169              :             } else {
    4170              :                 // If we don't have enough data to convert to LSN,
    4171              :                 // play safe and don't remove any layers.
    4172              :                 *self.get_latest_gc_cutoff_lsn()
    4173              :             }
    4174              :         } else {
    4175              :             // No time-based retention was configured. Set time-based cutoff to
    4176              :             // same as LSN based.
    4177              :             cutoff_horizon
    4178              :         };
    4179              : 
    4180              :         // Grab the lock and update the values
    4181              :         *self.gc_info.write().unwrap() = GcInfo {
    4182              :             retain_lsns,
    4183              :             horizon_cutoff: cutoff_horizon,
    4184              :             pitr_cutoff,
    4185              :         };
    4186              : 
    4187              :         Ok(())
    4188              :     }
    4189              : 
    4190              :     /// Garbage collect layer files on a timeline that are no longer needed.
    4191              :     ///
    4192              :     /// Currently, we don't make any attempt at removing unneeded page versions
    4193              :     /// within a layer file. We can only remove the whole file if it's fully
    4194              :     /// obsolete.
    4195          776 :     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
    4196          776 :         // this is most likely the background tasks, but it might be the spawned task from
    4197          776 :         // immediate_gc
    4198          776 :         let cancel = crate::task_mgr::shutdown_token();
    4199          776 :         let _g = tokio::select! {
    4200          776 :             guard = self.gc_lock.lock() => guard,
    4201              :             _ = self.cancel.cancelled() => return Ok(GcResult::default()),
    4202              :             _ = cancel.cancelled() => return Ok(GcResult::default()),
    4203              :         };
    4204          776 :         let timer = self.metrics.garbage_collect_histo.start_timer();
    4205              : 
    4206            0 :         fail_point!("before-timeline-gc");
    4207              : 
    4208              :         // Is the timeline being deleted?
    4209          776 :         if self.is_stopping() {
    4210            0 :             anyhow::bail!("timeline is Stopping");
    4211          776 :         }
    4212          776 : 
    4213          776 :         let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
    4214          776 :             let gc_info = self.gc_info.read().unwrap();
    4215          776 : 
    4216          776 :             let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
    4217          776 :             let pitr_cutoff = gc_info.pitr_cutoff;
    4218          776 :             let retain_lsns = gc_info.retain_lsns.clone();
    4219          776 :             (horizon_cutoff, pitr_cutoff, retain_lsns)
    4220          776 :         };
    4221          776 : 
    4222          776 :         let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
    4223              : 
    4224          776 :         let res = self
    4225          776 :             .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
    4226          776 :             .instrument(
    4227          776 :                 info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
    4228              :             )
    4229          207 :             .await?;
    4230              : 
    4231              :         // only record successes
    4232          776 :         timer.stop_and_record();
    4233          776 : 
    4234          776 :         Ok(res)
    4235          776 :     }
    4236              : 
    4237          776 :     async fn gc_timeline(
    4238          776 :         &self,
    4239          776 :         horizon_cutoff: Lsn,
    4240          776 :         pitr_cutoff: Lsn,
    4241          776 :         retain_lsns: Vec<Lsn>,
    4242          776 :         new_gc_cutoff: Lsn,
    4243          776 :     ) -> anyhow::Result<GcResult> {
    4244          776 :         let now = SystemTime::now();
    4245          776 :         let mut result: GcResult = GcResult::default();
    4246          776 : 
    4247          776 :         // Nothing to GC. Return early.
    4248          776 :         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    4249          776 :         if latest_gc_cutoff >= new_gc_cutoff {
    4250           95 :             info!(
    4251           95 :                 "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
    4252           95 :             );
    4253           95 :             return Ok(result);
    4254          681 :         }
    4255              : 
    4256              :         // We need to ensure that no one tries to read page versions or create
    4257              :         // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
    4258              :         // for details. This will block until the old value is no longer in use.
    4259              :         //
    4260              :         // The GC cutoff should only ever move forwards.
    4261          681 :         let waitlist = {
    4262          681 :             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
    4263          681 :             ensure!(
    4264          681 :                 *write_guard <= new_gc_cutoff,
    4265            0 :                 "Cannot move GC cutoff LSN backwards (was {}, new {})",
    4266            0 :                 *write_guard,
    4267              :                 new_gc_cutoff
    4268              :             );
    4269          681 :             write_guard.store_and_unlock(new_gc_cutoff)
    4270          681 :         };
    4271          681 :         waitlist.wait().await;
    4272              : 
    4273          681 :         info!("GC starting");
    4274              : 
    4275            0 :         debug!("retain_lsns: {:?}", retain_lsns);
    4276              : 
    4277          681 :         let mut layers_to_remove = Vec::new();
    4278          681 :         let mut wanted_image_layers = KeySpaceRandomAccum::default();
    4279              : 
    4280              :         // Scan all layers in the timeline (remote or on-disk).
    4281              :         //
    4282              :         // Garbage collect the layer if all conditions are satisfied:
    4283              :         // 1. it is older than cutoff LSN;
    4284              :         // 2. it is older than PITR interval;
    4285              :         // 3. it doesn't need to be retained for 'retain_lsns';
    4286              :         // 4. newer on-disk image layers cover the layer's whole key range
    4287              :         //
    4288              :         // TODO holding a write lock is too aggressive and avoidable
    4289          681 :         let mut guard = self.layers.write().await;
    4290          681 :         let layers = guard.layer_map();
    4291        16763 :         'outer: for l in layers.iter_historic_layers() {
    4292        16763 :             result.layers_total += 1;
    4293        16763 : 
    4294        16763 :             // 1. Is it newer than GC horizon cutoff point?
    4295        16763 :             if l.get_lsn_range().end > horizon_cutoff {
    4296            0 :                 debug!(
    4297            0 :                     "keeping {} because it's newer than horizon_cutoff {}",
    4298            0 :                     l.filename(),
    4299            0 :                     horizon_cutoff,
    4300            0 :                 );
    4301         2056 :                 result.layers_needed_by_cutoff += 1;
    4302         2056 :                 continue 'outer;
    4303        14707 :             }
    4304        14707 : 
    4305        14707 :             // 2. Is it newer than PITR cutoff point?
    4306        14707 :             if l.get_lsn_range().end > pitr_cutoff {
    4307            0 :                 debug!(
    4308            0 :                     "keeping {} because it's newer than pitr_cutoff {}",
    4309            0 :                     l.filename(),
    4310            0 :                     pitr_cutoff,
    4311            0 :                 );
    4312           59 :                 result.layers_needed_by_pitr += 1;
    4313           59 :                 continue 'outer;
    4314        14648 :             }
    4315              : 
    4316              :             // 3. Is it needed by a child branch?
    4317              :             // NOTE With that we would keep data that
    4318              :             // might be referenced by child branches forever.
    4319              :             // We can track this in child timeline GC and delete parent layers when
    4320              :             // they are no longer needed. This might be complicated with long inheritance chains.
    4321              :             //
    4322              :             // TODO Vec is not a great choice for `retain_lsns`
    4323        14669 :             for retain_lsn in &retain_lsns {
    4324              :                 // start_lsn is inclusive
    4325          608 :                 if &l.get_lsn_range().start <= retain_lsn {
    4326            0 :                     debug!(
    4327            0 :                         "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
    4328            0 :                         l.filename(),
    4329            0 :                         retain_lsn,
    4330            0 :                         l.is_incremental(),
    4331            0 :                     );
    4332          587 :                     result.layers_needed_by_branches += 1;
    4333          587 :                     continue 'outer;
    4334           21 :                 }
    4335              :             }
    4336              : 
    4337              :             // 4. Is there a later on-disk layer for this relation?
    4338              :             //
    4339              :             // The end-LSN is exclusive, while disk_consistent_lsn is
    4340              :             // inclusive. For example, if disk_consistent_lsn is 100, it is
    4341              :             // OK for a delta layer to have end LSN 101, but if the end LSN
    4342              :             // is 102, then it might not have been fully flushed to disk
    4343              :             // before crash.
    4344              :             //
    4345              :             // For example, imagine that the following layers exist:
    4346              :             //
    4347              :             // 1000      - image (A)
    4348              :             // 1000-2000 - delta (B)
    4349              :             // 2000      - image (C)
    4350              :             // 2000-3000 - delta (D)
    4351              :             // 3000      - image (E)
    4352              :             //
    4353              :             // If GC horizon is at 2500, we can remove layers A and B, but
    4354              :             // we cannot remove C, even though it's older than 2500, because
    4355              :             // the delta layer 2000-3000 depends on it.
    4356        14061 :             if !layers
    4357        14061 :                 .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
    4358              :             {
    4359            0 :                 debug!("keeping {} because it is the latest layer", l.filename());
    4360              :                 // Collect delta key ranges that need image layers to allow garbage
    4361              :                 // collecting the layers.
    4362              :                 // It is not so obvious whether we need to propagate information only about
    4363              :                 // delta layers. Image layers can form "stairs" preventing old images from being deleted.
    4364              :                 // But image layers are in any case less sparse than delta layers. Also we need some
    4365              :                 // protection from replacing recent image layers with new one after each GC iteration.
    4366        12981 :                 if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
    4367            0 :                     wanted_image_layers.add_range(l.get_key_range());
    4368        12981 :                 }
    4369        12981 :                 result.layers_not_updated += 1;
    4370        12981 :                 continue 'outer;
    4371         1080 :             }
    4372              : 
    4373              :             // We didn't find any reason to keep this file, so remove it.
    4374            0 :             debug!(
    4375            0 :                 "garbage collecting {} is_dropped: xx is_incremental: {}",
    4376            0 :                 l.filename(),
    4377            0 :                 l.is_incremental(),
    4378            0 :             );
    4379         1080 :             layers_to_remove.push(l);
    4380              :         }
    4381          681 :         self.wanted_image_layers
    4382          681 :             .lock()
    4383          681 :             .unwrap()
    4384          681 :             .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
    4385          681 : 
    4386          681 :         if !layers_to_remove.is_empty() {
    4387              :             // Persist the new GC cutoff value in the metadata file, before
    4388              :             // we actually remove anything.
    4389              :             //
    4390              :             // This does not in fact have any effect as we no longer consider local metadata unless
    4391              :             // running without remote storage.
    4392              :             //
    4393              :             // This also unconditionally schedules an index_part.json update, even though we will
    4394              :             // be doing one a bit later with the unlinked gc'd layers.
    4395              :             //
    4396              :             // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
    4397           20 :             self.update_metadata_file(self.disk_consistent_lsn.load(), None)
    4398           20 :                 .await?;
    4399              : 
    4400           20 :             let gc_layers = layers_to_remove
    4401           20 :                 .iter()
    4402         1080 :                 .map(|x| guard.get_from_desc(x))
    4403           20 :                 .collect::<Vec<Layer>>();
    4404           20 : 
    4405           20 :             result.layers_removed = gc_layers.len() as u64;
    4406              : 
    4407           20 :             if let Some(remote_client) = self.remote_client.as_ref() {
    4408           20 :                 remote_client.schedule_gc_update(&gc_layers)?;
    4409            0 :             }
    4410              : 
    4411           20 :             guard.finish_gc_timeline(&gc_layers);
    4412           20 : 
    4413           20 :             #[cfg(feature = "testing")]
    4414           20 :             {
    4415           20 :                 result.doomed_layers = gc_layers;
    4416           20 :             }
    4417          661 :         }
    4418              : 
    4419          681 :         info!(
    4420          681 :             "GC completed removing {} layers, cutoff {}",
    4421          681 :             result.layers_removed, new_gc_cutoff
    4422          681 :         );
    4423              : 
    4424          681 :         result.elapsed = now.elapsed()?;
    4425          681 :         Ok(result)
    4426          776 :     }
    4427              : 
    4428              :     /// Reconstruct a value, using the given base image and WAL records in 'data'.
    4429      7320969 :     async fn reconstruct_value(
    4430      7320969 :         &self,
    4431      7320969 :         key: Key,
    4432      7320969 :         request_lsn: Lsn,
    4433      7320969 :         mut data: ValueReconstructState,
    4434      7320978 :     ) -> Result<Bytes, PageReconstructError> {
    4435      7320978 :         // Perform WAL redo if needed
    4436      7320978 :         data.records.reverse();
    4437      7320978 : 
    4438      7320978 :         // If we have a page image, and no WAL, we're all set
    4439      7320978 :         if data.records.is_empty() {
    4440      5001176 :             if let Some((img_lsn, img)) = &data.img {
    4441            0 :                 trace!(
    4442            0 :                     "found page image for key {} at {}, no WAL redo required, req LSN {}",
    4443            0 :                     key,
    4444            0 :                     img_lsn,
    4445            0 :                     request_lsn,
    4446            0 :                 );
    4447      5001176 :                 Ok(img.clone())
    4448              :             } else {
    4449            0 :                 Err(PageReconstructError::from(anyhow!(
    4450            0 :                     "base image for {key} at {request_lsn} not found"
    4451            0 :                 )))
    4452              :             }
    4453              :         } else {
    4454              :             // We need to do WAL redo.
    4455              :             //
    4456              :             // If we don't have a base image, then the oldest WAL record better initialize
    4457              :             // the page
    4458      2319802 :             if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
    4459            0 :                 Err(PageReconstructError::from(anyhow!(
    4460            0 :                     "Base image for {} at {} not found, but got {} WAL records",
    4461            0 :                     key,
    4462            0 :                     request_lsn,
    4463            0 :                     data.records.len()
    4464            0 :                 )))
    4465              :             } else {
    4466      2319802 :                 if data.img.is_some() {
    4467            0 :                     trace!(
    4468            0 :                         "found {} WAL records and a base image for {} at {}, performing WAL redo",
    4469            0 :                         data.records.len(),
    4470            0 :                         key,
    4471            0 :                         request_lsn
    4472            0 :                     );
    4473              :                 } else {
    4474            0 :                     trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
    4475              :                 };
    4476              : 
    4477      2319802 :                 let last_rec_lsn = data.records.last().unwrap().0;
    4478              : 
    4479      2319802 :                 let img = match self
    4480      2319802 :                     .walredo_mgr
    4481      2319802 :                     .as_ref()
    4482      2319802 :                     .context("timeline has no walredo manager")
    4483      2319802 :                     .map_err(PageReconstructError::WalRedo)?
    4484      2319802 :                     .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
    4485            0 :                     .await
    4486      2319801 :                     .context("reconstruct a page image")
    4487              :                 {
    4488      2319801 :                     Ok(img) => img,
    4489            0 :                     Err(e) => return Err(PageReconstructError::WalRedo(e)),
    4490              :                 };
    4491              : 
    4492      2319801 :                 if img.len() == page_cache::PAGE_SZ {
    4493      2316191 :                     let cache = page_cache::get();
    4494      2316191 :                     if let Err(e) = cache
    4495      2316191 :                         .memorize_materialized_page(
    4496      2316191 :                             self.tenant_shard_id,
    4497      2316191 :                             self.timeline_id,
    4498      2316191 :                             key,
    4499      2316191 :                             last_rec_lsn,
    4500      2316191 :                             &img,
    4501      2316191 :                         )
    4502         8804 :                         .await
    4503      2316191 :                         .context("Materialized page memoization failed")
    4504              :                     {
    4505            0 :                         return Err(PageReconstructError::from(e));
    4506      2316191 :                     }
    4507         3610 :                 }
    4508              : 
    4509      2319801 :                 Ok(img)
    4510              :             }
    4511              :         }
    4512      7320977 :     }
    4513              : 
    4514            2 :     pub(crate) async fn spawn_download_all_remote_layers(
    4515            2 :         self: Arc<Self>,
    4516            2 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4517            2 :     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
    4518            2 :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4519            2 : 
    4520            2 :         // this is not really needed anymore; it has tests which really check the return value from
    4521            2 :         // http api. it would be better not to maintain this anymore.
    4522            2 : 
    4523            2 :         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
    4524            2 :         if let Some(st) = &*status_guard {
    4525            1 :             match &st.state {
    4526              :                 DownloadRemoteLayersTaskState::Running => {
    4527            0 :                     return Err(st.clone());
    4528              :                 }
    4529              :                 DownloadRemoteLayersTaskState::ShutDown
    4530            1 :                 | DownloadRemoteLayersTaskState::Completed => {
    4531            1 :                     *status_guard = None;
    4532            1 :                 }
    4533              :             }
    4534            1 :         }
    4535              : 
    4536            2 :         let self_clone = Arc::clone(&self);
    4537            2 :         let task_id = task_mgr::spawn(
    4538            2 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    4539            2 :             task_mgr::TaskKind::DownloadAllRemoteLayers,
    4540            2 :             Some(self.tenant_shard_id),
    4541            2 :             Some(self.timeline_id),
    4542            2 :             "download all remote layers task",
    4543              :             false,
    4544            2 :             async move {
    4545           10 :                 self_clone.download_all_remote_layers(request).await;
    4546            2 :                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
    4547            2 :                  match &mut *status_guard {
    4548              :                     None => {
    4549            0 :                         warn!("tasks status is supposed to be Some(), since we are running");
    4550              :                     }
    4551            2 :                     Some(st) => {
    4552            2 :                         let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
    4553            2 :                         if st.task_id != exp_task_id {
    4554            0 :                             warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
    4555            2 :                         } else {
    4556            2 :                             st.state = DownloadRemoteLayersTaskState::Completed;
    4557            2 :                         }
    4558              :                     }
    4559              :                 };
    4560            2 :                 Ok(())
    4561            2 :             }
    4562            2 :             .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    4563              :         );
    4564              : 
    4565            2 :         let initial_info = DownloadRemoteLayersTaskInfo {
    4566            2 :             task_id: format!("{task_id}"),
    4567            2 :             state: DownloadRemoteLayersTaskState::Running,
    4568            2 :             total_layer_count: 0,
    4569            2 :             successful_download_count: 0,
    4570            2 :             failed_download_count: 0,
    4571            2 :         };
    4572            2 :         *status_guard = Some(initial_info.clone());
    4573            2 : 
    4574            2 :         Ok(initial_info)
    4575            2 :     }
    4576              : 
    /// Body of the "download all remote layers" background task.
    ///
    /// Calls `download()` on every historic layer found in the layer map, running
    /// at most `request.max_concurrent_downloads` downloads at a time, and keeps
    /// the counters in `download_all_remote_layers_task_info` up to date. Once
    /// the loop ends the task state is set to `Completed`.
    ///
    /// # Panics
    ///
    /// Panics if the task info is `None`, if its `task_id` does not match the
    /// current `task_mgr` task, or if this is not running inside a `task_mgr`
    /// task (see `lock_status!` below).
    async fn download_all_remote_layers(
        self: &Arc<Self>,
        request: DownloadRemoteLayersTaskSpawnRequest,
    ) {
        use pageserver_api::models::DownloadRemoteLayersTaskState;

        // Snapshot the layers up front; the read guard on `self.layers` is
        // dropped at the end of this block, before any download is started.
        let remaining = {
            let guard = self.layers.read().await;
            guard
                .layer_map()
                .iter_historic_layers()
                .map(|desc| guard.get_from_desc(&desc))
                .collect::<Vec<_>>()
        };
        let total_layer_count = remaining.len();

        // `lock_status!(name)` takes the write lock on the task info, checks that
        // the info exists and belongs to the current task, and binds `name` to a
        // mutable reference to it. The lock guard lives until the end of the
        // scope in which the macro is invoked.
        macro_rules! lock_status {
            ($st:ident) => {
                let mut st = self.download_all_remote_layers_task_info.write().unwrap();
                let st = st
                    .as_mut()
                    .expect("this function is only called after the task has been spawned");
                assert_eq!(
                    st.task_id,
                    format!(
                        "{}",
                        task_mgr::current_task_id().expect("we run inside a task_mgr task")
                    )
                );
                let $st = st;
            };
        }

        {
            lock_status!(st);
            st.total_layer_count = total_layer_count as u64;
        }

        let mut remaining = remaining.into_iter();
        // Becomes false once `remaining` has been exhausted.
        let mut have_remaining = true;
        let mut js = tokio::task::JoinSet::new();

        let cancel = task_mgr::shutdown_token();

        let limit = request.max_concurrent_downloads;

        loop {
            // Fill the join set up to the concurrency limit; no new downloads are
            // started once the shutdown token has been cancelled.
            while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
                let Some(next) = remaining.next() else {
                    have_remaining = false;
                    break;
                };

                let span = tracing::info_span!("download", layer = %next);

                js.spawn(
                    async move {
                        let res = next.download().await;
                        // Hand the layer back together with the result so that a
                        // failure can be logged with the layer it belongs to.
                        (next, res)
                    }
                    .instrument(span),
                );
            }

            // Wait for the spawned downloads. This loop only ends when the join
            // set is empty, so the next group of downloads is not started until
            // every download of the current group has finished.
            while let Some(res) = js.join_next().await {
                match res {
                    Ok((_, Ok(_))) => {
                        lock_status!(st);
                        st.successful_download_count += 1;
                    }
                    Ok((layer, Err(e))) => {
                        tracing::error!(%layer, "download failed: {e:#}");
                        lock_status!(st);
                        st.failed_download_count += 1;
                    }
                    // Nothing in this function aborts the tasks in the join set.
                    Err(je) if je.is_cancelled() => unreachable!("not used here"),
                    // A panicked download is counted as a failed one.
                    Err(je) if je.is_panic() => {
                        lock_status!(st);
                        st.failed_download_count += 1;
                    }
                    Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
                }
            }

            // Done once nothing is in flight and either every layer has been
            // handed out or shutdown has been requested.
            if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
                break;
            }
        }

        {
            lock_status!(st);
            st.state = DownloadRemoteLayersTaskState::Completed;
        }
    }
    4671              : 
    4672           55 :     pub(crate) fn get_download_all_remote_layers_task_info(
    4673           55 :         &self,
    4674           55 :     ) -> Option<DownloadRemoteLayersTaskInfo> {
    4675           55 :         self.download_all_remote_layers_task_info
    4676           55 :             .read()
    4677           55 :             .unwrap()
    4678           55 :             .clone()
    4679           55 :     }
    4680              : }
    4681              : 
    4682              : impl Timeline {
    4683              :     /// Returns non-remote layers for eviction.
    4684           35 :     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
    4685           35 :         let guard = self.layers.read().await;
    4686           35 :         let mut max_layer_size: Option<u64> = None;
    4687              : 
    4688           35 :         let resident_layers = guard
    4689           35 :             .resident_layers()
    4690          524 :             .map(|layer| {
    4691          524 :                 let file_size = layer.layer_desc().file_size;
    4692          524 :                 max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
    4693          524 : 
    4694          524 :                 let last_activity_ts = layer.access_stats().latest_activity_or_now();
    4695          524 : 
    4696          524 :                 EvictionCandidate {
    4697          524 :                     layer: layer.into(),
    4698          524 :                     last_activity_ts,
    4699          524 :                     relative_last_activity: finite_f32::FiniteF32::ZERO,
    4700          524 :                 }
    4701          524 :             })
    4702           35 :             .collect()
    4703            0 :             .await;
    4704              : 
    4705           35 :         DiskUsageEvictionInfo {
    4706           35 :             max_layer_size,
    4707           35 :             resident_layers,
    4708           35 :         }
    4709           35 :     }
    4710              : 
    4711        22735 :     pub(crate) fn get_shard_index(&self) -> ShardIndex {
    4712        22735 :         ShardIndex {
    4713        22735 :             shard_number: self.tenant_shard_id.shard_number,
    4714        22735 :             shard_count: self.tenant_shard_id.shard_count,
    4715        22735 :         }
    4716        22735 :     }
    4717              : }
    4718              : 
/// One recorded step of a layer traversal, as consumed by
/// `layer_traversal_error()` when it builds the error context chain.
type TraversalPathItem = (
    // Reported as "result" in the error context.
    ValueReconstructResult,
    // Reported as "cont_lsn" in the error context.
    Lsn,
    // Called only when the error message is built; its return value is
    // reported as "layer".
    Box<dyn Send + FnOnce() -> TraversalId>,
);
    4724              : 
    4725              : /// Helper function for get_reconstruct_data() to add the path of layers traversed
    4726              : /// to an error, as anyhow context information.
    4727         1275 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
    4728         1275 :     // We want the original 'msg' to be the outermost context. The outermost context
    4729         1275 :     // is the most high-level information, which also gets propagated to the client.
    4730         1275 :     let mut msg_iter = path
    4731         1275 :         .into_iter()
    4732         1775 :         .map(|(r, c, l)| {
    4733         1775 :             format!(
    4734         1775 :                 "layer traversal: result {:?}, cont_lsn {}, layer: {}",
    4735         1775 :                 r,
    4736         1775 :                 c,
    4737         1775 :                 l(),
    4738         1775 :             )
    4739         1775 :         })
    4740         1275 :         .chain(std::iter::once(msg));
    4741         1275 :     // Construct initial message from the first traversed layer
    4742         1275 :     let err = anyhow!(msg_iter.next().unwrap());
    4743         1275 : 
    4744         1275 :     // Append all subsequent traversals, and the error message 'msg', as contexts.
    4745         1775 :     let msg = msg_iter.fold(err, |err, msg| err.context(msg));
    4746         1275 :     PageReconstructError::from(msg)
    4747         1275 : }
    4748              : 
/// Various functions to mutate the timeline.
///
/// Bundles a reference to the timeline with a mutex guard, so the guard is held
/// for as long as the writer exists.
// TODO Currently, Deref is used to allow easy access to read methods from this trait.
// This is probably considered a bad practice in Rust and should be fixed eventually,
// but will cause large code changes.
pub(crate) struct TimelineWriter<'a> {
    /// The timeline that all writer methods forward to.
    tl: &'a Timeline,
    /// Never read; kept only so that the lock stays held until the writer is
    /// dropped. NOTE(review): presumably guards the timeline's write lock;
    /// confirm where the writer is constructed.
    _write_guard: tokio::sync::MutexGuard<'a, ()>,
}
    4757              : 
    4758              : impl Deref for TimelineWriter<'_> {
    4759              :     type Target = Timeline;
    4760              : 
    4761       730583 :     fn deref(&self) -> &Self::Target {
    4762       730583 :         self.tl
    4763       730583 :     }
    4764              : }
    4765              : 
impl<'a> TimelineWriter<'a> {
    /// Put a new page version that can be constructed from a WAL record
    ///
    /// This will implicitly extend the relation, if the page is beyond the
    /// current end-of-file.
    ///
    /// Forwards to `Timeline::put_value` and returns its result.
    pub(crate) async fn put(
        &self,
        key: Key,
        lsn: Lsn,
        value: &Value,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        self.tl.put_value(key, lsn, value, ctx).await
    }

    /// Put a batch of values, given per key as a list of `(Lsn, Value)` pairs.
    ///
    /// Forwards to `Timeline::put_values` and returns its result.
    pub(crate) async fn put_batch(
        &self,
        batch: &HashMap<Key, Vec<(Lsn, Value)>>,
        ctx: &RequestContext,
    ) -> anyhow::Result<()> {
        self.tl.put_values(batch, ctx).await
    }

    /// Record deletions for a batch of `(key range, Lsn)` pairs.
    ///
    /// Forwards to `Timeline::put_tombstones` and returns its result.
    pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
        self.tl.put_tombstones(batch).await
    }

    /// Remember the end of the last valid WAL record digested into the timeline.
    ///
    /// Call this after you have finished writing all the WAL up to 'new_lsn'.
    ///
    /// 'new_lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
    /// 'new_lsn' or anything older. The previous last record LSN is stored alongside
    /// the latest and can be read.
    pub(crate) fn finish_write(&self, new_lsn: Lsn) {
        self.tl.finish_write(new_lsn);
    }

    /// Adjust the timeline's current logical size by the signed `delta`.
    ///
    /// Forwards to `Timeline::update_current_logical_size`.
    pub(crate) fn update_current_logical_size(&self, delta: i64) {
        self.tl.update_current_logical_size(delta)
    }
}
    4809              : 
    4810              : // We need TimelineWriter to be send in upcoming conversion of
    4811              : // Timeline::layers to tokio::sync::RwLock.
    4812            2 : #[test]
    4813            2 : fn is_send() {
    4814            2 :     fn _assert_send<T: Send>() {}
    4815            2 :     _assert_send::<TimelineWriter<'_>>();
    4816            2 : }
    4817              : 
    4818              : /// Add a suffix to a layer file's name: .{num}.old
    4819              : /// Uses the first available num (starts at 0)
    4820            1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
    4821            1 :     let filename = path
    4822            1 :         .file_name()
    4823            1 :         .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
    4824            1 :     let mut new_path = path.to_owned();
    4825              : 
    4826            1 :     for i in 0u32.. {
    4827            1 :         new_path.set_file_name(format!("{filename}.{i}.old"));
    4828            1 :         if !new_path.exists() {
    4829            1 :             std::fs::rename(path, &new_path)
    4830            1 :                 .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
    4831            1 :             return Ok(());
    4832            0 :         }
    4833              :     }
    4834              : 
    4835            0 :     bail!("couldn't find an unused backup number for {:?}", path)
    4836            1 : }
    4837              : 
#[cfg(test)]
mod tests {
    use utils::{id::TimelineId, lsn::Lsn};

    use crate::tenant::{
        harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
    };

    /// Runs two `evict_and_wait()` calls on the same resident layer concurrently.
    ///
    /// Afterwards the layer must no longer be resident, and the two results must
    /// be either both `Ok` or one `Ok` plus one `EvictionError::NotFound`.
    #[tokio::test]
    async fn two_layer_eviction_attempts_at_the_same_time() {
        let harness =
            TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();

        let ctx = any_context();
        let tenant = harness.try_load(&ctx).await.unwrap();
        let timeline = tenant
            .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
            .await
            .unwrap();

        // Confirm the layer is resident to begin with, then let go of the
        // eviction guard again.
        let layer = find_some_layer(&timeline).await;
        let layer = layer
            .keep_resident()
            .await
            .expect("no download => no downloading errors")
            .expect("should had been resident")
            .drop_eviction_guard();

        let first = async { layer.evict_and_wait().await };
        let second = async { layer.evict_and_wait().await };

        // Both futures are polled concurrently by `join!`.
        let (first, second) = tokio::join!(first, second);

        // Whatever the individual results were, the layer must be evicted now.
        let res = layer.keep_resident().await;
        assert!(matches!(res, Ok(None)), "{res:?}");

        match (first, second) {
            (Ok(()), Ok(())) => {
                // because there are no more timeline locks being taken on eviction path, we can
                // witness all three outcomes here.
            }
            (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
                // if one completes before the other, this is fine just as well.
            }
            other => unreachable!("unexpected {:?}", other),
        }
    }

    /// Builds a `RequestContext` for unit tests, with `DownloadBehavior::Error`.
    fn any_context() -> crate::context::RequestContext {
        use crate::context::*;
        use crate::task_mgr::*;
        RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    }

    /// Returns the first historic layer of `timeline`'s layer map.
    ///
    /// Panics if the layer map has no historic layers.
    async fn find_some_layer(timeline: &Timeline) -> Layer {
        let layers = timeline.layers.read().await;
        let desc = layers
            .layer_map()
            .iter_historic_layers()
            .next()
            .expect("must find one layer to evict");

        layers.get_from_desc(&desc)
    }
}
        

Generated by: LCOV version 2.1-beta