LCOV - code coverage report
Current view: top level - pageserver/src/tenant - timeline.rs (source / functions) Coverage Total Hit
Test: 190869232aac3a234374e5bb62582e91cf5f5818.info Lines: 60.8 % 3166 1926
Test Date: 2024-02-23 13:21:27 Functions: 44.0 % 445 196

            Line data    Source code
       1              : pub mod delete;
       2              : mod eviction_task;
       3              : mod init;
       4              : pub mod layer_manager;
       5              : pub(crate) mod logical_size;
       6              : pub mod span;
       7              : pub mod uninit;
       8              : mod walreceiver;
       9              : 
      10              : use anyhow::{anyhow, bail, ensure, Context, Result};
      11              : use bytes::Bytes;
      12              : use camino::{Utf8Path, Utf8PathBuf};
      13              : use enumset::EnumSet;
      14              : use fail::fail_point;
      15              : use futures::stream::StreamExt;
      16              : use itertools::Itertools;
      17              : use once_cell::sync::Lazy;
      18              : use pageserver_api::{
      19              :     keyspace::KeySpaceAccum,
      20              :     models::{
      21              :         DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
      22              :         LayerMapInfo, TimelineState,
      23              :     },
      24              :     reltag::BlockNumber,
      25              :     shard::{ShardIdentity, TenantShardId},
      26              : };
      27              : use rand::Rng;
      28              : use serde_with::serde_as;
      29              : use storage_broker::BrokerClientChannel;
      30              : use tokio::{
      31              :     runtime::Handle,
      32              :     sync::{oneshot, watch},
      33              : };
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::*;
      36              : use utils::{bin_ser::BeSer, sync::gate::Gate};
      37              : 
      38              : use std::ops::{Deref, Range};
      39              : use std::pin::pin;
      40              : use std::sync::atomic::Ordering as AtomicOrdering;
      41              : use std::sync::{Arc, Mutex, RwLock, Weak};
      42              : use std::time::{Duration, Instant, SystemTime};
      43              : use std::{
      44              :     array,
      45              :     collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
      46              :     sync::atomic::AtomicU64,
      47              : };
      48              : use std::{
      49              :     cmp::{max, min, Ordering},
      50              :     ops::ControlFlow,
      51              : };
      52              : 
      53              : use crate::pgdatadir_mapping::DirectoryKind;
      54              : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
      55              : use crate::tenant::{
      56              :     layer_map::{LayerMap, SearchResult},
      57              :     metadata::{save_metadata, TimelineMetadata},
      58              :     par_fsync,
      59              : };
      60              : use crate::{
      61              :     context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
      62              :     disk_usage_eviction_task::DiskUsageEvictionInfo,
      63              : };
      64              : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
      65              : use crate::{
      66              :     disk_usage_eviction_task::finite_f32,
      67              :     tenant::storage_layer::{
      68              :         AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
      69              :         LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
      70              :         ValueReconstructState, ValuesReconstructState,
      71              :     },
      72              : };
      73              : use crate::{
      74              :     disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
      75              : };
      76              : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
      77              : 
      78              : use crate::config::PageServerConf;
      79              : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
      80              : use crate::metrics::{
      81              :     TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
      82              : };
      83              : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
      84              : use crate::tenant::config::TenantConfOpt;
      85              : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
      86              : use pageserver_api::reltag::RelTag;
      87              : use pageserver_api::shard::ShardIndex;
      88              : 
      89              : use postgres_connection::PgConnectionConfig;
      90              : use postgres_ffi::to_pg_timestamp;
      91              : use utils::{
      92              :     completion,
      93              :     generation::Generation,
      94              :     id::TimelineId,
      95              :     lsn::{AtomicLsn, Lsn, RecordLsn},
      96              :     seqwait::SeqWait,
      97              :     simple_rcu::{Rcu, RcuReadGuard},
      98              : };
      99              : 
     100              : use crate::page_cache;
     101              : use crate::repository::GcResult;
     102              : use crate::repository::{Key, Value};
     103              : use crate::task_mgr;
     104              : use crate::task_mgr::TaskKind;
     105              : use crate::ZERO_PAGE;
     106              : 
     107              : use self::delete::DeleteTimelineFlow;
     108              : pub(super) use self::eviction_task::EvictionTaskTenantState;
     109              : use self::eviction_task::EvictionTaskTimelineState;
     110              : use self::layer_manager::LayerManager;
     111              : use self::logical_size::LogicalSize;
     112              : use self::walreceiver::{WalReceiver, WalReceiverConf};
     113              : 
     114              : use super::remote_timeline_client::RemoteTimelineClient;
     115              : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
     116              : use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
     117              : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
     118              : use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
     119              : 
     120            2 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
     121              : pub(super) enum FlushLoopState {
     122              :     NotStarted,
     123              :     Running {
     124              :         #[cfg(test)]
     125              :         expect_initdb_optimization: bool,
     126              :         #[cfg(test)]
     127              :         initdb_optimization_count: usize,
     128              :     },
     129              :     Exited,
     130              : }
     131              : 
     132              : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
     133            0 : #[derive(Debug, Clone, PartialEq, Eq)]
     134              : pub(crate) struct Hole {
     135              :     key_range: Range<Key>,
     136              :     coverage_size: usize,
     137              : }
     138              : 
     139              : impl Ord for Hole {
     140            0 :     fn cmp(&self, other: &Self) -> Ordering {
     141            0 :         other.coverage_size.cmp(&self.coverage_size) // inverse order
     142            0 :     }
     143              : }
     144              : 
     145              : impl PartialOrd for Hole {
     146            0 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     147            0 :         Some(self.cmp(other))
     148            0 :     }
     149              : }
     150              : 
     151              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     152              : /// Can be removed after all refactors are done.
     153           30 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
     154           30 :     drop(rlock)
     155           30 : }
     156              : 
     157              : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
     158              : /// Can be removed after all refactors are done.
     159          508 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
     160          508 :     drop(rlock)
     161          508 : }
     162              : 
     163              : /// The outward-facing resources required to build a Timeline
     164              : pub struct TimelineResources {
     165              :     pub remote_client: Option<RemoteTimelineClient>,
     166              :     pub deletion_queue_client: DeletionQueueClient,
     167              :     pub timeline_get_throttle: Arc<
     168              :         crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
     169              :     >,
     170              : }
     171              : 
     172              : pub struct Timeline {
     173              :     conf: &'static PageServerConf,
     174              :     tenant_conf: Arc<RwLock<AttachedTenantConf>>,
     175              : 
     176              :     myself: Weak<Self>,
     177              : 
     178              :     pub(crate) tenant_shard_id: TenantShardId,
     179              :     pub timeline_id: TimelineId,
     180              : 
     181              :     /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
     182              :     /// Never changes for the lifetime of this [`Timeline`] object.
     183              :     ///
     184              :     /// This duplicates the generation stored in LocationConf, but that structure is mutable:
     185              :     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     186              :     pub(crate) generation: Generation,
     187              : 
     188              :     /// The detailed sharding information from our parent Tenant.  This enables us to map keys
     189              :     /// to shards, and is constant through the lifetime of this Timeline.
     190              :     shard_identity: ShardIdentity,
     191              : 
     192              :     pub pg_version: u32,
     193              : 
     194              :     /// The tuple has two elements.
     195              :     /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
     196              :     /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
     197              :     ///
     198              :     /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
     199              :     /// We describe these rectangles through the `PersistentLayerDesc` struct.
     200              :     ///
     201              :     /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
     202              :     /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
     203              :     /// `PersistentLayerDesc`'s.
     204              :     ///
     205              :     /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
     206              :     /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
     207              :     /// runtime, e.g., during page reconstruction.
     208              :     ///
     209              :     /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
     210              :     /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
     211              :     pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
     212              : 
     213              :     /// Set of key ranges which should be covered by image layers to
     214              :     /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
     215              :     /// It is used by compaction task when it checks if new image layer should be created.
     216              :     /// Newly created image layer doesn't help to remove the delta layer, until the
     217              :     /// newly created image layer falls off the PITR horizon. So on next GC cycle,
     218              :     /// gc_timeline may still want the new image layer to be created. To avoid redundant
     219              :     /// image layers creation we should check if image layer exists but beyond PITR horizon.
     220              :     /// This is why we need remember GC cutoff LSN.
     221              :     ///
     222              :     wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
     223              : 
     224              :     last_freeze_at: AtomicLsn,
     225              :     // Atomic would be more appropriate here.
     226              :     last_freeze_ts: RwLock<Instant>,
     227              : 
     228              :     // WAL redo manager. `None` only for broken tenants.
     229              :     walredo_mgr: Option<Arc<super::WalRedoManager>>,
     230              : 
     231              :     /// Remote storage client.
     232              :     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
     233              :     pub remote_client: Option<Arc<RemoteTimelineClient>>,
     234              : 
     235              :     // What page versions do we hold in the repository? If we get a
     236              :     // request > last_record_lsn, we need to wait until we receive all
     237              :     // the WAL up to the request. The SeqWait provides functions for
     238              :     // that. TODO: If we get a request for an old LSN, such that the
     239              :     // versions have already been garbage collected away, we should
     240              :     // throw an error, but we don't track that currently.
     241              :     //
     242              :     // last_record_lsn.load().last points to the end of last processed WAL record.
     243              :     //
     244              :     // We also remember the starting point of the previous record in
     245              :     // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
     246              :     // first WAL record when the node is started up. But here, we just
     247              :     // keep track of it.
     248              :     last_record_lsn: SeqWait<RecordLsn, Lsn>,
     249              : 
     250              :     // All WAL records have been processed and stored durably on files on
     251              :     // local disk, up to this LSN. On crash and restart, we need to re-process
     252              :     // the WAL starting from this point.
     253              :     //
     254              :     // Some later WAL records might have been processed and also flushed to disk
     255              :     // already, so don't be surprised to see some, but there's no guarantee on
     256              :     // them yet.
     257              :     disk_consistent_lsn: AtomicLsn,
     258              : 
     259              :     // Parent timeline that this timeline was branched from, and the LSN
     260              :     // of the branch point.
     261              :     ancestor_timeline: Option<Arc<Timeline>>,
     262              :     ancestor_lsn: Lsn,
     263              : 
     264              :     pub(super) metrics: TimelineMetrics,
     265              : 
     266              :     // `Timeline` doesn't write these metrics itself, but it manages the lifetime.  Code
     267              :     // in `crate::page_service` writes these metrics.
     268              :     pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
     269              : 
     270              :     directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
     271              : 
     272              :     /// Ensures layers aren't frozen by checkpointer between
     273              :     /// [`Timeline::get_layer_for_write`] and layer reads.
     274              :     /// Locked automatically by [`TimelineWriter`] and checkpointer.
     275              :     /// Must always be acquired before the layer map/individual layer lock
     276              :     /// to avoid deadlock.
     277              :     write_lock: tokio::sync::Mutex<Option<TimelineWriterState>>,
     278              : 
     279              :     /// Used to avoid multiple `flush_loop` tasks running
     280              :     pub(super) flush_loop_state: Mutex<FlushLoopState>,
     281              : 
     282              :     /// layer_flush_start_tx can be used to wake up the layer-flushing task.
     283              :     /// The value is a counter, incremented every time a new flush cycle is requested.
     284              :     /// The flush cycle counter is sent back on the layer_flush_done channel when
     285              :     /// the flush finishes. You can use that to wait for the flush to finish.
     286              :     layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
     287              :     /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
     288              :     layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
     289              : 
     290              :     // Needed to ensure that we can't create a branch at a point that was already garbage collected
     291              :     pub latest_gc_cutoff_lsn: Rcu<Lsn>,
     292              : 
     293              :     // List of child timelines and their branch points. This is needed to avoid
     294              :     // garbage collecting data that is still needed by the child timelines.
     295              :     pub gc_info: std::sync::RwLock<GcInfo>,
     296              : 
     297              :     // It may change across major versions so for simplicity
     298              :     // keep it after running initdb for a timeline.
     299              :     // It is needed in checks when we want to error on some operations
     300              :     // when they are requested for pre-initdb lsn.
     301              :     // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
     302              :     // though let's keep them both for better error visibility.
     303              :     pub initdb_lsn: Lsn,
     304              : 
     305              :     /// When did we last calculate the partitioning?
     306              :     partitioning: Mutex<(KeyPartitioning, Lsn)>,
     307              : 
     308              :     /// Configuration: how often should the partitioning be recalculated.
     309              :     repartition_threshold: u64,
     310              : 
     311              :     /// Current logical size of the "datadir", at the last LSN.
     312              :     current_logical_size: LogicalSize,
     313              : 
     314              :     /// Information about the last processed message by the WAL receiver,
     315              :     /// or None if WAL receiver has not received anything for this timeline
     316              :     /// yet.
     317              :     pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
     318              :     pub walreceiver: Mutex<Option<WalReceiver>>,
     319              : 
     320              :     /// Relation size cache
     321              :     pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
     322              : 
     323              :     download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
     324              : 
     325              :     state: watch::Sender<TimelineState>,
     326              : 
     327              :     /// Prevent two tasks from deleting the timeline at the same time. If held, the
     328              :     /// timeline is being deleted. If 'true', the timeline has already been deleted.
     329              :     pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
     330              : 
     331              :     eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
     332              : 
     333              :     /// Load or creation time information about the disk_consistent_lsn and when the loading
     334              :     /// happened. Used for consumption metrics.
     335              :     pub(crate) loaded_at: (Lsn, SystemTime),
     336              : 
     337              :     /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
     338              :     pub(crate) gate: Gate,
     339              : 
     340              :     /// Cancellation token scoped to this timeline: anything doing long-running work relating
     341              :     /// to the timeline should drop out when this token fires.
     342              :     pub(crate) cancel: CancellationToken,
     343              : 
     344              :     /// Make sure we only have one running compaction at a time in tests.
     345              :     ///
     346              :     /// Must only be taken in two places:
     347              :     /// - [`Timeline::compact`] (this file)
     348              :     /// - [`delete::delete_local_layer_files`]
     349              :     ///
     350              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     351              :     compaction_lock: tokio::sync::Mutex<()>,
     352              : 
     353              :     /// Make sure we only have one running gc at a time.
     354              :     ///
     355              :     /// Must only be taken in two places:
     356              :     /// - [`Timeline::gc`] (this file)
     357              :     /// - [`delete::delete_local_layer_files`]
     358              :     ///
     359              :     /// Timeline deletion will acquire both compaction and gc locks in whatever order.
     360              :     gc_lock: tokio::sync::Mutex<()>,
     361              : 
     362              :     /// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
     363              :     timeline_get_throttle: Arc<
     364              :         crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
     365              :     >,
     366              : }
     367              : 
     368              : pub struct WalReceiverInfo {
     369              :     pub wal_source_connconf: PgConnectionConfig,
     370              :     pub last_received_msg_lsn: Lsn,
     371              :     pub last_received_msg_ts: u128,
     372              : }
     373              : 
     374              : ///
     375              : /// Information about how much history needs to be retained, needed by
     376              : /// Garbage Collection.
     377              : ///
     378              : pub struct GcInfo {
     379              :     /// Specific LSNs that are needed.
     380              :     ///
     381              :     /// Currently, this includes all points where child branches have
     382              :     /// been forked off from. In the future, could also include
     383              :     /// explicit user-defined snapshot points.
     384              :     pub retain_lsns: Vec<Lsn>,
     385              : 
     386              :     /// In addition to 'retain_lsns', keep everything newer than this
     387              :     /// point.
     388              :     ///
     389              :     /// This is calculated by subtracting 'gc_horizon' setting from
     390              :     /// last-record LSN
     391              :     ///
     392              :     /// FIXME: is this inclusive or exclusive?
     393              :     pub horizon_cutoff: Lsn,
     394              : 
     395              :     /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
     396              :     /// point.
     397              :     ///
     398              :     /// This is calculated by finding a number such that a record is needed for PITR
     399              :     /// if only if its LSN is larger than 'pitr_cutoff'.
     400              :     pub pitr_cutoff: Lsn,
     401              : }
     402              : 
     403              : /// An error happened in a get() operation.
     404            2 : #[derive(thiserror::Error, Debug)]
     405              : pub(crate) enum PageReconstructError {
     406              :     #[error(transparent)]
     407              :     Other(#[from] anyhow::Error),
     408              : 
     409              :     #[error("Ancestor LSN wait error: {0}")]
     410              :     AncestorLsnTimeout(#[from] WaitLsnError),
     411              : 
     412              :     #[error("timeline shutting down")]
     413              :     Cancelled,
     414              : 
     415              :     /// The ancestor of this is being stopped
     416              :     #[error("ancestor timeline {0} is being stopped")]
     417              :     AncestorStopping(TimelineId),
     418              : 
     419              :     /// An error happened replaying WAL records
     420              :     #[error(transparent)]
     421              :     WalRedo(anyhow::Error),
     422              : }
     423              : 
     424              : impl PageReconstructError {
     425              :     /// Returns true if this error indicates a tenant/timeline shutdown alike situation
     426            0 :     pub(crate) fn is_stopping(&self) -> bool {
     427            0 :         use PageReconstructError::*;
     428            0 :         match self {
     429            0 :             Other(_) => false,
     430            0 :             AncestorLsnTimeout(_) => false,
     431            0 :             Cancelled | AncestorStopping(_) => true,
     432            0 :             WalRedo(_) => false,
     433              :         }
     434            0 :     }
     435              : }
     436              : 
     437            0 : #[derive(thiserror::Error, Debug)]
     438              : enum CreateImageLayersError {
     439              :     #[error("timeline shutting down")]
     440              :     Cancelled,
     441              : 
     442              :     #[error(transparent)]
     443              :     GetVectoredError(GetVectoredError),
     444              : 
     445              :     #[error(transparent)]
     446              :     PageReconstructError(PageReconstructError),
     447              : 
     448              :     #[error(transparent)]
     449              :     Other(#[from] anyhow::Error),
     450              : }
     451              : 
     452            0 : #[derive(thiserror::Error, Debug)]
     453              : enum FlushLayerError {
     454              :     /// Timeline cancellation token was cancelled
     455              :     #[error("timeline shutting down")]
     456              :     Cancelled,
     457              : 
     458              :     #[error(transparent)]
     459              :     CreateImageLayersError(CreateImageLayersError),
     460              : 
     461              :     #[error(transparent)]
     462              :     Other(#[from] anyhow::Error),
     463              : }
     464              : 
     465            0 : #[derive(thiserror::Error, Debug)]
     466              : pub(crate) enum GetVectoredError {
     467              :     #[error("timeline shutting down")]
     468              :     Cancelled,
     469              : 
     470              :     #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
     471              :     Oversized(u64),
     472              : 
     473              :     #[error("Requested at invalid LSN: {0}")]
     474              :     InvalidLsn(Lsn),
     475              : 
     476              :     #[error("Requested key {0} not found")]
     477              :     MissingKey(Key),
     478              : 
     479              :     #[error(transparent)]
     480              :     GetReadyAncestorError(GetReadyAncestorError),
     481              : 
     482              :     #[error(transparent)]
     483              :     Other(#[from] anyhow::Error),
     484              : }
     485              : 
     486            0 : #[derive(thiserror::Error, Debug)]
     487              : pub(crate) enum GetReadyAncestorError {
     488              :     #[error("ancestor timeline {0} is being stopped")]
     489              :     AncestorStopping(TimelineId),
     490              : 
     491              :     #[error("Ancestor LSN wait error: {0}")]
     492              :     AncestorLsnTimeout(#[from] WaitLsnError),
     493              : 
     494              :     #[error("Cancelled")]
     495              :     Cancelled,
     496              : 
     497              :     #[error(transparent)]
     498              :     Other(#[from] anyhow::Error),
     499              : }
     500              : 
     501            0 : #[derive(Clone, Copy)]
     502              : pub enum LogicalSizeCalculationCause {
     503              :     Initial,
     504              :     ConsumptionMetricsSyntheticSize,
     505              :     EvictionTaskImitation,
     506              :     TenantSizeHandler,
     507              : }
     508              : 
     509              : pub enum GetLogicalSizePriority {
     510              :     User,
     511              :     Background,
     512              : }
     513              : 
     514            0 : #[derive(enumset::EnumSetType)]
     515              : pub(crate) enum CompactFlags {
     516              :     ForceRepartition,
     517              : }
     518              : 
     519              : impl std::fmt::Debug for Timeline {
     520            0 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     521            0 :         write!(f, "Timeline<{}>", self.timeline_id)
     522            0 :     }
     523              : }
     524              : 
     525            0 : #[derive(thiserror::Error, Debug)]
     526              : pub(crate) enum WaitLsnError {
     527              :     // Called on a timeline which is shutting down
     528              :     #[error("Shutdown")]
     529              :     Shutdown,
     530              : 
     531              :     // Called on an timeline not in active state or shutting down
     532              :     #[error("Bad state (not active)")]
     533              :     BadState,
     534              : 
     535              :     // Timeout expired while waiting for LSN to catch up with goal.
     536              :     #[error("{0}")]
     537              :     Timeout(String),
     538              : }
     539              : 
     540              : // The impls below achieve cancellation mapping for errors.
     541              : // Perhaps there's a way of achieving this with less cruft.
     542              : 
     543              : impl From<CreateImageLayersError> for CompactionError {
     544            0 :     fn from(e: CreateImageLayersError) -> Self {
     545            0 :         match e {
     546            0 :             CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
     547            0 :             _ => CompactionError::Other(e.into()),
     548              :         }
     549            0 :     }
     550              : }
     551              : 
     552              : impl From<CreateImageLayersError> for FlushLayerError {
     553            0 :     fn from(e: CreateImageLayersError) -> Self {
     554            0 :         match e {
     555            0 :             CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
     556            0 :             any => FlushLayerError::CreateImageLayersError(any),
     557              :         }
     558            0 :     }
     559              : }
     560              : 
     561              : impl From<PageReconstructError> for CreateImageLayersError {
     562            0 :     fn from(e: PageReconstructError) -> Self {
     563            0 :         match e {
     564            0 :             PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
     565            0 :             _ => CreateImageLayersError::PageReconstructError(e),
     566              :         }
     567            0 :     }
     568              : }
     569              : 
     570              : impl From<GetVectoredError> for CreateImageLayersError {
     571            0 :     fn from(e: GetVectoredError) -> Self {
     572            0 :         match e {
     573            0 :             GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
     574            0 :             _ => CreateImageLayersError::GetVectoredError(e),
     575              :         }
     576            0 :     }
     577              : }
     578              : 
     579              : impl From<GetReadyAncestorError> for PageReconstructError {
     580            2 :     fn from(e: GetReadyAncestorError) -> Self {
     581            2 :         use GetReadyAncestorError::*;
     582            2 :         match e {
     583            0 :             AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
     584            0 :             AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
     585            0 :             Cancelled => PageReconstructError::Cancelled,
     586            2 :             Other(other) => PageReconstructError::Other(other),
     587              :         }
     588            2 :     }
     589              : }
     590              : 
     591              : #[derive(
     592              :     Eq,
     593            4 :     PartialEq,
     594            0 :     Debug,
     595              :     Copy,
     596            0 :     Clone,
     597          114 :     strum_macros::EnumString,
     598            0 :     strum_macros::Display,
     599            0 :     serde_with::DeserializeFromStr,
     600            0 :     serde_with::SerializeDisplay,
     601              : )]
     602              : #[strum(serialize_all = "kebab-case")]
     603              : pub enum GetVectoredImpl {
     604              :     Sequential,
     605              :     Vectored,
     606              : }
     607              : 
     608              : /// Public interface functions
     609              : impl Timeline {
     610              :     /// Get the LSN where this branch was created
     611            6 :     pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
     612            6 :         self.ancestor_lsn
     613            6 :     }
     614              : 
     615              :     /// Get the ancestor's timeline id
     616           14 :     pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
     617           14 :         self.ancestor_timeline
     618           14 :             .as_ref()
     619           14 :             .map(|ancestor| ancestor.timeline_id)
     620           14 :     }
     621              : 
     622              :     /// Lock and get timeline's GC cutoff
     623          622 :     pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
     624          622 :         self.latest_gc_cutoff_lsn.read()
     625          622 :     }
     626              : 
     627              :     /// Look up given page version.
     628              :     ///
     629              :     /// If a remote layer file is needed, it is downloaded as part of this
     630              :     /// call.
     631              :     ///
     632              :     /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
     633              :     /// abstraction above this needs to store suitable metadata to track what
     634              :     /// data exists with what keys, in separate metadata entries. If a
     635              :     /// non-existent key is requested, we may incorrectly return a value from
     636              :     /// an ancestor branch, for example, or waste a lot of cycles chasing the
     637              :     /// non-existing key.
     638              :     ///
     639              :     /// # Cancel-Safety
     640              :     ///
     641              :     /// This method is cancellation-safe.
     642       502486 :     pub(crate) async fn get(
     643       502486 :         &self,
     644       502486 :         key: Key,
     645       502486 :         lsn: Lsn,
     646       502486 :         ctx: &RequestContext,
     647       502486 :     ) -> Result<Bytes, PageReconstructError> {
     648       502486 :         if !lsn.is_valid() {
     649            0 :             return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
     650       502486 :         }
     651       502486 : 
     652       502486 :         self.timeline_get_throttle.throttle(ctx, 1).await;
     653              : 
     654              :         // This check is debug-only because of the cost of hashing, and because it's a double-check: we
     655              :         // already checked the key against the shard_identity when looking up the Timeline from
     656              :         // page_service.
     657       502486 :         debug_assert!(!self.shard_identity.is_key_disposable(&key));
     658              : 
     659              :         // XXX: structured stats collection for layer eviction here.
     660            0 :         trace!(
     661            0 :             "get page request for {}@{} from task kind {:?}",
     662            0 :             key,
     663            0 :             lsn,
     664            0 :             ctx.task_kind()
     665            0 :         );
     666              : 
     667              :         // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
     668              :         // The cached image can be returned directly if there is no WAL between the cached image
     669              :         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
     670              :         // for redo.
     671       502486 :         let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
     672            0 :             Some((cached_lsn, cached_img)) => {
     673            0 :                 match cached_lsn.cmp(&lsn) {
     674            0 :                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
     675              :                     Ordering::Equal => {
     676            0 :                         MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
     677            0 :                         return Ok(cached_img); // exact LSN match, return the image
     678              :                     }
     679              :                     Ordering::Greater => {
     680            0 :                         unreachable!("the returned lsn should never be after the requested lsn")
     681              :                     }
     682              :                 }
     683            0 :                 Some((cached_lsn, cached_img))
     684              :             }
     685       502486 :             None => None,
     686              :         };
     687              : 
     688       502486 :         let mut reconstruct_state = ValueReconstructState {
     689       502486 :             records: Vec::new(),
     690       502486 :             img: cached_page_img,
     691       502486 :         };
     692       502486 : 
     693       502486 :         let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
     694       502486 :         let path = self
     695       502486 :             .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
     696        36411 :             .await?;
     697       502378 :         timer.stop_and_record();
     698       502378 : 
     699       502378 :         let start = Instant::now();
     700       502378 :         let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
     701       502378 :         let elapsed = start.elapsed();
     702       502378 :         crate::metrics::RECONSTRUCT_TIME
     703       502378 :             .for_result(&res)
     704       502378 :             .observe(elapsed.as_secs_f64());
     705       502378 : 
     706       502378 :         if cfg!(feature = "testing") && res.is_err() {
     707              :             // it can only be walredo issue
     708              :             use std::fmt::Write;
     709              : 
     710            0 :             let mut msg = String::new();
     711            0 : 
     712            0 :             path.into_iter().for_each(|(res, cont_lsn, layer)| {
     713            0 :                 writeln!(
     714            0 :                     msg,
     715            0 :                     "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
     716            0 :                     layer(),
     717            0 :                 )
     718            0 :                 .expect("string grows")
     719            0 :             });
     720              : 
     721              :             // this is to rule out or provide evidence that we could in some cases read a duplicate
     722              :             // walrecord
     723            0 :             tracing::info!("walredo failed, path:\n{msg}");
     724       502378 :         }
     725              : 
     726       502378 :         res
     727       502486 :     }
     728              : 
     729              :     pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
     730              : 
     731              :     /// Look up multiple page versions at a given LSN
     732              :     ///
     733              :     /// This naive implementation will be replaced with a more efficient one
     734              :     /// which actually vectorizes the read path.
     735          420 :     pub(crate) async fn get_vectored(
     736          420 :         &self,
     737          420 :         keyspace: KeySpace,
     738          420 :         lsn: Lsn,
     739          420 :         ctx: &RequestContext,
     740          420 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     741          420 :         if !lsn.is_valid() {
     742            0 :             return Err(GetVectoredError::InvalidLsn(lsn));
     743          420 :         }
     744          420 : 
     745          420 :         let key_count = keyspace.total_size().try_into().unwrap();
     746          420 :         if key_count > Timeline::MAX_GET_VECTORED_KEYS {
     747            0 :             return Err(GetVectoredError::Oversized(key_count));
     748          420 :         }
     749          420 : 
     750          420 :         self.timeline_get_throttle
     751          420 :             .throttle(ctx, key_count as usize)
     752            0 :             .await;
     753              : 
     754          840 :         for range in &keyspace.ranges {
     755          420 :             let mut key = range.start;
     756          980 :             while key != range.end {
     757          560 :                 assert!(!self.shard_identity.is_key_disposable(&key));
     758          560 :                 key = key.next();
     759              :             }
     760              :         }
     761              : 
     762            0 :         trace!(
     763            0 :             "get vectored request for {:?}@{} from task kind {:?} will use {} implementation",
     764            0 :             keyspace,
     765            0 :             lsn,
     766            0 :             ctx.task_kind(),
     767            0 :             self.conf.get_vectored_impl
     768            0 :         );
     769              : 
     770          420 :         let _timer = crate::metrics::GET_VECTORED_LATENCY
     771          420 :             .for_task_kind(ctx.task_kind())
     772          420 :             .map(|t| t.start_timer());
     773          420 : 
     774          420 :         match self.conf.get_vectored_impl {
     775              :             GetVectoredImpl::Sequential => {
     776          420 :                 self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
     777              :             }
     778              :             GetVectoredImpl::Vectored => {
     779            0 :                 let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
     780              : 
     781            0 :                 self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
     782            0 :                     .await;
     783              : 
     784            0 :                 vectored_res
     785              :             }
     786              :         }
     787          420 :     }
     788              : 
     789          430 :     pub(super) async fn get_vectored_sequential_impl(
     790          430 :         &self,
     791          430 :         keyspace: KeySpace,
     792          430 :         lsn: Lsn,
     793          430 :         ctx: &RequestContext,
     794          430 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     795          430 :         let mut values = BTreeMap::new();
     796          860 :         for range in keyspace.ranges {
     797          430 :             let mut key = range.start;
     798         1310 :             while key != range.end {
     799          880 :                 let block = self.get(key, lsn, ctx).await;
     800              : 
     801              :                 use PageReconstructError::*;
     802            0 :                 match block {
     803              :                     Err(Cancelled | AncestorStopping(_)) => {
     804            0 :                         return Err(GetVectoredError::Cancelled)
     805              :                     }
     806            0 :                     Err(Other(err)) if err.to_string().contains("could not find data for key") => {
     807            0 :                         return Err(GetVectoredError::MissingKey(key))
     808              :                     }
     809          880 :                     _ => {
     810          880 :                         values.insert(key, block);
     811          880 :                         key = key.next();
     812          880 :                     }
     813              :                 }
     814              :             }
     815              :         }
     816              : 
     817          430 :         Ok(values)
     818          430 :     }
     819              : 
     820           10 :     pub(super) async fn get_vectored_impl(
     821           10 :         &self,
     822           10 :         keyspace: KeySpace,
     823           10 :         lsn: Lsn,
     824           10 :         ctx: &RequestContext,
     825           10 :     ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
     826           10 :         let mut reconstruct_state = ValuesReconstructState::new();
     827           10 : 
     828           10 :         self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
     829           28 :             .await?;
     830              : 
     831           10 :         let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
     832          330 :         for (key, res) in reconstruct_state.keys {
     833          320 :             match res {
     834            0 :                 Err(err) => {
     835            0 :                     results.insert(key, Err(err));
     836            0 :                 }
     837          320 :                 Ok(state) => {
     838          320 :                     let state = ValueReconstructState::from(state);
     839              : 
     840          320 :                     let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
     841          320 :                     results.insert(key, reconstruct_res);
     842              :                 }
     843              :             }
     844              :         }
     845              : 
     846           10 :         Ok(results)
     847           10 :     }
     848              : 
     849           10 :     pub(super) async fn validate_get_vectored_impl(
     850           10 :         &self,
     851           10 :         vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
     852           10 :         keyspace: KeySpace,
     853           10 :         lsn: Lsn,
     854           10 :         ctx: &RequestContext,
     855           10 :     ) {
     856           10 :         let sequential_res = self
     857           10 :             .get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
     858           20 :             .await;
     859              : 
     860            0 :         fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
     861            0 :             use GetVectoredError::*;
     862            0 :             match (lhs, rhs) {
     863            0 :                 (Cancelled, Cancelled) => true,
     864            0 :                 (_, Cancelled) => true,
     865            0 :                 (Oversized(l), Oversized(r)) => l == r,
     866            0 :                 (InvalidLsn(l), InvalidLsn(r)) => l == r,
     867            0 :                 (MissingKey(l), MissingKey(r)) => l == r,
     868            0 :                 (GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
     869            0 :                 (Other(_), Other(_)) => true,
     870            0 :                 _ => false,
     871              :             }
     872            0 :         }
     873              : 
     874           10 :         match (&sequential_res, vectored_res) {
     875            0 :             (Err(seq_err), Ok(_)) => {
     876            0 :                 panic!(concat!("Sequential get failed with {}, but vectored get did not",
     877            0 :                                " - keyspace={:?} lsn={}"),
     878            0 :                        seq_err, keyspace, lsn) },
     879            0 :             (Ok(_), Err(vec_err)) => {
     880            0 :                 panic!(concat!("Vectored get failed with {}, but sequential get did not",
     881            0 :                                " - keyspace={:?} lsn={}"),
     882            0 :                        vec_err, keyspace, lsn) },
     883            0 :             (Err(seq_err), Err(vec_err)) => {
     884            0 :                 assert!(errors_match(seq_err, vec_err),
     885            0 :                         "Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
     886           10 :             (Ok(seq_values), Ok(vec_values)) => {
     887          320 :                 seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
     888          320 :                     assert_eq!(seq_key, vec_key);
     889          320 :                     match (seq_res, vec_res) {
     890          320 :                         (Ok(seq_blob), Ok(vec_blob)) => {
     891          320 :                             assert_eq!(seq_blob, vec_blob,
     892            0 :                                        "Image mismatch for key {seq_key} - keyspace={keyspace:?} lsn={lsn}");
     893              :                         },
     894            0 :                         (Err(err), Ok(_)) => {
     895            0 :                             panic!(
     896            0 :                                 concat!("Sequential get failed with {} for key {}, but vectored get did not",
     897            0 :                                         " - keyspace={:?} lsn={}"),
     898            0 :                                 err, seq_key, keyspace, lsn) },
     899            0 :                         (Ok(_), Err(err)) => {
     900            0 :                             panic!(
     901            0 :                                 concat!("Vectored get failed with {} for key {}, but sequential get did not",
     902            0 :                                         " - keyspace={:?} lsn={}"),
     903            0 :                                 err, seq_key, keyspace, lsn) },
     904            0 :                         (Err(_), Err(_)) => {}
     905              :                     }
     906          320 :                 })
     907              :             }
     908              :         }
     909           10 :     }
     910              : 
     911              :     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     912      2900476 :     pub(crate) fn get_last_record_lsn(&self) -> Lsn {
     913      2900476 :         self.last_record_lsn.load().last
     914      2900476 :     }
     915              : 
     916            0 :     pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
     917            0 :         self.last_record_lsn.load().prev
     918            0 :     }
     919              : 
     920              :     /// Atomically get both last and prev.
     921          210 :     pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
     922          210 :         self.last_record_lsn.load()
     923          210 :     }
     924              : 
     925          694 :     pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
     926          694 :         self.disk_consistent_lsn.load()
     927          694 :     }
     928              : 
     929              :     /// remote_consistent_lsn from the perspective of the tenant's current generation,
     930              :     /// not validated with control plane yet.
     931              :     /// See [`Self::get_remote_consistent_lsn_visible`].
     932            0 :     pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
     933            0 :         if let Some(remote_client) = &self.remote_client {
     934            0 :             remote_client.remote_consistent_lsn_projected()
     935              :         } else {
     936            0 :             None
     937              :         }
     938            0 :     }
     939              : 
     940              :     /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
     941              :     /// i.e. a value of remote_consistent_lsn_projected which has undergone
     942              :     /// generation validation in the deletion queue.
     943            0 :     pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
     944            0 :         if let Some(remote_client) = &self.remote_client {
     945            0 :             remote_client.remote_consistent_lsn_visible()
     946              :         } else {
     947            0 :             None
     948              :         }
     949            0 :     }
     950              : 
     951              :     /// The sum of the file size of all historic layers in the layer map.
     952              :     /// This method makes no distinction between local and remote layers.
     953              :     /// Hence, the result **does not represent local filesystem usage**.
     954            0 :     pub(crate) async fn layer_size_sum(&self) -> u64 {
     955            0 :         let guard = self.layers.read().await;
     956            0 :         let layer_map = guard.layer_map();
     957            0 :         let mut size = 0;
     958            0 :         for l in layer_map.iter_historic_layers() {
     959            0 :             size += l.file_size();
     960            0 :         }
     961            0 :         size
     962            0 :     }
     963              : 
     964            0 :     pub(crate) fn resident_physical_size(&self) -> u64 {
     965            0 :         self.metrics.resident_physical_size_get()
     966            0 :     }
     967              : 
     968            0 :     pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
     969            0 :         array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
     970            0 :     }
     971              : 
     972              :     ///
     973              :     /// Wait until WAL has been received and processed up to this LSN.
     974              :     ///
     975              :     /// You should call this before any of the other get_* or list_* functions. Calling
     976              :     /// those functions with an LSN that has been processed yet is an error.
     977              :     ///
     978       226536 :     pub(crate) async fn wait_lsn(
     979       226536 :         &self,
     980       226536 :         lsn: Lsn,
     981       226536 :         _ctx: &RequestContext, /* Prepare for use by cancellation */
     982       226536 :     ) -> Result<(), WaitLsnError> {
     983       226536 :         if self.cancel.is_cancelled() {
     984            0 :             return Err(WaitLsnError::Shutdown);
     985       226536 :         } else if !self.is_active() {
     986            0 :             return Err(WaitLsnError::BadState);
     987       226536 :         }
     988              : 
     989              :         // This should never be called from the WAL receiver, because that could lead
     990              :         // to a deadlock.
     991              :         debug_assert!(
     992       226536 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
     993            0 :             "wait_lsn cannot be called in WAL receiver"
     994              :         );
     995              :         debug_assert!(
     996       226536 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
     997            0 :             "wait_lsn cannot be called in WAL receiver"
     998              :         );
     999              :         debug_assert!(
    1000       226536 :             task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
    1001            0 :             "wait_lsn cannot be called in WAL receiver"
    1002              :         );
    1003              : 
    1004       226536 :         let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
    1005       226536 : 
    1006       226536 :         match self
    1007       226536 :             .last_record_lsn
    1008       226536 :             .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
    1009            0 :             .await
    1010              :         {
    1011       226536 :             Ok(()) => Ok(()),
    1012            0 :             Err(e) => {
    1013            0 :                 use utils::seqwait::SeqWaitError::*;
    1014            0 :                 match e {
    1015            0 :                     Shutdown => Err(WaitLsnError::Shutdown),
    1016              :                     Timeout => {
    1017              :                         // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
    1018            0 :                         drop(_timer);
    1019            0 :                         let walreceiver_status = self.walreceiver_status();
    1020            0 :                         Err(WaitLsnError::Timeout(format!(
    1021            0 :                         "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
    1022            0 :                         lsn,
    1023            0 :                         self.get_last_record_lsn(),
    1024            0 :                         self.get_disk_consistent_lsn(),
    1025            0 :                         walreceiver_status,
    1026            0 :                     )))
    1027              :                     }
    1028              :                 }
    1029              :             }
    1030              :         }
    1031       226536 :     }
    1032              : 
    1033            0 :     pub(crate) fn walreceiver_status(&self) -> String {
    1034            0 :         match &*self.walreceiver.lock().unwrap() {
    1035            0 :             None => "stopping or stopped".to_string(),
    1036            0 :             Some(walreceiver) => match walreceiver.status() {
    1037            0 :                 Some(status) => status.to_human_readable_string(),
    1038            0 :                 None => "Not active".to_string(),
    1039              :             },
    1040              :         }
    1041            0 :     }
    1042              : 
    1043              :     /// Check that it is valid to request operations with that lsn.
    1044          214 :     pub(crate) fn check_lsn_is_in_scope(
    1045          214 :         &self,
    1046          214 :         lsn: Lsn,
    1047          214 :         latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
    1048          214 :     ) -> anyhow::Result<()> {
    1049          214 :         ensure!(
    1050          214 :             lsn >= **latest_gc_cutoff_lsn,
    1051            4 :             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
    1052            4 :             lsn,
    1053            4 :             **latest_gc_cutoff_lsn,
    1054              :         );
    1055          210 :         Ok(())
    1056          214 :     }
    1057              : 
    1058              :     /// Flush to disk all data that was written with the put_* functions
    1059         1052 :     #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
    1060              :     pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
    1061              :         self.freeze_inmem_layer(false).await;
    1062              :         self.flush_frozen_layers_and_wait().await
    1063              :     }
    1064              : 
    1065              :     /// Outermost timeline compaction operation; downloads needed layers.
    1066          408 :     pub(crate) async fn compact(
    1067          408 :         self: &Arc<Self>,
    1068          408 :         cancel: &CancellationToken,
    1069          408 :         flags: EnumSet<CompactFlags>,
    1070          408 :         ctx: &RequestContext,
    1071          408 :     ) -> Result<(), CompactionError> {
    1072          408 :         // most likely the cancellation token is from background task, but in tests it could be the
    1073          408 :         // request task as well.
    1074          408 : 
    1075          408 :         let prepare = async move {
    1076          408 :             let guard = self.compaction_lock.lock().await;
    1077              : 
    1078          408 :             let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
    1079          408 :                 BackgroundLoopKind::Compaction,
    1080          408 :                 ctx,
    1081          408 :             )
    1082            0 :             .await;
    1083              : 
    1084          408 :             (guard, permit)
    1085          408 :         };
    1086              : 
    1087              :         // this wait probably never needs any "long time spent" logging, because we already nag if
    1088              :         // compaction task goes over it's period (20s) which is quite often in production.
    1089          408 :         let (_guard, _permit) = tokio::select! {
    1090          408 :             tuple = prepare => { tuple },
    1091              :             _ = self.cancel.cancelled() => return Ok(()),
    1092              :             _ = cancel.cancelled() => return Ok(()),
    1093              :         };
    1094              : 
    1095          408 :         let last_record_lsn = self.get_last_record_lsn();
    1096          408 : 
    1097          408 :         // Last record Lsn could be zero in case the timeline was just created
    1098          408 :         if !last_record_lsn.is_valid() {
    1099            0 :             warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
    1100            0 :             return Ok(());
    1101          408 :         }
    1102          408 : 
    1103          408 :         // High level strategy for compaction / image creation:
    1104          408 :         //
    1105          408 :         // 1. First, calculate the desired "partitioning" of the
    1106          408 :         // currently in-use key space. The goal is to partition the
    1107          408 :         // key space into roughly fixed-size chunks, but also take into
    1108          408 :         // account any existing image layers, and try to align the
    1109          408 :         // chunk boundaries with the existing image layers to avoid
    1110          408 :         // too much churn. Also try to align chunk boundaries with
    1111          408 :         // relation boundaries.  In principle, we don't know about
    1112          408 :         // relation boundaries here, we just deal with key-value
    1113          408 :         // pairs, and the code in pgdatadir_mapping.rs knows how to
    1114          408 :         // map relations into key-value pairs. But in practice we know
    1115          408 :         // that 'field6' is the block number, and the fields 1-5
    1116          408 :         // identify a relation. This is just an optimization,
    1117          408 :         // though.
    1118          408 :         //
    1119          408 :         // 2. Once we know the partitioning, for each partition,
    1120          408 :         // decide if it's time to create a new image layer. The
    1121          408 :         // criteria is: there has been too much "churn" since the last
    1122          408 :         // image layer? The "churn" is fuzzy concept, it's a
    1123          408 :         // combination of too many delta files, or too much WAL in
    1124          408 :         // total in the delta file. Or perhaps: if creating an image
    1125          408 :         // file would allow to delete some older files.
    1126          408 :         //
    1127          408 :         // 3. After that, we compact all level0 delta files if there
    1128          408 :         // are too many of them.  While compacting, we also garbage
    1129          408 :         // collect any page versions that are no longer needed because
    1130          408 :         // of the new image layers we created in step 2.
    1131          408 :         //
    1132          408 :         // TODO: This high level strategy hasn't been implemented yet.
    1133          408 :         // Below are functions compact_level0() and create_image_layers()
    1134          408 :         // but they are a bit ad hoc and don't quite work like it's explained
    1135          408 :         // above. Rewrite it.
    1136          408 : 
    1137          408 :         // Is the timeline being deleted?
    1138          408 :         if self.is_stopping() {
    1139            0 :             trace!("Dropping out of compaction on timeline shutdown");
    1140            0 :             return Err(CompactionError::ShuttingDown);
    1141          408 :         }
    1142          408 : 
    1143          408 :         let target_file_size = self.get_checkpoint_distance();
    1144          408 : 
    1145          408 :         // Define partitioning schema if needed
    1146          408 : 
    1147          408 :         // FIXME: the match should only cover repartitioning, not the next steps
    1148          408 :         match self
    1149          408 :             .repartition(
    1150          408 :                 self.get_last_record_lsn(),
    1151          408 :                 self.get_compaction_target_size(),
    1152          408 :                 flags,
    1153          408 :                 ctx,
    1154          408 :             )
    1155        13252 :             .await
    1156              :         {
    1157          408 :             Ok((partitioning, lsn)) => {
    1158          408 :                 // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
    1159          408 :                 let image_ctx = RequestContextBuilder::extend(ctx)
    1160          408 :                     .access_stats_behavior(AccessStatsBehavior::Skip)
    1161          408 :                     .build();
    1162          408 : 
    1163          408 :                 // 2. Compact
    1164          408 :                 let timer = self.metrics.compact_time_histo.start_timer();
    1165        39551 :                 self.compact_level0(target_file_size, ctx).await?;
    1166          408 :                 timer.stop_and_record();
    1167              : 
    1168              :                 // 3. Create new image layers for partitions that have been modified
    1169              :                 // "enough".
    1170          408 :                 let layers = self
    1171          408 :                     .create_image_layers(&partitioning, lsn, false, &image_ctx)
    1172            0 :                     .await
    1173          408 :                     .map_err(anyhow::Error::from)?;
    1174          408 :                 if let Some(remote_client) = &self.remote_client {
    1175          408 :                     for layer in layers {
    1176            0 :                         remote_client.schedule_layer_file_upload(layer)?;
    1177              :                     }
    1178            0 :                 }
    1179              : 
    1180          408 :                 if let Some(remote_client) = &self.remote_client {
    1181              :                     // should any new image layer been created, not uploading index_part will
    1182              :                     // result in a mismatch between remote_physical_size and layermap calculated
    1183              :                     // size, which will fail some tests, but should not be an issue otherwise.
    1184          408 :                     remote_client.schedule_index_upload_for_file_changes()?;
    1185            0 :                 }
    1186              :             }
    1187            0 :             Err(err) => {
    1188            0 :                 // no partitioning? This is normal, if the timeline was just created
    1189            0 :                 // as an empty timeline. Also in unit tests, when we use the timeline
    1190            0 :                 // as a simple key-value store, ignoring the datadir layout. Log the
    1191            0 :                 // error but continue.
    1192            0 :                 //
    1193            0 :                 // Suppress error when it's due to cancellation
    1194            0 :                 if !self.cancel.is_cancelled() {
    1195            0 :                     error!("could not compact, repartitioning keyspace failed: {err:?}");
    1196            0 :                 }
    1197              :             }
    1198              :         };
    1199              : 
    1200          408 :         Ok(())
    1201          408 :     }
    1202              : 
    1203              :     /// Mutate the timeline with a [`TimelineWriter`].
    1204      2957008 :     pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
    1205      2957008 :         TimelineWriter {
    1206      2957008 :             tl: self,
    1207      2957008 :             write_guard: self.write_lock.lock().await,
    1208              :         }
    1209      2957008 :     }
    1210              : 
    1211            0 :     pub(crate) fn activate(
    1212            0 :         self: &Arc<Self>,
    1213            0 :         broker_client: BrokerClientChannel,
    1214            0 :         background_jobs_can_start: Option<&completion::Barrier>,
    1215            0 :         ctx: &RequestContext,
    1216            0 :     ) {
    1217            0 :         if self.tenant_shard_id.is_zero() {
    1218            0 :             // Logical size is only maintained accurately on shard zero.
    1219            0 :             self.spawn_initial_logical_size_computation_task(ctx);
    1220            0 :         }
    1221            0 :         self.launch_wal_receiver(ctx, broker_client);
    1222            0 :         self.set_state(TimelineState::Active);
    1223            0 :         self.launch_eviction_task(background_jobs_can_start);
    1224            0 :     }
    1225              : 
    1226              :     /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
    1227              :     /// also to remote storage.  This method can easily take multiple seconds for a busy timeline.
    1228              :     ///
    1229              :     /// While we are flushing, we continue to accept read I/O.
    1230            6 :     pub(crate) async fn flush_and_shutdown(&self) {
    1231            6 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1232              : 
    1233              :         // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
    1234              :         // trying to flush
    1235            0 :         tracing::debug!("Waiting for WalReceiverManager...");
    1236            6 :         task_mgr::shutdown_tasks(
    1237            6 :             Some(TaskKind::WalReceiverManager),
    1238            6 :             Some(self.tenant_shard_id),
    1239            6 :             Some(self.timeline_id),
    1240            6 :         )
    1241            0 :         .await;
    1242              : 
    1243              :         // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
    1244            6 :         self.last_record_lsn.shutdown();
    1245            6 : 
    1246            6 :         // now all writers to InMemory layer are gone, do the final flush if requested
    1247            6 :         match self.freeze_and_flush().await {
    1248              :             Ok(_) => {
    1249              :                 // drain the upload queue
    1250            6 :                 if let Some(client) = self.remote_client.as_ref() {
    1251              :                     // if we did not wait for completion here, it might be our shutdown process
    1252              :                     // didn't wait for remote uploads to complete at all, as new tasks can forever
    1253              :                     // be spawned.
    1254              :                     //
    1255              :                     // what is problematic is the shutting down of RemoteTimelineClient, because
    1256              :                     // obviously it does not make sense to stop while we wait for it, but what
    1257              :                     // about corner cases like s3 suddenly hanging up?
    1258            6 :                     if let Err(e) = client.shutdown().await {
    1259              :                         // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1260              :                         // we have some extra WAL replay to do next time the timeline starts.
    1261            0 :                         warn!("failed to flush to remote storage: {e:#}");
    1262            6 :                     }
    1263            0 :                 }
    1264              :             }
    1265            0 :             Err(e) => {
    1266              :                 // Non-fatal.  Shutdown is infallible.  Failures to flush just mean that
    1267              :                 // we have some extra WAL replay to do next time the timeline starts.
    1268            0 :                 warn!("failed to freeze and flush: {e:#}");
    1269              :             }
    1270              :         }
    1271              : 
    1272            6 :         self.shutdown().await;
    1273            6 :     }
    1274              : 
    1275              :     /// Shut down immediately, without waiting for any open layers to flush to disk.  This is a subset of
    1276              :     /// the graceful [`Timeline::flush_and_shutdown`] function.
    1277            8 :     pub(crate) async fn shutdown(&self) {
    1278            8 :         debug_assert_current_span_has_tenant_and_timeline_id();
    1279              : 
    1280              :         // Signal any subscribers to our cancellation token to drop out
    1281            0 :         tracing::debug!("Cancelling CancellationToken");
    1282            8 :         self.cancel.cancel();
    1283            8 : 
    1284            8 :         // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
    1285            8 :         // while doing so.
    1286            8 :         self.last_record_lsn.shutdown();
    1287            8 : 
    1288            8 :         // Shut down the layer flush task before the remote client, as one depends on the other
    1289            8 :         task_mgr::shutdown_tasks(
    1290            8 :             Some(TaskKind::LayerFlushTask),
    1291            8 :             Some(self.tenant_shard_id),
    1292            8 :             Some(self.timeline_id),
    1293            8 :         )
    1294            6 :         .await;
    1295              : 
    1296              :         // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
    1297              :         // case our caller wants to use that for a deletion
    1298            8 :         if let Some(remote_client) = self.remote_client.as_ref() {
    1299            8 :             match remote_client.stop() {
    1300            8 :                 Ok(()) => {}
    1301            0 :                 Err(StopError::QueueUninitialized) => {
    1302            0 :                     // Shutting down during initialization is legal
    1303            0 :                 }
    1304              :             }
    1305            0 :         }
    1306              : 
    1307            0 :         tracing::debug!("Waiting for tasks...");
    1308              : 
    1309            8 :         task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
    1310              : 
    1311              :         // Finally wait until any gate-holders are complete
    1312            8 :         self.gate.close().await;
    1313            8 :     }
    1314              : 
    1315          294 :     pub(crate) fn set_state(&self, new_state: TimelineState) {
    1316          294 :         match (self.current_state(), new_state) {
    1317          294 :             (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
    1318            2 :                 info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
    1319              :             }
    1320            0 :             (st, TimelineState::Loading) => {
    1321            0 :                 error!("ignoring transition from {st:?} into Loading state");
    1322              :             }
    1323            0 :             (TimelineState::Broken { .. }, new_state) => {
    1324            0 :                 error!("Ignoring state update {new_state:?} for broken timeline");
    1325              :             }
    1326              :             (TimelineState::Stopping, TimelineState::Active) => {
    1327            0 :                 error!("Not activating a Stopping timeline");
    1328              :             }
    1329          292 :             (_, new_state) => {
    1330          292 :                 self.state.send_replace(new_state);
    1331          292 :             }
    1332              :         }
    1333          294 :     }
    1334              : 
    1335            2 :     pub(crate) fn set_broken(&self, reason: String) {
    1336            2 :         let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
    1337            2 :         let broken_state = TimelineState::Broken {
    1338            2 :             reason,
    1339            2 :             backtrace: backtrace_str,
    1340            2 :         };
    1341            2 :         self.set_state(broken_state);
    1342            2 : 
    1343            2 :         // Although the Broken state is not equivalent to shutdown() (shutdown will be called
    1344            2 :         // later when this tenant is detach or the process shuts down), firing the cancellation token
    1345            2 :         // here avoids the need for other tasks to watch for the Broken state explicitly.
    1346            2 :         self.cancel.cancel();
    1347            2 :     }
    1348              : 
    1349       227862 :     pub(crate) fn current_state(&self) -> TimelineState {
    1350       227862 :         self.state.borrow().clone()
    1351       227862 :     }
    1352              : 
    1353            6 :     pub(crate) fn is_broken(&self) -> bool {
    1354            6 :         matches!(&*self.state.borrow(), TimelineState::Broken { .. })
    1355            6 :     }
    1356              : 
    1357       226752 :     pub(crate) fn is_active(&self) -> bool {
    1358       226752 :         self.current_state() == TimelineState::Active
    1359       226752 :     }
    1360              : 
    1361          816 :     pub(crate) fn is_stopping(&self) -> bool {
    1362          816 :         self.current_state() == TimelineState::Stopping
    1363          816 :     }
    1364              : 
    1365            0 :     pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
    1366            0 :         self.state.subscribe()
    1367            0 :     }
    1368              : 
    1369       226538 :     pub(crate) async fn wait_to_become_active(
    1370       226538 :         &self,
    1371       226538 :         _ctx: &RequestContext, // Prepare for use by cancellation
    1372       226538 :     ) -> Result<(), TimelineState> {
    1373       226538 :         let mut receiver = self.state.subscribe();
    1374       226538 :         loop {
    1375       226538 :             let current_state = receiver.borrow().clone();
    1376       226538 :             match current_state {
    1377              :                 TimelineState::Loading => {
    1378            0 :                     receiver
    1379            0 :                         .changed()
    1380            0 :                         .await
    1381            0 :                         .expect("holding a reference to self");
    1382              :                 }
    1383              :                 TimelineState::Active { .. } => {
    1384       226536 :                     return Ok(());
    1385              :                 }
    1386              :                 TimelineState::Broken { .. } | TimelineState::Stopping => {
    1387              :                     // There's no chance the timeline can transition back into ::Active
    1388            2 :                     return Err(current_state);
    1389              :                 }
    1390              :             }
    1391              :         }
    1392       226538 :     }
    1393              : 
    1394            0 :     pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
    1395            0 :         let guard = self.layers.read().await;
    1396            0 :         let layer_map = guard.layer_map();
    1397            0 :         let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
    1398            0 :         if let Some(open_layer) = &layer_map.open_layer {
    1399            0 :             in_memory_layers.push(open_layer.info());
    1400            0 :         }
    1401            0 :         for frozen_layer in &layer_map.frozen_layers {
    1402            0 :             in_memory_layers.push(frozen_layer.info());
    1403            0 :         }
    1404              : 
    1405            0 :         let mut historic_layers = Vec::new();
    1406            0 :         for historic_layer in layer_map.iter_historic_layers() {
    1407            0 :             let historic_layer = guard.get_from_desc(&historic_layer);
    1408            0 :             historic_layers.push(historic_layer.info(reset));
    1409            0 :         }
    1410              : 
    1411            0 :         LayerMapInfo {
    1412            0 :             in_memory_layers,
    1413            0 :             historic_layers,
    1414            0 :         }
    1415            0 :     }
    1416              : 
    1417            0 :     #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
    1418              :     pub(crate) async fn download_layer(
    1419              :         &self,
    1420              :         layer_file_name: &str,
    1421              :     ) -> anyhow::Result<Option<bool>> {
    1422              :         let Some(layer) = self.find_layer(layer_file_name).await else {
    1423              :             return Ok(None);
    1424              :         };
    1425              : 
    1426              :         if self.remote_client.is_none() {
    1427              :             return Ok(Some(false));
    1428              :         }
    1429              : 
    1430              :         layer.download().await?;
    1431              : 
    1432              :         Ok(Some(true))
    1433              :     }
    1434              : 
    1435              :     /// Evict just one layer.
    1436              :     ///
    1437              :     /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
    1438            0 :     pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
    1439            0 :         let _gate = self
    1440            0 :             .gate
    1441            0 :             .enter()
    1442            0 :             .map_err(|_| anyhow::anyhow!("Shutting down"))?;
    1443              : 
    1444            0 :         let Some(local_layer) = self.find_layer(layer_file_name).await else {
    1445            0 :             return Ok(None);
    1446              :         };
    1447              : 
    1448            0 :         match local_layer.evict_and_wait().await {
    1449            0 :             Ok(()) => Ok(Some(true)),
    1450            0 :             Err(EvictionError::NotFound) => Ok(Some(false)),
    1451            0 :             Err(EvictionError::Downloaded) => Ok(Some(false)),
    1452              :         }
    1453            0 :     }
    1454              : }
    1455              : 
    1456              : /// Number of times we will compute partition within a checkpoint distance.
    1457              : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
    1458              : 
    1459              : // Private functions
    1460              : impl Timeline {
    1461            0 :     pub(crate) fn get_lazy_slru_download(&self) -> bool {
    1462            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1463            0 :         tenant_conf
    1464            0 :             .lazy_slru_download
    1465            0 :             .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
    1466            0 :     }
    1467              : 
    1468          796 :     fn get_checkpoint_distance(&self) -> u64 {
    1469          796 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1470          796 :         tenant_conf
    1471          796 :             .checkpoint_distance
    1472          796 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
    1473          796 :     }
    1474              : 
    1475           48 :     fn get_checkpoint_timeout(&self) -> Duration {
    1476           48 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1477           48 :         tenant_conf
    1478           48 :             .checkpoint_timeout
    1479           48 :             .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
    1480           48 :     }
    1481              : 
    1482          478 :     fn get_compaction_target_size(&self) -> u64 {
    1483          478 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1484          478 :         tenant_conf
    1485          478 :             .compaction_target_size
    1486          478 :             .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
    1487          478 :     }
    1488              : 
    1489          408 :     fn get_compaction_threshold(&self) -> usize {
    1490          408 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1491          408 :         tenant_conf
    1492          408 :             .compaction_threshold
    1493          408 :             .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
    1494          408 :     }
    1495              : 
    1496          408 :     fn get_image_creation_threshold(&self) -> usize {
    1497          408 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1498          408 :         tenant_conf
    1499          408 :             .image_creation_threshold
    1500          408 :             .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
    1501          408 :     }
    1502              : 
    1503            0 :     fn get_eviction_policy(&self) -> EvictionPolicy {
    1504            0 :         let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
    1505            0 :         tenant_conf
    1506            0 :             .eviction_policy
    1507            0 :             .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
    1508            0 :     }
    1509              : 
    1510          292 :     fn get_evictions_low_residence_duration_metric_threshold(
    1511          292 :         tenant_conf: &TenantConfOpt,
    1512          292 :         default_tenant_conf: &TenantConf,
    1513          292 :     ) -> Duration {
    1514          292 :         tenant_conf
    1515          292 :             .evictions_low_residence_duration_metric_threshold
    1516          292 :             .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
    1517          292 :     }
    1518              : 
    1519         1984 :     fn get_gc_feedback(&self) -> bool {
    1520         1984 :         let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf.clone();
    1521         1984 :         tenant_conf
    1522         1984 :             .gc_feedback
    1523         1984 :             .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
    1524         1984 :     }
    1525              : 
    1526            0 :     pub(super) fn tenant_conf_updated(&self) {
    1527            0 :         // NB: Most tenant conf options are read by background loops, so,
    1528            0 :         // changes will automatically be picked up.
    1529            0 : 
    1530            0 :         // The threshold is embedded in the metric. So, we need to update it.
    1531            0 :         {
    1532            0 :             let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
    1533            0 :                 &self.tenant_conf.read().unwrap().tenant_conf,
    1534            0 :                 &self.conf.default_tenant_conf,
    1535            0 :             );
    1536            0 : 
    1537            0 :             let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
    1538            0 :             let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
    1539            0 : 
    1540            0 :             let timeline_id_str = self.timeline_id.to_string();
    1541            0 :             self.metrics
    1542            0 :                 .evictions_with_low_residence_duration
    1543            0 :                 .write()
    1544            0 :                 .unwrap()
    1545            0 :                 .change_threshold(
    1546            0 :                     &tenant_id_str,
    1547            0 :                     &shard_id_str,
    1548            0 :                     &timeline_id_str,
    1549            0 :                     new_threshold,
    1550            0 :                 );
    1551            0 :         }
    1552            0 :     }
    1553              : 
    1554              :     /// Open a Timeline handle.
    1555              :     ///
    1556              :     /// Loads the metadata for the timeline into memory, but not the layer map.
    1557              :     #[allow(clippy::too_many_arguments)]
    1558          292 :     pub(super) fn new(
    1559          292 :         conf: &'static PageServerConf,
    1560          292 :         tenant_conf: Arc<RwLock<AttachedTenantConf>>,
    1561          292 :         metadata: &TimelineMetadata,
    1562          292 :         ancestor: Option<Arc<Timeline>>,
    1563          292 :         timeline_id: TimelineId,
    1564          292 :         tenant_shard_id: TenantShardId,
    1565          292 :         generation: Generation,
    1566          292 :         shard_identity: ShardIdentity,
    1567          292 :         walredo_mgr: Option<Arc<super::WalRedoManager>>,
    1568          292 :         resources: TimelineResources,
    1569          292 :         pg_version: u32,
    1570          292 :         state: TimelineState,
    1571          292 :         cancel: CancellationToken,
    1572          292 :     ) -> Arc<Self> {
    1573          292 :         let disk_consistent_lsn = metadata.disk_consistent_lsn();
    1574          292 :         let (state, _) = watch::channel(state);
    1575          292 : 
    1576          292 :         let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
    1577          292 :         let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
    1578          292 : 
    1579          292 :         let tenant_conf_guard = tenant_conf.read().unwrap();
    1580          292 : 
    1581          292 :         let evictions_low_residence_duration_metric_threshold =
    1582          292 :             Self::get_evictions_low_residence_duration_metric_threshold(
    1583          292 :                 &tenant_conf_guard.tenant_conf,
    1584          292 :                 &conf.default_tenant_conf,
    1585          292 :             );
    1586          292 :         drop(tenant_conf_guard);
    1587          292 : 
    1588          292 :         Arc::new_cyclic(|myself| {
    1589          292 :             let mut result = Timeline {
    1590          292 :                 conf,
    1591          292 :                 tenant_conf,
    1592          292 :                 myself: myself.clone(),
    1593          292 :                 timeline_id,
    1594          292 :                 tenant_shard_id,
    1595          292 :                 generation,
    1596          292 :                 shard_identity,
    1597          292 :                 pg_version,
    1598          292 :                 layers: Default::default(),
    1599          292 :                 wanted_image_layers: Mutex::new(None),
    1600          292 : 
    1601          292 :                 walredo_mgr,
    1602          292 :                 walreceiver: Mutex::new(None),
    1603          292 : 
    1604          292 :                 remote_client: resources.remote_client.map(Arc::new),
    1605          292 : 
    1606          292 :                 // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
    1607          292 :                 last_record_lsn: SeqWait::new(RecordLsn {
    1608          292 :                     last: disk_consistent_lsn,
    1609          292 :                     prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
    1610          292 :                 }),
    1611          292 :                 disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
    1612          292 : 
    1613          292 :                 last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
    1614          292 :                 last_freeze_ts: RwLock::new(Instant::now()),
    1615          292 : 
    1616          292 :                 loaded_at: (disk_consistent_lsn, SystemTime::now()),
    1617          292 : 
    1618          292 :                 ancestor_timeline: ancestor,
    1619          292 :                 ancestor_lsn: metadata.ancestor_lsn(),
    1620          292 : 
    1621          292 :                 metrics: TimelineMetrics::new(
    1622          292 :                     &tenant_shard_id,
    1623          292 :                     &timeline_id,
    1624          292 :                     crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
    1625          292 :                         "mtime",
    1626          292 :                         evictions_low_residence_duration_metric_threshold,
    1627          292 :                     ),
    1628          292 :                 ),
    1629          292 : 
    1630          292 :                 query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
    1631          292 :                     &tenant_shard_id,
    1632          292 :                     &timeline_id,
    1633          292 :                 ),
    1634          292 : 
    1635         2044 :                 directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
    1636          292 : 
    1637          292 :                 flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
    1638          292 : 
    1639          292 :                 layer_flush_start_tx,
    1640          292 :                 layer_flush_done_tx,
    1641          292 : 
    1642          292 :                 write_lock: tokio::sync::Mutex::new(None),
    1643          292 : 
    1644          292 :                 gc_info: std::sync::RwLock::new(GcInfo {
    1645          292 :                     retain_lsns: Vec::new(),
    1646          292 :                     horizon_cutoff: Lsn(0),
    1647          292 :                     pitr_cutoff: Lsn(0),
    1648          292 :                 }),
    1649          292 : 
    1650          292 :                 latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
    1651          292 :                 initdb_lsn: metadata.initdb_lsn(),
    1652          292 : 
    1653          292 :                 current_logical_size: if disk_consistent_lsn.is_valid() {
    1654              :                     // we're creating timeline data with some layer files existing locally,
    1655              :                     // need to recalculate timeline's logical size based on data in the layers.
    1656          216 :                     LogicalSize::deferred_initial(disk_consistent_lsn)
    1657              :                 } else {
    1658              :                     // we're creating timeline data without any layers existing locally,
    1659              :                     // initial logical size is 0.
    1660           76 :                     LogicalSize::empty_initial()
    1661              :                 },
    1662          292 :                 partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
    1663          292 :                 repartition_threshold: 0,
    1664          292 : 
    1665          292 :                 last_received_wal: Mutex::new(None),
    1666          292 :                 rel_size_cache: RwLock::new(HashMap::new()),
    1667          292 : 
    1668          292 :                 download_all_remote_layers_task_info: RwLock::new(None),
    1669          292 : 
    1670          292 :                 state,
    1671          292 : 
    1672          292 :                 eviction_task_timeline_state: tokio::sync::Mutex::new(
    1673          292 :                     EvictionTaskTimelineState::default(),
    1674          292 :                 ),
    1675          292 :                 delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
    1676          292 : 
    1677          292 :                 cancel,
    1678          292 :                 gate: Gate::default(),
    1679          292 : 
    1680          292 :                 compaction_lock: tokio::sync::Mutex::default(),
    1681          292 :                 gc_lock: tokio::sync::Mutex::default(),
    1682          292 : 
    1683          292 :                 timeline_get_throttle: resources.timeline_get_throttle,
    1684          292 :             };
    1685          292 :             result.repartition_threshold =
    1686          292 :                 result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
    1687          292 :             result
    1688          292 :                 .metrics
    1689          292 :                 .last_record_gauge
    1690          292 :                 .set(disk_consistent_lsn.0 as i64);
    1691          292 :             result
    1692          292 :         })
    1693          292 :     }
    1694              : 
    1695          358 :     pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
    1696          358 :         let Ok(guard) = self.gate.enter() else {
    1697            0 :             info!("cannot start flush loop when the timeline gate has already been closed");
    1698            0 :             return;
    1699              :         };
    1700          358 :         let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
    1701          358 :         match *flush_loop_state {
    1702          288 :             FlushLoopState::NotStarted => (),
    1703              :             FlushLoopState::Running { .. } => {
    1704           70 :                 info!(
    1705           70 :                     "skipping attempt to start flush_loop twice {}/{}",
    1706           70 :                     self.tenant_shard_id, self.timeline_id
    1707           70 :                 );
    1708           70 :                 return;
    1709              :             }
    1710              :             FlushLoopState::Exited => {
    1711            0 :                 warn!(
    1712            0 :                     "ignoring attempt to restart exited flush_loop {}/{}",
    1713            0 :                     self.tenant_shard_id, self.timeline_id
    1714            0 :                 );
    1715            0 :                 return;
    1716              :             }
    1717              :         }
    1718              : 
    1719          288 :         let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
    1720          288 :         let self_clone = Arc::clone(self);
    1721          288 : 
    1722          288 :         debug!("spawning flush loop");
    1723          288 :         *flush_loop_state = FlushLoopState::Running {
    1724          288 :             #[cfg(test)]
    1725          288 :             expect_initdb_optimization: false,
    1726          288 :             #[cfg(test)]
    1727          288 :             initdb_optimization_count: 0,
    1728          288 :         };
    1729          288 :         task_mgr::spawn(
    1730          288 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    1731          288 :             task_mgr::TaskKind::LayerFlushTask,
    1732          288 :             Some(self.tenant_shard_id),
    1733          288 :             Some(self.timeline_id),
    1734          288 :             "layer flush task",
    1735              :             false,
    1736          288 :             async move {
    1737          288 :                 let _guard = guard;
    1738          288 :                 let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
    1739         2774 :                 self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
    1740            8 :                 let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
    1741            8 :                 assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
    1742            8 :                 *flush_loop_state  = FlushLoopState::Exited;
    1743            8 :                 Ok(())
    1744            8 :             }
    1745          288 :             .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    1746              :         );
    1747          358 :     }
    1748              : 
    1749              :     /// Creates and starts the wal receiver.
    1750              :     ///
    1751              :     /// This function is expected to be called at most once per Timeline's lifecycle
    1752              :     /// when the timeline is activated.
    1753            0 :     fn launch_wal_receiver(
    1754            0 :         self: &Arc<Self>,
    1755            0 :         ctx: &RequestContext,
    1756            0 :         broker_client: BrokerClientChannel,
    1757            0 :     ) {
    1758            0 :         info!(
    1759            0 :             "launching WAL receiver for timeline {} of tenant {}",
    1760            0 :             self.timeline_id, self.tenant_shard_id
    1761            0 :         );
    1762              : 
    1763            0 :         let tenant_conf_guard = self.tenant_conf.read().unwrap();
    1764            0 :         let wal_connect_timeout = tenant_conf_guard
    1765            0 :             .tenant_conf
    1766            0 :             .walreceiver_connect_timeout
    1767            0 :             .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
    1768            0 :         let lagging_wal_timeout = tenant_conf_guard
    1769            0 :             .tenant_conf
    1770            0 :             .lagging_wal_timeout
    1771            0 :             .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
    1772            0 :         let max_lsn_wal_lag = tenant_conf_guard
    1773            0 :             .tenant_conf
    1774            0 :             .max_lsn_wal_lag
    1775            0 :             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
    1776            0 :         drop(tenant_conf_guard);
    1777            0 : 
    1778            0 :         let mut guard = self.walreceiver.lock().unwrap();
    1779            0 :         assert!(
    1780            0 :             guard.is_none(),
    1781            0 :             "multiple launches / re-launches of WAL receiver are not supported"
    1782              :         );
    1783            0 :         *guard = Some(WalReceiver::start(
    1784            0 :             Arc::clone(self),
    1785            0 :             WalReceiverConf {
    1786            0 :                 wal_connect_timeout,
    1787            0 :                 lagging_wal_timeout,
    1788            0 :                 max_lsn_wal_lag,
    1789            0 :                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
    1790            0 :                 availability_zone: self.conf.availability_zone.clone(),
    1791            0 :                 ingest_batch_size: self.conf.ingest_batch_size,
    1792            0 :             },
    1793            0 :             broker_client,
    1794            0 :             ctx,
    1795            0 :         ));
    1796            0 :     }
    1797              : 
    1798              :     /// Initialize with an empty layer map. Used when creating a new timeline.
    1799          286 :     pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
    1800          286 :         let mut layers = self.layers.try_write().expect(
    1801          286 :             "in the context where we call this function, no other task has access to the object",
    1802          286 :         );
    1803          286 :         layers.initialize_empty(Lsn(start_lsn.0));
    1804          286 :     }
    1805              : 
    1806              :     /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
    1807              :     /// files.
    1808            6 :     pub(super) async fn load_layer_map(
    1809            6 :         &self,
    1810            6 :         disk_consistent_lsn: Lsn,
    1811            6 :         index_part: Option<IndexPart>,
    1812            6 :     ) -> anyhow::Result<()> {
    1813              :         use init::{Decision::*, Discovered, DismissedLayer};
    1814              :         use LayerFileName::*;
    1815              : 
    1816            6 :         let mut guard = self.layers.write().await;
    1817              : 
    1818            6 :         let timer = self.metrics.load_layer_map_histo.start_timer();
    1819            6 : 
    1820            6 :         // Scan timeline directory and create ImageFileName and DeltaFilename
    1821            6 :         // structs representing all files on disk
    1822            6 :         let timeline_path = self
    1823            6 :             .conf
    1824            6 :             .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    1825            6 :         let conf = self.conf;
    1826            6 :         let span = tracing::Span::current();
    1827            6 : 
    1828            6 :         // Copy to move into the task we're about to spawn
    1829            6 :         let generation = self.generation;
    1830            6 :         let shard = self.get_shard_index();
    1831            6 :         let this = self.myself.upgrade().expect("&self method holds the arc");
    1832              : 
    1833            6 :         let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
    1834            6 :             move || {
    1835            6 :                 let _g = span.entered();
    1836            6 :                 let discovered = init::scan_timeline_dir(&timeline_path)?;
    1837            6 :                 let mut discovered_layers = Vec::with_capacity(discovered.len());
    1838            6 :                 let mut unrecognized_files = Vec::new();
    1839            6 : 
    1840            6 :                 let mut path = timeline_path;
    1841              : 
    1842           28 :                 for discovered in discovered {
    1843           22 :                     let (name, kind) = match discovered {
    1844           16 :                         Discovered::Layer(file_name, file_size) => {
    1845           16 :                             discovered_layers.push((file_name, file_size));
    1846           16 :                             continue;
    1847              :                         }
    1848              :                         Discovered::Metadata | Discovered::IgnoredBackup => {
    1849            6 :                             continue;
    1850              :                         }
    1851            0 :                         Discovered::Unknown(file_name) => {
    1852            0 :                             // we will later error if there are any
    1853            0 :                             unrecognized_files.push(file_name);
    1854            0 :                             continue;
    1855              :                         }
    1856            0 :                         Discovered::Ephemeral(name) => (name, "old ephemeral file"),
    1857            0 :                         Discovered::Temporary(name) => (name, "temporary timeline file"),
    1858            0 :                         Discovered::TemporaryDownload(name) => (name, "temporary download"),
    1859              :                     };
    1860            0 :                     path.push(Utf8Path::new(&name));
    1861            0 :                     init::cleanup(&path, kind)?;
    1862            0 :                     path.pop();
    1863              :                 }
    1864              : 
    1865            6 :                 if !unrecognized_files.is_empty() {
    1866              :                     // assume that if there are any there are many many.
    1867            0 :                     let n = unrecognized_files.len();
    1868            0 :                     let first = &unrecognized_files[..n.min(10)];
    1869            0 :                     anyhow::bail!(
    1870            0 :                         "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
    1871            0 :                     );
    1872            6 :                 }
    1873            6 : 
    1874            6 :                 let decided = init::reconcile(
    1875            6 :                     discovered_layers,
    1876            6 :                     index_part.as_ref(),
    1877            6 :                     disk_consistent_lsn,
    1878            6 :                     generation,
    1879            6 :                     shard,
    1880            6 :                 );
    1881            6 : 
    1882            6 :                 let mut loaded_layers = Vec::new();
    1883            6 :                 let mut needs_cleanup = Vec::new();
    1884            6 :                 let mut total_physical_size = 0;
    1885              : 
    1886           22 :                 for (name, decision) in decided {
    1887           16 :                     let decision = match decision {
    1888            0 :                         Ok(UseRemote { local, remote }) => {
    1889            0 :                             // Remote is authoritative, but we may still choose to retain
    1890            0 :                             // the local file if the contents appear to match
    1891            0 :                             if local.file_size() == remote.file_size() {
    1892              :                                 // Use the local file, but take the remote metadata so that we pick up
    1893              :                                 // the correct generation.
    1894            0 :                                 UseLocal(remote)
    1895              :                             } else {
    1896            0 :                                 path.push(name.file_name());
    1897            0 :                                 init::cleanup_local_file_for_remote(&path, &local, &remote)?;
    1898            0 :                                 path.pop();
    1899            0 :                                 UseRemote { local, remote }
    1900              :                             }
    1901              :                         }
    1902           16 :                         Ok(decision) => decision,
    1903            0 :                         Err(DismissedLayer::Future { local }) => {
    1904            0 :                             if local.is_some() {
    1905            0 :                                 path.push(name.file_name());
    1906            0 :                                 init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
    1907            0 :                                 path.pop();
    1908            0 :                             }
    1909            0 :                             needs_cleanup.push(name);
    1910            0 :                             continue;
    1911              :                         }
    1912            0 :                         Err(DismissedLayer::LocalOnly(local)) => {
    1913            0 :                             path.push(name.file_name());
    1914            0 :                             init::cleanup_local_only_file(&path, &name, &local)?;
    1915            0 :                             path.pop();
    1916            0 :                             // this file never existed remotely, we will have to do rework
    1917            0 :                             continue;
    1918              :                         }
    1919              :                     };
    1920              : 
    1921           16 :                     match &name {
    1922           12 :                         Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
    1923            4 :                         Image(i) => assert!(i.lsn <= disk_consistent_lsn),
    1924              :                     }
    1925              : 
    1926           16 :                     tracing::debug!(layer=%name, ?decision, "applied");
    1927              : 
    1928           16 :                     let layer = match decision {
    1929           16 :                         UseLocal(m) => {
    1930           16 :                             total_physical_size += m.file_size();
    1931           16 :                             Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
    1932              :                         }
    1933            0 :                         Evicted(remote) | UseRemote { remote, .. } => {
    1934            0 :                             Layer::for_evicted(conf, &this, name, remote)
    1935              :                         }
    1936              :                     };
    1937              : 
    1938           16 :                     loaded_layers.push(layer);
    1939              :                 }
    1940            6 :                 Ok((loaded_layers, needs_cleanup, total_physical_size))
    1941            6 :             }
    1942            6 :         })
    1943            6 :         .await
    1944            6 :         .map_err(anyhow::Error::new)
    1945            6 :         .and_then(|x| x)?;
    1946              : 
    1947            6 :         let num_layers = loaded_layers.len();
    1948            6 : 
    1949            6 :         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
    1950              : 
    1951            6 :         if let Some(rtc) = self.remote_client.as_ref() {
    1952            6 :             rtc.schedule_layer_file_deletion(&needs_cleanup)?;
    1953            6 :             rtc.schedule_index_upload_for_file_changes()?;
    1954              :             // This barrier orders above DELETEs before any later operations.
    1955              :             // This is critical because code executing after the barrier might
    1956              :             // create again objects with the same key that we just scheduled for deletion.
    1957              :             // For example, if we just scheduled deletion of an image layer "from the future",
    1958              :             // later compaction might run again and re-create the same image layer.
    1959              :             // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
    1960              :             // "same" here means same key range and LSN.
    1961              :             //
    1962              :             // Without a barrier between above DELETEs and the re-creation's PUTs,
    1963              :             // the upload queue may execute the PUT first, then the DELETE.
    1964              :             // In our example, we will end up with an IndexPart referencing a non-existent object.
    1965              :             //
    1966              :             // 1. a future image layer is created and uploaded
    1967              :             // 2. ps restart
    1968              :             // 3. the future layer from (1) is deleted during load layer map
    1969              :             // 4. image layer is re-created and uploaded
    1970              :             // 5. deletion queue would like to delete (1) but actually deletes (4)
    1971              :             // 6. delete by name works as expected, but it now deletes the wrong (later) version
    1972              :             //
    1973              :             // See https://github.com/neondatabase/neon/issues/5878
    1974              :             //
    1975              :             // NB: generation numbers naturally protect against this because they disambiguate
    1976              :             //     (1) and (4)
    1977            6 :             rtc.schedule_barrier()?;
    1978              :             // Tenant::create_timeline will wait for these uploads to happen before returning, or
    1979              :             // on retry.
    1980            0 :         }
    1981              : 
    1982            6 :         info!(
    1983            6 :             "loaded layer map with {} layers at {}, total physical size: {}",
    1984            6 :             num_layers, disk_consistent_lsn, total_physical_size
    1985            6 :         );
    1986              : 
    1987            6 :         timer.stop_and_record();
    1988            6 :         Ok(())
    1989            6 :     }
    1990              : 
    1991              :     /// Retrieve current logical size of the timeline.
    1992              :     ///
    1993              :     /// The size could be lagging behind the actual number, in case
    1994              :     /// the initial size calculation has not been run (gets triggered on the first size access).
    1995              :     ///
    1996              :     /// return size and boolean flag that shows if the size is exact
    1997            0 :     pub(crate) fn get_current_logical_size(
    1998            0 :         self: &Arc<Self>,
    1999            0 :         priority: GetLogicalSizePriority,
    2000            0 :         ctx: &RequestContext,
    2001            0 :     ) -> logical_size::CurrentLogicalSize {
    2002            0 :         if !self.tenant_shard_id.is_zero() {
    2003              :             // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
    2004              :             // when HTTP API is serving a GET for timeline zero, return zero
    2005            0 :             return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
    2006            0 :         }
    2007            0 : 
    2008            0 :         let current_size = self.current_logical_size.current_size();
    2009            0 :         debug!("Current size: {current_size:?}");
    2010              : 
    2011            0 :         match (current_size.accuracy(), priority) {
    2012            0 :             (logical_size::Accuracy::Exact, _) => (), // nothing to do
    2013            0 :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
    2014            0 :                 // background task will eventually deliver an exact value, we're in no rush
    2015            0 :             }
    2016              :             (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
    2017              :                 // background task is not ready, but user is asking for it now;
    2018              :                 // => make the background task skip the line
    2019              :                 // (The alternative would be to calculate the size here, but,
    2020              :                 //  it can actually take a long time if the user has a lot of rels.
    2021              :                 //  And we'll inevitable need it again; So, let the background task do the work.)
    2022            0 :                 match self
    2023            0 :                     .current_logical_size
    2024            0 :                     .cancel_wait_for_background_loop_concurrency_limit_semaphore
    2025            0 :                     .get()
    2026              :                 {
    2027            0 :                     Some(cancel) => cancel.cancel(),
    2028              :                     None => {
    2029            0 :                         let state = self.current_state();
    2030            0 :                         if matches!(
    2031            0 :                             state,
    2032              :                             TimelineState::Broken { .. } | TimelineState::Stopping
    2033            0 :                         ) {
    2034            0 : 
    2035            0 :                             // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
    2036            0 :                             // Don't make noise.
    2037            0 :                         } else {
    2038            0 :                             warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
    2039              :                         }
    2040              :                     }
    2041              :                 };
    2042              :             }
    2043              :         }
    2044              : 
    2045            0 :         if let CurrentLogicalSize::Approximate(_) = &current_size {
    2046            0 :             if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
    2047            0 :                 let first = self
    2048            0 :                     .current_logical_size
    2049            0 :                     .did_return_approximate_to_walreceiver
    2050            0 :                     .compare_exchange(
    2051            0 :                         false,
    2052            0 :                         true,
    2053            0 :                         AtomicOrdering::Relaxed,
    2054            0 :                         AtomicOrdering::Relaxed,
    2055            0 :                     )
    2056            0 :                     .is_ok();
    2057            0 :                 if first {
    2058            0 :                     crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
    2059            0 :                 }
    2060            0 :             }
    2061            0 :         }
    2062              : 
    2063            0 :         current_size
    2064            0 :     }
    2065              : 
    2066            0 :     fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
    2067            0 :         let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
    2068              :             // nothing to do for freshly created timelines;
    2069            0 :             assert_eq!(
    2070            0 :                 self.current_logical_size.current_size().accuracy(),
    2071            0 :                 logical_size::Accuracy::Exact,
    2072            0 :             );
    2073            0 :             self.current_logical_size.initialized.add_permits(1);
    2074            0 :             return;
    2075              :         };
    2076              : 
    2077            0 :         let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
    2078            0 :         let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
    2079            0 :         self.current_logical_size
    2080            0 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
    2081            0 :             .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
    2082            0 : 
    2083            0 :         let self_clone = Arc::clone(self);
    2084            0 :         let background_ctx = ctx.detached_child(
    2085            0 :             TaskKind::InitialLogicalSizeCalculation,
    2086            0 :             DownloadBehavior::Download,
    2087            0 :         );
    2088            0 :         task_mgr::spawn(
    2089            0 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    2090            0 :             task_mgr::TaskKind::InitialLogicalSizeCalculation,
    2091            0 :             Some(self.tenant_shard_id),
    2092            0 :             Some(self.timeline_id),
    2093            0 :             "initial size calculation",
    2094              :             false,
    2095              :             // NB: don't log errors here, task_mgr will do that.
    2096            0 :             async move {
    2097            0 :                 let cancel = task_mgr::shutdown_token();
    2098            0 :                 self_clone
    2099            0 :                     .initial_logical_size_calculation_task(
    2100            0 :                         initial_part_end,
    2101            0 :                         cancel_wait_for_background_loop_concurrency_limit_semaphore,
    2102            0 :                         cancel,
    2103            0 :                         background_ctx,
    2104            0 :                     )
    2105            0 :                     .await;
    2106            0 :                 Ok(())
    2107            0 :             }
    2108            0 :             .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
    2109              :         );
    2110            0 :     }
    2111              : 
    2112            0 :     async fn initial_logical_size_calculation_task(
    2113            0 :         self: Arc<Self>,
    2114            0 :         initial_part_end: Lsn,
    2115            0 :         skip_concurrency_limiter: CancellationToken,
    2116            0 :         cancel: CancellationToken,
    2117            0 :         background_ctx: RequestContext,
    2118            0 :     ) {
    2119            0 :         scopeguard::defer! {
    2120            0 :             // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
    2121            0 :             self.current_logical_size.initialized.add_permits(1);
    2122            0 :         }
    2123              : 
    2124              :         enum BackgroundCalculationError {
    2125              :             Cancelled,
    2126              :             Other(anyhow::Error),
    2127              :         }
    2128              : 
    2129            0 :         let try_once = |attempt: usize| {
    2130            0 :             let background_ctx = &background_ctx;
    2131            0 :             let self_ref = &self;
    2132            0 :             let skip_concurrency_limiter = &skip_concurrency_limiter;
    2133            0 :             async move {
    2134            0 :                 let cancel = task_mgr::shutdown_token();
    2135            0 :                 let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
    2136            0 :                     BackgroundLoopKind::InitialLogicalSizeCalculation,
    2137            0 :                     background_ctx,
    2138            0 :                 );
    2139              : 
    2140              :                 use crate::metrics::initial_logical_size::StartCircumstances;
    2141            0 :                 let (_maybe_permit, circumstances) = tokio::select! {
    2142            0 :                     permit = wait_for_permit => {
    2143              :                         (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
    2144              :                     }
    2145              :                     _ = self_ref.cancel.cancelled() => {
    2146              :                         return Err(BackgroundCalculationError::Cancelled);
    2147              :                     }
    2148              :                     _ = cancel.cancelled() => {
    2149              :                         return Err(BackgroundCalculationError::Cancelled);
    2150              :                     },
    2151              :                     () = skip_concurrency_limiter.cancelled() => {
    2152              :                         // Some action that is part of a end user interaction requested logical size
    2153              :                         // => break out of the rate limit
    2154              :                         // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
    2155              :                         // but then again what happens if they cancel; also, we should just be using
    2156              :                         // one runtime across the entire process, so, let's leave this for now.
    2157              :                         (None, StartCircumstances::SkippedConcurrencyLimiter)
    2158              :                     }
    2159              :                 };
    2160              : 
    2161            0 :                 let metrics_guard = if attempt == 1 {
    2162            0 :                     crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
    2163              :                 } else {
    2164            0 :                     crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
    2165              :                 };
    2166              : 
    2167            0 :                 match self_ref
    2168            0 :                     .logical_size_calculation_task(
    2169            0 :                         initial_part_end,
    2170            0 :                         LogicalSizeCalculationCause::Initial,
    2171            0 :                         background_ctx,
    2172            0 :                     )
    2173            0 :                     .await
    2174              :                 {
    2175            0 :                     Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
    2176              :                     Err(CalculateLogicalSizeError::Cancelled) => {
    2177            0 :                         Err(BackgroundCalculationError::Cancelled)
    2178              :                     }
    2179            0 :                     Err(CalculateLogicalSizeError::Other(err)) => {
    2180            0 :                         if let Some(PageReconstructError::AncestorStopping(_)) =
    2181            0 :                             err.root_cause().downcast_ref()
    2182              :                         {
    2183            0 :                             Err(BackgroundCalculationError::Cancelled)
    2184              :                         } else {
    2185            0 :                             Err(BackgroundCalculationError::Other(err))
    2186              :                         }
    2187              :                     }
    2188              :                 }
    2189            0 :             }
    2190            0 :         };
    2191              : 
    2192            0 :         let retrying = async {
    2193            0 :             let mut attempt = 0;
    2194            0 :             loop {
    2195            0 :                 attempt += 1;
    2196            0 : 
    2197            0 :                 match try_once(attempt).await {
    2198            0 :                     Ok(res) => return ControlFlow::Continue(res),
    2199            0 :                     Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
    2200            0 :                     Err(BackgroundCalculationError::Other(e)) => {
    2201            0 :                         warn!(attempt, "initial size calculation failed: {e:?}");
    2202              :                         // exponential back-off doesn't make sense at these long intervals;
    2203              :                         // use fixed retry interval with generous jitter instead
    2204            0 :                         let sleep_duration = Duration::from_secs(
    2205            0 :                             u64::try_from(
    2206            0 :                                 // 1hour base
    2207            0 :                                 (60_i64 * 60_i64)
    2208            0 :                                     // 10min jitter
    2209            0 :                                     + rand::thread_rng().gen_range(-10 * 60..10 * 60),
    2210            0 :                             )
    2211            0 :                             .expect("10min < 1hour"),
    2212            0 :                         );
    2213            0 :                         tokio::time::sleep(sleep_duration).await;
    2214              :                     }
    2215              :                 }
    2216              :             }
    2217            0 :         };
    2218              : 
    2219            0 :         let (calculated_size, metrics_guard) = tokio::select! {
    2220            0 :             res = retrying  => {
    2221              :                 match res {
    2222              :                     ControlFlow::Continue(calculated_size) => calculated_size,
    2223              :                     ControlFlow::Break(()) => return,
    2224              :                 }
    2225              :             }
    2226              :             _ = cancel.cancelled() => {
    2227              :                 return;
    2228              :             }
    2229              :         };
    2230              : 
    2231              :         // we cannot query current_logical_size.current_size() to know the current
    2232              :         // *negative* value, only truncated to u64.
    2233            0 :         let added = self
    2234            0 :             .current_logical_size
    2235            0 :             .size_added_after_initial
    2236            0 :             .load(AtomicOrdering::Relaxed);
    2237            0 : 
    2238            0 :         let sum = calculated_size.saturating_add_signed(added);
    2239            0 : 
    2240            0 :         // set the gauge value before it can be set in `update_current_logical_size`.
    2241            0 :         self.metrics.current_logical_size_gauge.set(sum);
    2242            0 : 
    2243            0 :         self.current_logical_size
    2244            0 :             .initial_logical_size
    2245            0 :             .set((calculated_size, metrics_guard.calculation_result_saved()))
    2246            0 :             .ok()
    2247            0 :             .expect("only this task sets it");
    2248            0 :     }
    2249              : 
    2250            0 :     pub(crate) fn spawn_ondemand_logical_size_calculation(
    2251            0 :         self: &Arc<Self>,
    2252            0 :         lsn: Lsn,
    2253            0 :         cause: LogicalSizeCalculationCause,
    2254            0 :         ctx: RequestContext,
    2255            0 :     ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
    2256            0 :         let (sender, receiver) = oneshot::channel();
    2257            0 :         let self_clone = Arc::clone(self);
    2258            0 :         // XXX if our caller loses interest, i.e., ctx is cancelled,
    2259            0 :         // we should stop the size calculation work and return an error.
    2260            0 :         // That would require restructuring this function's API to
    2261            0 :         // return the result directly, instead of a Receiver for the result.
    2262            0 :         let ctx = ctx.detached_child(
    2263            0 :             TaskKind::OndemandLogicalSizeCalculation,
    2264            0 :             DownloadBehavior::Download,
    2265            0 :         );
    2266            0 :         task_mgr::spawn(
    2267            0 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    2268            0 :             task_mgr::TaskKind::OndemandLogicalSizeCalculation,
    2269            0 :             Some(self.tenant_shard_id),
    2270            0 :             Some(self.timeline_id),
    2271            0 :             "ondemand logical size calculation",
    2272            0 :             false,
    2273            0 :             async move {
    2274            0 :                 let res = self_clone
    2275            0 :                     .logical_size_calculation_task(lsn, cause, &ctx)
    2276            0 :                     .await;
    2277            0 :                 let _ = sender.send(res).ok();
    2278            0 :                 Ok(()) // Receiver is responsible for handling errors
    2279            0 :             }
    2280            0 :             .in_current_span(),
    2281            0 :         );
    2282            0 :         receiver
    2283            0 :     }
    2284              : 
    2285              :     /// # Cancel-Safety
    2286              :     ///
    2287              :     /// This method is cancellation-safe.
    2288            0 :     #[instrument(skip_all)]
    2289              :     async fn logical_size_calculation_task(
    2290              :         self: &Arc<Self>,
    2291              :         lsn: Lsn,
    2292              :         cause: LogicalSizeCalculationCause,
    2293              :         ctx: &RequestContext,
    2294              :     ) -> Result<u64, CalculateLogicalSizeError> {
    2295              :         crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
    2296              :         // We should never be calculating logical sizes on shard !=0, because these shards do not have
    2297              :         // accurate relation sizes, and they do not emit consumption metrics.
    2298              :         debug_assert!(self.tenant_shard_id.is_zero());
    2299              : 
    2300              :         let _guard = self.gate.enter();
    2301              : 
    2302              :         let self_calculation = Arc::clone(self);
    2303              : 
    2304            0 :         let mut calculation = pin!(async {
    2305            0 :             let ctx = ctx.attached_child();
    2306            0 :             self_calculation
    2307            0 :                 .calculate_logical_size(lsn, cause, &ctx)
    2308            0 :                 .await
    2309            0 :         });
    2310              : 
    2311            0 :         tokio::select! {
    2312            0 :             res = &mut calculation => { res }
    2313              :             _ = self.cancel.cancelled() => {
    2314            0 :                 debug!("cancelling logical size calculation for timeline shutdown");
    2315              :                 calculation.await
    2316              :             }
    2317              :             _ = task_mgr::shutdown_watcher() => {
    2318            0 :                 debug!("cancelling logical size calculation for task shutdown");
    2319              :                 calculation.await
    2320              :             }
    2321              :         }
    2322              :     }
    2323              : 
    2324              :     /// Calculate the logical size of the database at the latest LSN.
    2325              :     ///
    2326              :     /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
    2327              :     /// especially if we need to download remote layers.
    2328              :     ///
    2329              :     /// # Cancel-Safety
    2330              :     ///
    2331              :     /// This method is cancellation-safe.
    2332            0 :     async fn calculate_logical_size(
    2333            0 :         &self,
    2334            0 :         up_to_lsn: Lsn,
    2335            0 :         cause: LogicalSizeCalculationCause,
    2336            0 :         ctx: &RequestContext,
    2337            0 :     ) -> Result<u64, CalculateLogicalSizeError> {
    2338            0 :         info!(
    2339            0 :             "Calculating logical size for timeline {} at {}",
    2340            0 :             self.timeline_id, up_to_lsn
    2341            0 :         );
    2342              :         // These failpoints are used by python tests to ensure that we don't delete
    2343              :         // the timeline while the logical size computation is ongoing.
    2344              :         // The first failpoint is used to make this function pause.
    2345              :         // Then the python test initiates timeline delete operation in a thread.
    2346              :         // It waits for a few seconds, then arms the second failpoint and disables
    2347              :         // the first failpoint. The second failpoint prints an error if the timeline
    2348              :         // delete code has deleted the on-disk state while we're still running here.
    2349              :         // It shouldn't do that. If it does it anyway, the error will be caught
    2350              :         // by the test suite, highlighting the problem.
    2351            0 :         fail::fail_point!("timeline-calculate-logical-size-pause");
    2352            0 :         fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
    2353            0 :             if !self
    2354            0 :                 .conf
    2355            0 :                 .metadata_path(&self.tenant_shard_id, &self.timeline_id)
    2356            0 :                 .exists()
    2357              :             {
    2358            0 :                 error!("timeline-calculate-logical-size-pre metadata file does not exist")
    2359            0 :             }
    2360              :             // need to return something
    2361            0 :             Ok(0)
    2362            0 :         });
    2363              :         // See if we've already done the work for initial size calculation.
    2364              :         // This is a short-cut for timelines that are mostly unused.
    2365            0 :         if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
    2366            0 :             return Ok(size);
    2367            0 :         }
    2368            0 :         let storage_time_metrics = match cause {
    2369              :             LogicalSizeCalculationCause::Initial
    2370              :             | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
    2371            0 :             | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
    2372              :             LogicalSizeCalculationCause::EvictionTaskImitation => {
    2373            0 :                 &self.metrics.imitate_logical_size_histo
    2374              :             }
    2375              :         };
    2376            0 :         let timer = storage_time_metrics.start_timer();
    2377            0 :         let logical_size = self
    2378            0 :             .get_current_logical_size_non_incremental(up_to_lsn, ctx)
    2379            0 :             .await?;
    2380            0 :         debug!("calculated logical size: {logical_size}");
    2381            0 :         timer.stop_and_record();
    2382            0 :         Ok(logical_size)
    2383            0 :     }
    2384              : 
    2385              :     /// Update current logical size, adding `delta' to the old value.
    2386       270570 :     fn update_current_logical_size(&self, delta: i64) {
    2387       270570 :         let logical_size = &self.current_logical_size;
    2388       270570 :         logical_size.increment_size(delta);
    2389       270570 : 
    2390       270570 :         // Also set the value in the prometheus gauge. Note that
    2391       270570 :         // there is a race condition here: if this is is called by two
    2392       270570 :         // threads concurrently, the prometheus gauge might be set to
    2393       270570 :         // one value while current_logical_size is set to the
    2394       270570 :         // other.
    2395       270570 :         match logical_size.current_size() {
    2396       270570 :             CurrentLogicalSize::Exact(ref new_current_size) => self
    2397       270570 :                 .metrics
    2398       270570 :                 .current_logical_size_gauge
    2399       270570 :                 .set(new_current_size.into()),
    2400            0 :             CurrentLogicalSize::Approximate(_) => {
    2401            0 :                 // don't update the gauge yet, this allows us not to update the gauge back and
    2402            0 :                 // forth between the initial size calculation task.
    2403            0 :             }
    2404              :         }
    2405       270570 :     }
    2406              : 
    2407         2400 :     pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
    2408         2400 :         self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
    2409         2400 :         let aux_metric =
    2410         2400 :             self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
    2411         2400 : 
    2412         2400 :         let sum_of_entries = self
    2413         2400 :             .directory_metrics
    2414         2400 :             .iter()
    2415        16800 :             .map(|v| v.load(AtomicOrdering::Relaxed))
    2416         2400 :             .sum();
    2417         2400 :         // Set a high general threshold and a lower threshold for the auxiliary files,
    2418         2400 :         // as we can have large numbers of relations in the db directory.
    2419         2400 :         const SUM_THRESHOLD: u64 = 5000;
    2420         2400 :         const AUX_THRESHOLD: u64 = 1000;
    2421         2400 :         if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
    2422            0 :             self.metrics
    2423            0 :                 .directory_entries_count_gauge
    2424            0 :                 .set(sum_of_entries);
    2425         2400 :         } else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
    2426            0 :             metric.set(sum_of_entries);
    2427         2400 :         }
    2428         2400 :     }
    2429              : 
    2430            0 :     async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
    2431            0 :         let guard = self.layers.read().await;
    2432            0 :         for historic_layer in guard.layer_map().iter_historic_layers() {
    2433            0 :             let historic_layer_name = historic_layer.filename().file_name();
    2434            0 :             if layer_file_name == historic_layer_name {
    2435            0 :                 return Some(guard.get_from_desc(&historic_layer));
    2436            0 :             }
    2437              :         }
    2438              : 
    2439            0 :         None
    2440            0 :     }
    2441              : 
    2442              :     /// The timeline heatmap is a hint to secondary locations from the primary location,
    2443              :     /// indicating which layers are currently on-disk on the primary.
    2444              :     ///
    2445              :     /// None is returned if the Timeline is in a state where uploading a heatmap
    2446              :     /// doesn't make sense, such as shutting down or initializing.  The caller
    2447              :     /// should treat this as a cue to simply skip doing any heatmap uploading
    2448              :     /// for this timeline.
    2449            0 :     pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
    2450              :         // no point in heatmaps without remote client
    2451            0 :         let _remote_client = self.remote_client.as_ref()?;
    2452              : 
    2453            0 :         if !self.is_active() {
    2454            0 :             return None;
    2455            0 :         }
    2456              : 
    2457            0 :         let guard = self.layers.read().await;
    2458              : 
    2459            0 :         let resident = guard.resident_layers().map(|layer| {
    2460            0 :             let last_activity_ts = layer.access_stats().latest_activity_or_now();
    2461            0 : 
    2462            0 :             HeatMapLayer::new(
    2463            0 :                 layer.layer_desc().filename(),
    2464            0 :                 layer.metadata().into(),
    2465            0 :                 last_activity_ts,
    2466            0 :             )
    2467            0 :         });
    2468              : 
    2469            0 :         let layers = resident.collect().await;
    2470              : 
    2471            0 :         Some(HeatMapTimeline::new(self.timeline_id, layers))
    2472            0 :     }
    2473              : }
    2474              : 
    2475              : type TraversalId = String;
    2476              : 
    2477              : trait TraversalLayerExt {
    2478              :     fn traversal_id(&self) -> TraversalId;
    2479              : }
    2480              : 
    2481              : impl TraversalLayerExt for Layer {
    2482          104 :     fn traversal_id(&self) -> TraversalId {
    2483          104 :         self.local_path().to_string()
    2484          104 :     }
    2485              : }
    2486              : 
    2487              : impl TraversalLayerExt for Arc<InMemoryLayer> {
    2488            4 :     fn traversal_id(&self) -> TraversalId {
    2489            4 :         format!("timeline {} in-memory {self}", self.get_timeline_id())
    2490            4 :     }
    2491              : }
    2492              : 
    2493              : impl Timeline {
    2494              :     ///
    2495              :     /// Get a handle to a Layer for reading.
    2496              :     ///
    2497              :     /// The returned Layer might be from an ancestor timeline, if the
    2498              :     /// segment hasn't been updated on this timeline yet.
    2499              :     ///
    2500              :     /// This function takes the current timeline's locked LayerMap as an argument,
    2501              :     /// so callers can avoid potential race conditions.
    2502              :     ///
    2503              :     /// # Cancel-Safety
    2504              :     ///
    2505              :     /// This method is cancellation-safe.
    2506       502486 :     async fn get_reconstruct_data(
    2507       502486 :         &self,
    2508       502486 :         key: Key,
    2509       502486 :         request_lsn: Lsn,
    2510       502486 :         reconstruct_state: &mut ValueReconstructState,
    2511       502486 :         ctx: &RequestContext,
    2512       502486 :     ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
    2513       502486 :         // Start from the current timeline.
    2514       502486 :         let mut timeline_owned;
    2515       502486 :         let mut timeline = self;
    2516       502486 : 
    2517       502486 :         let mut read_count = scopeguard::guard(0, |cnt| {
    2518       502486 :             crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
    2519       502486 :         });
    2520       502486 : 
    2521       502486 :         // For debugging purposes, collect the path of layers that we traversed
    2522       502486 :         // through. It's included in the error message if we fail to find the key.
    2523       502486 :         let mut traversal_path = Vec::<TraversalPathItem>::new();
    2524              : 
    2525       502486 :         let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
    2526            0 :             *cached_lsn
    2527              :         } else {
    2528       502486 :             Lsn(0)
    2529              :         };
    2530              : 
    2531              :         // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
    2532              :         // to check that each iteration make some progress, to break infinite
    2533              :         // looping if something goes wrong.
    2534       502486 :         let mut prev_lsn = Lsn(u64::MAX);
    2535       502486 : 
    2536       502486 :         let mut result = ValueReconstructResult::Continue;
    2537       502486 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2538              : 
    2539      1356187 :         'outer: loop {
    2540      1356187 :             if self.cancel.is_cancelled() {
    2541            0 :                 return Err(PageReconstructError::Cancelled);
    2542      1356187 :             }
    2543      1356187 : 
    2544      1356187 :             // The function should have updated 'state'
    2545      1356187 :             //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
    2546      1356187 :             match result {
    2547       502378 :                 ValueReconstructResult::Complete => return Ok(traversal_path),
    2548              :                 ValueReconstructResult::Continue => {
    2549              :                     // If we reached an earlier cached page image, we're done.
    2550       853803 :                     if cont_lsn == cached_lsn + 1 {
    2551            0 :                         MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
    2552            0 :                         return Ok(traversal_path);
    2553       853803 :                     }
    2554       853803 :                     if prev_lsn <= cont_lsn {
    2555              :                         // Didn't make any progress in last iteration. Error out to avoid
    2556              :                         // getting stuck in the loop.
    2557          100 :                         return Err(layer_traversal_error(format!(
    2558          100 :                             "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
    2559          100 :                             key,
    2560          100 :                             Lsn(cont_lsn.0 - 1),
    2561          100 :                             request_lsn,
    2562          100 :                             timeline.ancestor_lsn
    2563          100 :                         ), traversal_path));
    2564       853703 :                     }
    2565       853703 :                     prev_lsn = cont_lsn;
    2566              :                 }
    2567              :                 ValueReconstructResult::Missing => {
    2568              :                     return Err(layer_traversal_error(
    2569            6 :                         if cfg!(test) {
    2570            6 :                             format!(
    2571            6 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
    2572            6 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
    2573            6 :                             )
    2574              :                         } else {
    2575            0 :                             format!(
    2576            0 :                                 "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
    2577            0 :                                 key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
    2578            0 :                             )
    2579              :                         },
    2580            6 :                         traversal_path,
    2581              :                     ));
    2582              :                 }
    2583              :             }
    2584              : 
    2585              :             // Recurse into ancestor if needed
    2586       853703 :             if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
    2587            0 :                 trace!(
    2588            0 :                     "going into ancestor {}, cont_lsn is {}",
    2589            0 :                     timeline.ancestor_lsn,
    2590            0 :                     cont_lsn
    2591            0 :                 );
    2592              : 
    2593       226538 :                 timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
    2594       226536 :                 timeline = &*timeline_owned;
    2595       226536 :                 prev_lsn = Lsn(u64::MAX);
    2596       226536 :                 continue 'outer;
    2597       627165 :             }
    2598              : 
    2599       627165 :             let guard = timeline.layers.read().await;
    2600       627165 :             let layers = guard.layer_map();
    2601              : 
    2602              :             // Check the open and frozen in-memory layers first, in order from newest
    2603              :             // to oldest.
    2604       627165 :             if let Some(open_layer) = &layers.open_layer {
    2605       556028 :                 let start_lsn = open_layer.get_lsn_range().start;
    2606       556028 :                 if cont_lsn > start_lsn {
    2607              :                     //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
    2608              :                     // Get all the data needed to reconstruct the page version from this layer.
    2609              :                     // But if we have an older cached page image, no need to go past that.
    2610       502232 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2611       502232 :                     result = match open_layer
    2612       502232 :                         .get_value_reconstruct_data(
    2613       502232 :                             key,
    2614       502232 :                             lsn_floor..cont_lsn,
    2615       502232 :                             reconstruct_state,
    2616       502232 :                             ctx,
    2617       502232 :                         )
    2618         8395 :                         .await
    2619              :                     {
    2620       502232 :                         Ok(result) => result,
    2621            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2622              :                     };
    2623       502232 :                     cont_lsn = lsn_floor;
    2624       502232 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2625       502232 :                     traversal_path.push((
    2626       502232 :                         result,
    2627       502232 :                         cont_lsn,
    2628       502232 :                         Box::new({
    2629       502232 :                             let open_layer = Arc::clone(open_layer);
    2630       502232 :                             move || open_layer.traversal_id()
    2631       502232 :                         }),
    2632       502232 :                     ));
    2633       502232 :                     continue 'outer;
    2634        53796 :                 }
    2635        71137 :             }
    2636       124933 :             for frozen_layer in layers.frozen_layers.iter().rev() {
    2637          980 :                 let start_lsn = frozen_layer.get_lsn_range().start;
    2638          980 :                 if cont_lsn > start_lsn {
    2639              :                     //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
    2640          980 :                     let lsn_floor = max(cached_lsn + 1, start_lsn);
    2641          980 :                     result = match frozen_layer
    2642          980 :                         .get_value_reconstruct_data(
    2643          980 :                             key,
    2644          980 :                             lsn_floor..cont_lsn,
    2645          980 :                             reconstruct_state,
    2646          980 :                             ctx,
    2647          980 :                         )
    2648            0 :                         .await
    2649              :                     {
    2650          980 :                         Ok(result) => result,
    2651            0 :                         Err(e) => return Err(PageReconstructError::from(e)),
    2652              :                     };
    2653          980 :                     cont_lsn = lsn_floor;
    2654          980 :                     // metrics: open_layer does not count as fs access, so we are not updating `read_count`
    2655          980 :                     traversal_path.push((
    2656          980 :                         result,
    2657          980 :                         cont_lsn,
    2658          980 :                         Box::new({
    2659          980 :                             let frozen_layer = Arc::clone(frozen_layer);
    2660          980 :                             move || frozen_layer.traversal_id()
    2661          980 :                         }),
    2662          980 :                     ));
    2663          980 :                     continue 'outer;
    2664            0 :                 }
    2665              :             }
    2666              : 
    2667       123953 :             if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
    2668       123851 :                 let layer = guard.get_from_desc(&layer);
    2669       123851 :                 // Get all the data needed to reconstruct the page version from this layer.
    2670       123851 :                 // But if we have an older cached page image, no need to go past that.
    2671       123851 :                 let lsn_floor = max(cached_lsn + 1, lsn_floor);
    2672       123851 :                 result = match layer
    2673       123851 :                     .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
    2674        23596 :                     .await
    2675              :                 {
    2676       123851 :                     Ok(result) => result,
    2677            0 :                     Err(e) => return Err(PageReconstructError::from(e)),
    2678              :                 };
    2679       123851 :                 cont_lsn = lsn_floor;
    2680       123851 :                 *read_count += 1;
    2681       123851 :                 traversal_path.push((
    2682       123851 :                     result,
    2683       123851 :                     cont_lsn,
    2684       123851 :                     Box::new({
    2685       123851 :                         let layer = layer.to_owned();
    2686       123851 :                         move || layer.traversal_id()
    2687       123851 :                     }),
    2688       123851 :                 ));
    2689       123851 :                 continue 'outer;
    2690          102 :             } else if timeline.ancestor_timeline.is_some() {
    2691              :                 // Nothing on this timeline. Traverse to parent
    2692          100 :                 result = ValueReconstructResult::Continue;
    2693          100 :                 cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2694          100 :                 continue 'outer;
    2695              :             } else {
    2696              :                 // Nothing found
    2697            2 :                 result = ValueReconstructResult::Missing;
    2698            2 :                 continue 'outer;
    2699              :             }
    2700              :         }
    2701       502486 :     }
    2702              : 
    2703              :     /// Get the data needed to reconstruct all keys in the provided keyspace
    2704              :     ///
    2705              :     /// The algorithm is as follows:
    2706              :     /// 1.   While some keys are still not done and there's a timeline to visit:
    2707              :     /// 2.   Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_timeline`]:
    2708              :     /// 2.1: Build the fringe for the current keyspace
    2709              :     /// 2.2  Visit the newest layer from the fringe to collect all values for the range it
    2710              :     ///      intersects
    2711              :     /// 2.3. Pop the timeline from the fringe
    2712              :     /// 2.4. If the fringe is empty, go back to 1
    2713           10 :     async fn get_vectored_reconstruct_data(
    2714           10 :         &self,
    2715           10 :         mut keyspace: KeySpace,
    2716           10 :         request_lsn: Lsn,
    2717           10 :         reconstruct_state: &mut ValuesReconstructState,
    2718           10 :         ctx: &RequestContext,
    2719           10 :     ) -> Result<(), GetVectoredError> {
    2720           10 :         let mut timeline_owned: Arc<Timeline>;
    2721           10 :         let mut timeline = self;
    2722           10 : 
    2723           10 :         let mut cont_lsn = Lsn(request_lsn.0 + 1);
    2724              : 
    2725              :         loop {
    2726           10 :             if self.cancel.is_cancelled() {
    2727            0 :                 return Err(GetVectoredError::Cancelled);
    2728           10 :             }
    2729              : 
    2730           10 :             let completed = Self::get_vectored_reconstruct_data_timeline(
    2731           10 :                 timeline,
    2732           10 :                 keyspace.clone(),
    2733           10 :                 cont_lsn,
    2734           10 :                 reconstruct_state,
    2735           10 :                 &self.cancel,
    2736           10 :                 ctx,
    2737           10 :             )
    2738           28 :             .await?;
    2739              : 
    2740           10 :             keyspace.remove_overlapping_with(&completed);
    2741           10 :             if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
    2742           10 :                 break;
    2743            0 :             }
    2744            0 : 
    2745            0 :             cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
    2746            0 :             timeline_owned = timeline
    2747            0 :                 .get_ready_ancestor_timeline(ctx)
    2748            0 :                 .await
    2749            0 :                 .map_err(GetVectoredError::GetReadyAncestorError)?;
    2750            0 :             timeline = &*timeline_owned;
    2751              :         }
    2752              : 
    2753           10 :         if keyspace.total_size() != 0 {
    2754            0 :             return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
    2755           10 :         }
    2756           10 : 
    2757           10 :         Ok(())
    2758           10 :     }
    2759              : 
    2760              :     /// Collect the reconstruct data for a ketspace from the specified timeline.
    2761              :     ///
    2762              :     /// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
    2763              :     /// the current keyspace. The current keyspace of the search at any given timeline
    2764              :     /// is the original keyspace minus all the keys that have been completed minus
    2765              :     /// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly,
    2766              :     /// but if you merge all the keyspaces in the fringe, you get the "current keyspace".
    2767              :     ///
    2768              :     /// This is basically a depth-first search visitor implementation where a vertex
    2769              :     /// is the (layer, lsn range, key space) tuple. The fringe acts as the stack.
    2770              :     ///
    2771              :     /// At each iteration pop the top of the fringe (the layer with the highest Lsn)
    2772              :     /// and get all the required reconstruct data from the layer in one go.
    2773           10 :     async fn get_vectored_reconstruct_data_timeline(
    2774           10 :         timeline: &Timeline,
    2775           10 :         keyspace: KeySpace,
    2776           10 :         mut cont_lsn: Lsn,
    2777           10 :         reconstruct_state: &mut ValuesReconstructState,
    2778           10 :         cancel: &CancellationToken,
    2779           10 :         ctx: &RequestContext,
    2780           10 :     ) -> Result<KeySpace, GetVectoredError> {
    2781           10 :         let mut unmapped_keyspace = keyspace.clone();
    2782           10 :         let mut fringe = LayerFringe::new();
    2783           10 : 
    2784           10 :         let mut completed_keyspace = KeySpace::default();
    2785              : 
    2786              :         // Hold the layer map whilst visiting the timeline to prevent
    2787              :         // compaction, eviction and flushes from rendering the layers unreadable.
    2788              :         //
    2789              :         // TODO: Do we actually need to do this? In theory holding on
    2790              :         // to [`tenant::storage_layer::Layer`] should be enough. However,
    2791              :         // [`Timeline::get`] also holds the lock during IO, so more investigation
    2792              :         // is needed.
    2793           10 :         let guard = timeline.layers.read().await;
    2794           10 :         let layers = guard.layer_map();
    2795              : 
    2796           20 :         'outer: loop {
    2797           20 :             if cancel.is_cancelled() {
    2798            0 :                 return Err(GetVectoredError::Cancelled);
    2799           20 :             }
    2800           20 : 
    2801           20 :             let keys_done_last_step = reconstruct_state.consume_done_keys();
    2802           20 :             unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
    2803           20 :             completed_keyspace.merge(&keys_done_last_step);
    2804           20 : 
    2805           20 :             let in_memory_layer = layers.find_in_memory_layer(|l| {
    2806            0 :                 let start_lsn = l.get_lsn_range().start;
    2807            0 :                 cont_lsn > start_lsn
    2808           20 :             });
    2809           20 : 
    2810           20 :             match in_memory_layer {
    2811            0 :                 Some(l) => {
    2812            0 :                     fringe.update(
    2813            0 :                         ReadableLayerDesc::InMemory {
    2814            0 :                             handle: l,
    2815            0 :                             lsn_ceil: cont_lsn,
    2816            0 :                         },
    2817            0 :                         unmapped_keyspace.clone(),
    2818            0 :                     );
    2819            0 :                 }
    2820              :                 None => {
    2821           20 :                     for range in unmapped_keyspace.ranges.iter() {
    2822           10 :                         let results = match layers.range_search(range.clone(), cont_lsn) {
    2823           10 :                             Some(res) => res,
    2824              :                             None => {
    2825            0 :                                 break 'outer;
    2826              :                             }
    2827              :                         };
    2828              : 
    2829           10 :                         results
    2830           10 :                             .found
    2831           10 :                             .into_iter()
    2832           10 :                             .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
    2833           10 :                                 (
    2834           10 :                                     ReadableLayerDesc::Persistent {
    2835           10 :                                         desc: (*layer).clone(),
    2836           10 :                                         lsn_floor,
    2837           10 :                                         lsn_ceil: cont_lsn,
    2838           10 :                                     },
    2839           10 :                                     keyspace_accum.to_keyspace(),
    2840           10 :                                 )
    2841           10 :                             })
    2842           10 :                             .for_each(|(layer, keyspace)| fringe.update(layer, keyspace));
    2843              :                     }
    2844              :                 }
    2845              :             }
    2846              : 
    2847           20 :             if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() {
    2848           10 :                 layer_to_read
    2849           10 :                     .get_values_reconstruct_data(
    2850           10 :                         &guard,
    2851           10 :                         keyspace_to_read.clone(),
    2852           10 :                         reconstruct_state,
    2853           10 :                         ctx,
    2854           10 :                     )
    2855           28 :                     .await?;
    2856              : 
    2857           10 :                 unmapped_keyspace = keyspace_to_read;
    2858           10 :                 cont_lsn = layer_to_read.get_lsn_floor();
    2859              :             } else {
    2860           10 :                 break;
    2861              :             }
    2862              :         }
    2863              : 
    2864           10 :         Ok(completed_keyspace)
    2865           10 :     }
    2866              : 
    2867              :     /// # Cancel-safety
    2868              :     ///
    2869              :     /// This method is cancellation-safe.
    2870       502486 :     async fn lookup_cached_page(
    2871       502486 :         &self,
    2872       502486 :         key: &Key,
    2873       502486 :         lsn: Lsn,
    2874       502486 :         ctx: &RequestContext,
    2875       502486 :     ) -> Option<(Lsn, Bytes)> {
    2876       502486 :         let cache = page_cache::get();
    2877              : 
    2878              :         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
    2879              :         // We should look at the key to determine if it's a cacheable object
    2880       502486 :         let (lsn, read_guard) = cache
    2881       502486 :             .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
    2882       502486 :             .await?;
    2883            0 :         let img = Bytes::from(read_guard.to_vec());
    2884            0 :         Some((lsn, img))
    2885       502486 :     }
    2886              : 
    2887       226538 :     async fn get_ready_ancestor_timeline(
    2888       226538 :         &self,
    2889       226538 :         ctx: &RequestContext,
    2890       226538 :     ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
    2891       226538 :         let ancestor = match self.get_ancestor_timeline() {
    2892       226538 :             Ok(timeline) => timeline,
    2893            0 :             Err(e) => return Err(GetReadyAncestorError::from(e)),
    2894              :         };
    2895              : 
    2896              :         // It's possible that the ancestor timeline isn't active yet, or
    2897              :         // is active but hasn't yet caught up to the branch point. Wait
    2898              :         // for it.
    2899              :         //
    2900              :         // This cannot happen while the pageserver is running normally,
    2901              :         // because you cannot create a branch from a point that isn't
    2902              :         // present in the pageserver yet. However, we don't wait for the
    2903              :         // branch point to be uploaded to cloud storage before creating
    2904              :         // a branch. I.e., the branch LSN need not be remote consistent
    2905              :         // for the branching operation to succeed.
    2906              :         //
    2907              :         // Hence, if we try to load a tenant in such a state where
    2908              :         // 1. the existence of the branch was persisted (in IndexPart and/or locally)
    2909              :         // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
    2910              :         // then we will need to wait for the ancestor timeline to
    2911              :         // re-stream WAL up to branch_lsn before we access it.
    2912              :         //
    2913              :         // How can a tenant get in such a state?
    2914              :         // - ungraceful pageserver process exit
    2915              :         // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
    2916              :         //
    2917              :         // NB: this could be avoided by requiring
    2918              :         //   branch_lsn >= remote_consistent_lsn
    2919              :         // during branch creation.
    2920       226538 :         match ancestor.wait_to_become_active(ctx).await {
    2921       226536 :             Ok(()) => {}
    2922              :             Err(TimelineState::Stopping) => {
    2923            0 :                 return Err(GetReadyAncestorError::AncestorStopping(
    2924            0 :                     ancestor.timeline_id,
    2925            0 :                 ));
    2926              :             }
    2927            2 :             Err(state) => {
    2928            2 :                 return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
    2929            2 :                     "Timeline {} will not become active. Current state: {:?}",
    2930            2 :                     ancestor.timeline_id,
    2931            2 :                     &state,
    2932            2 :                 )));
    2933              :             }
    2934              :         }
    2935       226536 :         ancestor
    2936       226536 :             .wait_lsn(self.ancestor_lsn, ctx)
    2937            0 :             .await
    2938       226536 :             .map_err(|e| match e {
    2939            0 :                 e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
    2940            0 :                 WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
    2941            0 :                 e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
    2942       226536 :             })?;
    2943              : 
    2944       226536 :         Ok(ancestor)
    2945       226538 :     }
    2946              : 
    2947       226538 :     fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
    2948       226538 :         let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
    2949            0 :             format!(
    2950            0 :                 "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
    2951            0 :                 self.timeline_id,
    2952            0 :                 self.get_ancestor_timeline_id(),
    2953            0 :             )
    2954       226538 :         })?;
    2955       226538 :         Ok(Arc::clone(ancestor))
    2956       226538 :     }
    2957              : 
    2958           12 :     pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
    2959           12 :         &self.shard_identity
    2960           12 :     }
    2961              : 
    2962              :     ///
    2963              :     /// Get a handle to the latest layer for appending.
    2964              :     ///
    2965      2627992 :     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
    2966      2627992 :         let mut guard = self.layers.write().await;
    2967      2627992 :         let layer = guard
    2968      2627992 :             .get_layer_for_write(
    2969      2627992 :                 lsn,
    2970      2627992 :                 self.get_last_record_lsn(),
    2971      2627992 :                 self.conf,
    2972      2627992 :                 self.timeline_id,
    2973      2627992 :                 self.tenant_shard_id,
    2974      2627992 :             )
    2975          320 :             .await?;
    2976      2627992 :         Ok(layer)
    2977      2627992 :     }
    2978              : 
    2979      3102908 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    2980      3102908 :         assert!(new_lsn.is_aligned());
    2981              : 
    2982      3102908 :         self.metrics.last_record_gauge.set(new_lsn.0 as i64);
    2983      3102908 :         self.last_record_lsn.advance(new_lsn);
    2984      3102908 :     }
    2985              : 
    2986          526 :     async fn freeze_inmem_layer(&self, write_lock_held: bool) {
    2987              :         // Freeze the current open in-memory layer. It will be written to disk on next
    2988              :         // iteration.
    2989              : 
    2990          526 :         let _write_guard = if write_lock_held {
    2991            0 :             None
    2992              :         } else {
    2993          526 :             Some(self.write_lock.lock().await)
    2994              :         };
    2995              : 
    2996          526 :         self.freeze_inmem_layer_at(self.get_last_record_lsn()).await;
    2997          526 :     }
    2998              : 
    2999          526 :     async fn freeze_inmem_layer_at(&self, at: Lsn) {
    3000          526 :         let mut guard = self.layers.write().await;
    3001          526 :         guard
    3002          526 :             .try_freeze_in_memory_layer(at, &self.last_freeze_at)
    3003            0 :             .await;
    3004          526 :     }
    3005              : 
    3006              :     /// Layer flusher task's main loop.
    3007          288 :     async fn flush_loop(
    3008          288 :         self: &Arc<Self>,
    3009          288 :         mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
    3010          288 :         ctx: &RequestContext,
    3011          288 :     ) {
    3012          288 :         info!("started flush loop");
    3013              :         loop {
    3014          814 :             tokio::select! {
    3015         1281 :                 _ = self.cancel.cancelled() => {
    3016         1281 :                     info!("shutting down layer flush task");
    3017         1281 :                     break;
    3018         1281 :                 },
    3019         1281 :                 _ = task_mgr::shutdown_watcher() => {
    3020         1281 :                     info!("shutting down layer flush task");
    3021         1281 :                     break;
    3022         1281 :                 },
    3023         1281 :                 _ = layer_flush_start_rx.changed() => {}
    3024         1281 :             }
    3025              : 
    3026            0 :             trace!("waking up");
    3027          526 :             let timer = self.metrics.flush_time_histo.start_timer();
    3028          526 :             let flush_counter = *layer_flush_start_rx.borrow();
    3029         1046 :             let result = loop {
    3030         1046 :                 if self.cancel.is_cancelled() {
    3031            0 :                     info!("dropping out of flush loop for timeline shutdown");
    3032              :                     // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
    3033              :                     // anyone waiting on that will respect self.cancel as well: they will stop
    3034              :                     // waiting at the same time we as drop out of this loop.
    3035            0 :                     return;
    3036         1046 :                 }
    3037              : 
    3038         1046 :                 let layer_to_flush = {
    3039         1046 :                     let guard = self.layers.read().await;
    3040         1046 :                     guard.layer_map().frozen_layers.front().cloned()
    3041              :                     // drop 'layers' lock to allow concurrent reads and writes
    3042              :                 };
    3043         1046 :                 let Some(layer_to_flush) = layer_to_flush else {
    3044          526 :                     break Ok(());
    3045              :                 };
    3046         2307 :                 match self.flush_frozen_layer(layer_to_flush, ctx).await {
    3047          520 :                     Ok(()) => {}
    3048              :                     Err(FlushLayerError::Cancelled) => {
    3049            0 :                         info!("dropping out of flush loop for timeline shutdown");
    3050            0 :                         return;
    3051              :                     }
    3052            0 :                     err @ Err(
    3053              :                         FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
    3054              :                     ) => {
    3055            0 :                         error!("could not flush frozen layer: {err:?}");
    3056            0 :                         break err;
    3057              :                     }
    3058              :                 }
    3059              :             };
    3060              :             // Notify any listeners that we're done
    3061          526 :             let _ = self
    3062          526 :                 .layer_flush_done_tx
    3063          526 :                 .send_replace((flush_counter, result));
    3064          526 : 
    3065          526 :             timer.stop_and_record();
    3066              :         }
    3067            8 :     }
    3068              : 
    3069          526 :     async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
    3070          526 :         let mut rx = self.layer_flush_done_tx.subscribe();
    3071          526 : 
    3072          526 :         // Increment the flush cycle counter and wake up the flush task.
    3073          526 :         // Remember the new value, so that when we listen for the flush
    3074          526 :         // to finish, we know when the flush that we initiated has
    3075          526 :         // finished, instead of some other flush that was started earlier.
    3076          526 :         let mut my_flush_request = 0;
    3077          526 : 
    3078          526 :         let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
    3079          526 :         if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
    3080            0 :             anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
    3081          526 :         }
    3082          526 : 
    3083          526 :         self.layer_flush_start_tx.send_modify(|counter| {
    3084          526 :             my_flush_request = *counter + 1;
    3085          526 :             *counter = my_flush_request;
    3086          526 :         });
    3087              : 
    3088         1052 :         loop {
    3089         1052 :             {
    3090         1052 :                 let (last_result_counter, last_result) = &*rx.borrow();
    3091         1052 :                 if *last_result_counter >= my_flush_request {
    3092          526 :                     if let Err(_err) = last_result {
    3093              :                         // We already logged the original error in
    3094              :                         // flush_loop. We cannot propagate it to the caller
    3095              :                         // here, because it might not be Cloneable
    3096            0 :                         anyhow::bail!(
    3097            0 :                             "Could not flush frozen layer. Request id: {}",
    3098            0 :                             my_flush_request
    3099            0 :                         );
    3100              :                     } else {
    3101          526 :                         return Ok(());
    3102              :                     }
    3103          526 :                 }
    3104              :             }
    3105            0 :             trace!("waiting for flush to complete");
    3106          526 :             tokio::select! {
    3107          526 :                 rx_e = rx.changed() => {
    3108              :                     rx_e?;
    3109              :                 },
    3110              :                 // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
    3111              :                 // the notification from [`flush_loop`] that it completed.
    3112              :                 _ = self.cancel.cancelled() => {
    3113            0 :                     tracing::info!("Cancelled layer flush due on timeline shutdown");
    3114              :                     return Ok(())
    3115              :                 }
    3116              :             };
    3117            0 :             trace!("done")
    3118              :         }
    3119          526 :     }
    3120              : 
    3121            0 :     fn flush_frozen_layers(&self) {
    3122            0 :         self.layer_flush_start_tx.send_modify(|val| *val += 1);
    3123            0 :     }
    3124              : 
    3125              :     /// Flush one frozen in-memory layer to disk, as a new delta layer.
    3126         1040 :     #[instrument(skip_all, fields(layer=%frozen_layer))]
    3127              :     async fn flush_frozen_layer(
    3128              :         self: &Arc<Self>,
    3129              :         frozen_layer: Arc<InMemoryLayer>,
    3130              :         ctx: &RequestContext,
    3131              :     ) -> Result<(), FlushLayerError> {
    3132              :         debug_assert_current_span_has_tenant_and_timeline_id();
    3133              :         // As a special case, when we have just imported an image into the repository,
    3134              :         // instead of writing out a L0 delta layer, we directly write out image layer
    3135              :         // files instead. This is possible as long as *all* the data imported into the
    3136              :         // repository have the same LSN.
    3137              :         let lsn_range = frozen_layer.get_lsn_range();
    3138              :         let (layers_to_upload, delta_layer_to_add) =
    3139              :             if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
    3140              :                 #[cfg(test)]
    3141              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    3142              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    3143              :                         panic!("flush loop not running")
    3144              :                     }
    3145              :                     FlushLoopState::Running {
    3146              :                         initdb_optimization_count,
    3147              :                         ..
    3148              :                     } => {
    3149              :                         *initdb_optimization_count += 1;
    3150              :                     }
    3151              :                 }
    3152              :                 // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
    3153              :                 // require downloading anything during initial import.
    3154              :                 let (partitioning, _lsn) = self
    3155              :                     .repartition(
    3156              :                         self.initdb_lsn,
    3157              :                         self.get_compaction_target_size(),
    3158              :                         EnumSet::empty(),
    3159              :                         ctx,
    3160              :                     )
    3161              :                     .await?;
    3162              : 
    3163              :                 if self.cancel.is_cancelled() {
    3164              :                     return Err(FlushLayerError::Cancelled);
    3165              :                 }
    3166              : 
    3167              :                 // For image layers, we add them immediately into the layer map.
    3168              :                 (
    3169              :                     self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
    3170              :                         .await?,
    3171              :                     None,
    3172              :                 )
    3173              :             } else {
    3174              :                 #[cfg(test)]
    3175              :                 match &mut *self.flush_loop_state.lock().unwrap() {
    3176              :                     FlushLoopState::NotStarted | FlushLoopState::Exited => {
    3177              :                         panic!("flush loop not running")
    3178              :                     }
    3179              :                     FlushLoopState::Running {
    3180              :                         expect_initdb_optimization,
    3181              :                         ..
    3182              :                     } => {
    3183              :                         assert!(!*expect_initdb_optimization, "expected initdb optimization");
    3184              :                     }
    3185              :                 }
    3186              :                 // Normal case, write out a L0 delta layer file.
    3187              :                 // `create_delta_layer` will not modify the layer map.
    3188              :                 // We will remove frozen layer and add delta layer in one atomic operation later.
    3189              :                 let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
    3190              :                 (
    3191              :                     // FIXME: even though we have a single image and single delta layer assumption
    3192              :                     // we push them to vec
    3193              :                     vec![layer.clone()],
    3194              :                     Some(layer),
    3195              :                 )
    3196              :             };
    3197              : 
    3198          520 :         pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
    3199              : 
    3200              :         if self.cancel.is_cancelled() {
    3201              :             return Err(FlushLayerError::Cancelled);
    3202              :         }
    3203              : 
    3204              :         let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
    3205              :         let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
    3206              : 
    3207              :         // The new on-disk layers are now in the layer map. We can remove the
    3208              :         // in-memory layer from the map now. The flushed layer is stored in
    3209              :         // the mapping in `create_delta_layer`.
    3210              :         let metadata = {
    3211              :             let mut guard = self.layers.write().await;
    3212              : 
    3213              :             if self.cancel.is_cancelled() {
    3214              :                 return Err(FlushLayerError::Cancelled);
    3215              :             }
    3216              : 
    3217              :             guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
    3218              : 
    3219              :             if disk_consistent_lsn != old_disk_consistent_lsn {
    3220              :                 assert!(disk_consistent_lsn > old_disk_consistent_lsn);
    3221              :                 self.disk_consistent_lsn.store(disk_consistent_lsn);
    3222              : 
    3223              :                 // Schedule remote uploads that will reflect our new disk_consistent_lsn
    3224              :                 Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
    3225              :             } else {
    3226              :                 None
    3227              :             }
    3228              :             // release lock on 'layers'
    3229              :         };
    3230              : 
    3231              :         // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
    3232              :         // a compaction can delete the file and then it won't be available for uploads any more.
    3233              :         // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
    3234              :         // race situation.
    3235              :         // See https://github.com/neondatabase/neon/issues/4526
    3236          520 :         pausable_failpoint!("flush-frozen-pausable");
    3237              : 
    3238              :         // This failpoint is used by another test case `test_pageserver_recovery`.
    3239            0 :         fail_point!("flush-frozen-exit");
    3240              : 
    3241              :         // Update the metadata file, with new 'disk_consistent_lsn'
    3242              :         //
    3243              :         // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
    3244              :         // *all* the layers, to avoid fsyncing the file multiple times.
    3245              : 
    3246              :         // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
    3247              :         if let Some(metadata) = metadata {
    3248              :             save_metadata(
    3249              :                 self.conf,
    3250              :                 &self.tenant_shard_id,
    3251              :                 &self.timeline_id,
    3252              :                 &metadata,
    3253              :             )
    3254              :             .await
    3255              :             .context("save_metadata")?;
    3256              :         }
    3257              :         Ok(())
    3258              :     }
    3259              : 
    3260              :     /// Update metadata file
    3261          520 :     fn schedule_uploads(
    3262          520 :         &self,
    3263          520 :         disk_consistent_lsn: Lsn,
    3264          520 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    3265          520 :     ) -> anyhow::Result<TimelineMetadata> {
    3266          520 :         // We can only save a valid 'prev_record_lsn' value on disk if we
    3267          520 :         // flushed *all* in-memory changes to disk. We only track
    3268          520 :         // 'prev_record_lsn' in memory for the latest processed record, so we
    3269          520 :         // don't remember what the correct value that corresponds to some old
    3270          520 :         // LSN is. But if we flush everything, then the value corresponding
    3271          520 :         // current 'last_record_lsn' is correct and we can store it on disk.
    3272          520 :         let RecordLsn {
    3273          520 :             last: last_record_lsn,
    3274          520 :             prev: prev_record_lsn,
    3275          520 :         } = self.last_record_lsn.load();
    3276          520 :         let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
    3277          520 :             Some(prev_record_lsn)
    3278              :         } else {
    3279            0 :             None
    3280              :         };
    3281              : 
    3282          520 :         let ancestor_timeline_id = self
    3283          520 :             .ancestor_timeline
    3284          520 :             .as_ref()
    3285          520 :             .map(|ancestor| ancestor.timeline_id);
    3286          520 : 
    3287          520 :         let metadata = TimelineMetadata::new(
    3288          520 :             disk_consistent_lsn,
    3289          520 :             ondisk_prev_record_lsn,
    3290          520 :             ancestor_timeline_id,
    3291          520 :             self.ancestor_lsn,
    3292          520 :             *self.latest_gc_cutoff_lsn.read(),
    3293          520 :             self.initdb_lsn,
    3294          520 :             self.pg_version,
    3295          520 :         );
    3296          520 : 
    3297          520 :         fail_point!("checkpoint-before-saving-metadata", |x| bail!(
    3298            0 :             "{}",
    3299            0 :             x.unwrap()
    3300          520 :         ));
    3301              : 
    3302          520 :         if let Some(remote_client) = &self.remote_client {
    3303         1040 :             for layer in layers_to_upload {
    3304          520 :                 remote_client.schedule_layer_file_upload(layer)?;
    3305              :             }
    3306          520 :             remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
    3307            0 :         }
    3308              : 
    3309          520 :         Ok(metadata)
    3310          520 :     }
    3311              : 
    3312            0 :     async fn update_metadata_file(
    3313            0 :         &self,
    3314            0 :         disk_consistent_lsn: Lsn,
    3315            0 :         layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
    3316            0 :     ) -> anyhow::Result<()> {
    3317            0 :         let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
    3318              : 
    3319            0 :         save_metadata(
    3320            0 :             self.conf,
    3321            0 :             &self.tenant_shard_id,
    3322            0 :             &self.timeline_id,
    3323            0 :             &metadata,
    3324            0 :         )
    3325            0 :         .await
    3326            0 :         .context("save_metadata")?;
    3327              : 
    3328            0 :         Ok(())
    3329            0 :     }
    3330              : 
    3331            0 :     pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
    3332            0 :         if let Some(remote_client) = &self.remote_client {
    3333            0 :             remote_client
    3334            0 :                 .preserve_initdb_archive(
    3335            0 :                     &self.tenant_shard_id.tenant_id,
    3336            0 :                     &self.timeline_id,
    3337            0 :                     &self.cancel,
    3338            0 :                 )
    3339            0 :                 .await?;
    3340              :         } else {
    3341            0 :             bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
    3342              :         }
    3343            0 :         Ok(())
    3344            0 :     }
    3345              : 
    3346              :     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
    3347              :     // in layer map immediately. The caller is responsible to put it into the layer map.
    3348          450 :     async fn create_delta_layer(
    3349          450 :         self: &Arc<Self>,
    3350          450 :         frozen_layer: &Arc<InMemoryLayer>,
    3351          450 :         ctx: &RequestContext,
    3352          450 :     ) -> anyhow::Result<ResidentLayer> {
    3353          450 :         let span = tracing::info_span!("blocking");
    3354          450 :         let new_delta: ResidentLayer = tokio::task::spawn_blocking({
    3355          450 :             let self_clone = Arc::clone(self);
    3356          450 :             let frozen_layer = Arc::clone(frozen_layer);
    3357          450 :             let ctx = ctx.attached_child();
    3358          450 :             move || {
    3359          450 :                 // Write it out
    3360          450 :                 // Keep this inside `spawn_blocking` and `Handle::current`
    3361          450 :                 // as long as the write path is still sync and the read impl
    3362          450 :                 // is still not fully async. Otherwise executor threads would
    3363          450 :                 // be blocked.
    3364          450 :                 let _g = span.entered();
    3365          450 :                 let new_delta =
    3366          450 :                     Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
    3367          450 :                 let new_delta_path = new_delta.local_path().to_owned();
    3368          450 : 
    3369          450 :                 // Sync it to disk.
    3370          450 :                 //
    3371          450 :                 // We must also fsync the timeline dir to ensure the directory entries for
    3372          450 :                 // new layer files are durable.
    3373          450 :                 //
    3374          450 :                 // NB: timeline dir must be synced _after_ the file contents are durable.
    3375          450 :                 // So, two separate fsyncs are required, they mustn't be batched.
    3376          450 :                 //
    3377          450 :                 // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
    3378          450 :                 // files to flush, the fsync overhead can be reduces as follows:
    3379          450 :                 // 1. write them all to temporary file names
    3380          450 :                 // 2. fsync them
    3381          450 :                 // 3. rename to the final name
    3382          450 :                 // 4. fsync the parent directory.
    3383          450 :                 // Note that (1),(2),(3) today happen inside write_to_disk().
    3384          450 :                 //
    3385          450 :                 // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    3386          450 :                 par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
    3387          450 :                 par_fsync::par_fsync(&[self_clone
    3388          450 :                     .conf
    3389          450 :                     .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
    3390          450 :                 .context("fsync of timeline dir")?;
    3391              : 
    3392          450 :                 anyhow::Ok(new_delta)
    3393          450 :             }
    3394          450 :         })
    3395          450 :         .await
    3396          450 :         .context("spawn_blocking")
    3397          450 :         .and_then(|x| x)?;
    3398              : 
    3399          450 :         Ok(new_delta)
    3400          450 :     }
    3401              : 
    3402          478 :     async fn repartition(
    3403          478 :         &self,
    3404          478 :         lsn: Lsn,
    3405          478 :         partition_size: u64,
    3406          478 :         flags: EnumSet<CompactFlags>,
    3407          478 :         ctx: &RequestContext,
    3408          478 :     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
    3409          478 :         {
    3410          478 :             let partitioning_guard = self.partitioning.lock().unwrap();
    3411          478 :             let distance = lsn.0 - partitioning_guard.1 .0;
    3412          478 :             if partitioning_guard.1 != Lsn(0)
    3413          308 :                 && distance <= self.repartition_threshold
    3414          308 :                 && !flags.contains(CompactFlags::ForceRepartition)
    3415              :             {
    3416            0 :                 debug!(
    3417            0 :                     distance,
    3418            0 :                     threshold = self.repartition_threshold,
    3419            0 :                     "no repartitioning needed"
    3420            0 :                 );
    3421          308 :                 return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
    3422          170 :             }
    3423              :         }
    3424        13252 :         let keyspace = self.collect_keyspace(lsn, ctx).await?;
    3425          170 :         let partitioning = keyspace.partition(partition_size);
    3426          170 : 
    3427          170 :         let mut partitioning_guard = self.partitioning.lock().unwrap();
    3428          170 :         if lsn > partitioning_guard.1 {
    3429          170 :             *partitioning_guard = (partitioning, lsn);
    3430          170 :         } else {
    3431            0 :             warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
    3432              :         }
    3433          170 :         Ok((partitioning_guard.0.clone(), partitioning_guard.1))
    3434          478 :     }
    3435              : 
    3436              :     // Is it time to create a new image layer for the given partition?
    3437          408 :     async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
    3438          408 :         let threshold = self.get_image_creation_threshold();
    3439              : 
    3440          408 :         let guard = self.layers.read().await;
    3441          408 :         let layers = guard.layer_map();
    3442          408 : 
    3443          408 :         let mut max_deltas = 0;
    3444          408 :         {
    3445          408 :             let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
    3446          408 :             if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
    3447          294 :                 let img_range =
    3448          294 :                     partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
    3449          294 :                 if wanted.overlaps(&img_range) {
    3450              :                     //
    3451              :                     // gc_timeline only pays attention to image layers that are older than the GC cutoff,
    3452              :                     // but create_image_layers creates image layers at last-record-lsn.
    3453              :                     // So it's possible that gc_timeline wants a new image layer to be created for a key range,
    3454              :                     // but the range is already covered by image layers at more recent LSNs. Before we
    3455              :                     // create a new image layer, check if the range is already covered at more recent LSNs.
    3456            0 :                     if !layers
    3457            0 :                         .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
    3458              :                     {
    3459            0 :                         debug!(
    3460            0 :                             "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
    3461            0 :                             img_range.start, img_range.end, cutoff_lsn, lsn
    3462            0 :                         );
    3463            0 :                         return true;
    3464            0 :                     }
    3465          294 :                 }
    3466          114 :             }
    3467              :         }
    3468              : 
    3469         2856 :         for part_range in &partition.ranges {
    3470         2448 :             let image_coverage = layers.image_coverage(part_range, lsn);
    3471         5204 :             for (img_range, last_img) in image_coverage {
    3472         2756 :                 let img_lsn = if let Some(last_img) = last_img {
    3473         2156 :                     last_img.get_lsn_range().end
    3474              :                 } else {
    3475          600 :                     Lsn(0)
    3476              :                 };
    3477              :                 // Let's consider an example:
    3478              :                 //
    3479              :                 // delta layer with LSN range 71-81
    3480              :                 // delta layer with LSN range 81-91
    3481              :                 // delta layer with LSN range 91-101
    3482              :                 // image layer at LSN 100
    3483              :                 //
    3484              :                 // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
    3485              :                 // there's no need to create a new one. We check this case explicitly, to avoid passing
    3486              :                 // a bogus range to count_deltas below, with start > end. It's even possible that there
    3487              :                 // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
    3488              :                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
    3489         2756 :                 if img_lsn < lsn {
    3490          600 :                     let num_deltas =
    3491          600 :                         layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
    3492          600 : 
    3493          600 :                     max_deltas = max_deltas.max(num_deltas);
    3494          600 :                     if num_deltas >= threshold {
    3495            0 :                         debug!(
    3496            0 :                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
    3497            0 :                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
    3498            0 :                         );
    3499            0 :                         return true;
    3500          600 :                     }
    3501         2156 :                 }
    3502              :             }
    3503              :         }
    3504              : 
    3505            0 :         debug!(
    3506            0 :             max_deltas,
    3507            0 :             "none of the partitioned ranges had >= {threshold} deltas"
    3508            0 :         );
    3509          408 :         false
    3510          408 :     }
    3511              : 
    3512          956 :     #[tracing::instrument(skip_all, fields(%lsn, %force))]
    3513              :     async fn create_image_layers(
    3514              :         self: &Arc<Timeline>,
    3515              :         partitioning: &KeyPartitioning,
    3516              :         lsn: Lsn,
    3517              :         force: bool,
    3518              :         ctx: &RequestContext,
    3519              :     ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
    3520              :         let timer = self.metrics.create_images_time_histo.start_timer();
    3521              :         let mut image_layers = Vec::new();
    3522              : 
    3523              :         // We need to avoid holes between generated image layers.
    3524              :         // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
    3525              :         // image layer with hole between them. In this case such layer can not be utilized by GC.
    3526              :         //
    3527              :         // How such hole between partitions can appear?
    3528              :         // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
    3529              :         // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
    3530              :         // If there is delta layer <100000000..300000000> then it never be garbage collected because
    3531              :         // image layers  <100000000..100000099> and <200000000..200000199> are not completely covering it.
    3532              :         let mut start = Key::MIN;
    3533              : 
    3534              :         for partition in partitioning.parts.iter() {
    3535              :             let img_range = start..partition.ranges.last().unwrap().end;
    3536              :             if !force && !self.time_for_new_image_layer(partition, lsn).await {
    3537              :                 start = img_range.end;
    3538              :                 continue;
    3539              :             }
    3540              : 
    3541              :             let mut image_layer_writer = ImageLayerWriter::new(
    3542              :                 self.conf,
    3543              :                 self.timeline_id,
    3544              :                 self.tenant_shard_id,
    3545              :                 &img_range,
    3546              :                 lsn,
    3547              :             )
    3548              :             .await?;
    3549              : 
    3550            0 :             fail_point!("image-layer-writer-fail-before-finish", |_| {
    3551            0 :                 Err(CreateImageLayersError::Other(anyhow::anyhow!(
    3552            0 :                     "failpoint image-layer-writer-fail-before-finish"
    3553            0 :                 )))
    3554            0 :             });
    3555              : 
    3556              :             let mut wrote_keys = false;
    3557              : 
    3558              :             let mut key_request_accum = KeySpaceAccum::new();
    3559              :             for range in &partition.ranges {
    3560              :                 let mut key = range.start;
    3561              :                 while key < range.end {
    3562              :                     // Decide whether to retain this key: usually we do, but sharded tenants may
    3563              :                     // need to drop keys that don't belong to them.  If we retain the key, add it
    3564              :                     // to `key_request_accum` for later issuing a vectored get
    3565              :                     if self.shard_identity.is_key_disposable(&key) {
    3566            0 :                         debug!(
    3567            0 :                             "Dropping key {} during compaction (it belongs on shard {:?})",
    3568            0 :                             key,
    3569            0 :                             self.shard_identity.get_shard_number(&key)
    3570            0 :                         );
    3571              :                     } else {
    3572              :                         key_request_accum.add_key(key);
    3573              :                     }
    3574              : 
    3575              :                     let last_key_in_range = key.next() == range.end;
    3576              :                     key = key.next();
    3577              : 
    3578              :                     // Maybe flush `key_rest_accum`
    3579              :                     if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
    3580              :                         || last_key_in_range
    3581              :                     {
    3582              :                         let results = self
    3583              :                             .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
    3584              :                             .await?;
    3585              : 
    3586              :                         for (img_key, img) in results {
    3587              :                             let img = match img {
    3588              :                                 Ok(img) => img,
    3589              :                                 Err(err) => {
    3590              :                                     // If we fail to reconstruct a VM or FSM page, we can zero the
    3591              :                                     // page without losing any actual user data. That seems better
    3592              :                                     // than failing repeatedly and getting stuck.
    3593              :                                     //
    3594              :                                     // We had a bug at one point, where we truncated the FSM and VM
    3595              :                                     // in the pageserver, but the Postgres didn't know about that
    3596              :                                     // and continued to generate incremental WAL records for pages
    3597              :                                     // that didn't exist in the pageserver. Trying to replay those
    3598              :                                     // WAL records failed to find the previous image of the page.
    3599              :                                     // This special case allows us to recover from that situation.
    3600              :                                     // See https://github.com/neondatabase/neon/issues/2601.
    3601              :                                     //
    3602              :                                     // Unfortunately we cannot do this for the main fork, or for
    3603              :                                     // any metadata keys, keys, as that would lead to actual data
    3604              :                                     // loss.
    3605              :                                     if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key)
    3606              :                                     {
    3607            0 :                                         warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
    3608              :                                         ZERO_PAGE.clone()
    3609              :                                     } else {
    3610              :                                         return Err(CreateImageLayersError::PageReconstructError(
    3611              :                                             err,
    3612              :                                         ));
    3613              :                                     }
    3614              :                                 }
    3615              :                             };
    3616              : 
    3617              :                             // Write all the keys we just read into our new image layer.
    3618              :                             image_layer_writer.put_image(img_key, img).await?;
    3619              :                             wrote_keys = true;
    3620              :                         }
    3621              :                     }
    3622              :                 }
    3623              :             }
    3624              : 
    3625              :             if wrote_keys {
    3626              :                 // Normal path: we have written some data into the new image layer for this
    3627              :                 // partition, so flush it to disk.
    3628              :                 start = img_range.end;
    3629              :                 let image_layer = image_layer_writer.finish(self).await?;
    3630              :                 image_layers.push(image_layer);
    3631              :             } else {
    3632              :                 // Special case: the image layer may be empty if this is a sharded tenant and the
    3633              :                 // partition does not cover any keys owned by this shard.  In this case, to ensure
    3634              :                 // we don't leave gaps between image layers, leave `start` where it is, so that the next
    3635              :                 // layer we write will cover the key range that we just scanned.
    3636            0 :                 tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
    3637              :             }
    3638              :         }
    3639              :         // All layers that the GC wanted us to create have now been created.
    3640              :         //
    3641              :         // It's possible that another GC cycle happened while we were compacting, and added
    3642              :         // something new to wanted_image_layers, and we now clear that before processing it.
    3643              :         // That's OK, because the next GC iteration will put it back in.
    3644              :         *self.wanted_image_layers.lock().unwrap() = None;
    3645              : 
    3646              :         // Sync the new layer to disk before adding it to the layer map, to make sure
    3647              :         // we don't garbage collect something based on the new layer, before it has
    3648              :         // reached the disk.
    3649              :         //
    3650              :         // We must also fsync the timeline dir to ensure the directory entries for
    3651              :         // new layer files are durable
    3652              :         //
    3653              :         // Compaction creates multiple image layers. It would be better to create them all
    3654              :         // and fsync them all in parallel.
    3655              :         let all_paths = image_layers
    3656              :             .iter()
    3657           70 :             .map(|layer| layer.local_path().to_owned())
    3658              :             .collect::<Vec<_>>();
    3659              : 
    3660              :         par_fsync::par_fsync_async(&all_paths)
    3661              :             .await
    3662              :             .context("fsync of newly created layer files")?;
    3663              : 
    3664              :         if !all_paths.is_empty() {
    3665              :             par_fsync::par_fsync_async(&[self
    3666              :                 .conf
    3667              :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
    3668              :             .await
    3669              :             .context("fsync of timeline dir")?;
    3670              :         }
    3671              : 
    3672              :         let mut guard = self.layers.write().await;
    3673              : 
    3674              :         // FIXME: we could add the images to be uploaded *before* returning from here, but right
    3675              :         // now they are being scheduled outside of write lock
    3676              :         guard.track_new_image_layers(&image_layers, &self.metrics);
    3677              :         drop_wlock(guard);
    3678              :         timer.stop_and_record();
    3679              : 
    3680              :         Ok(image_layers)
    3681              :     }
    3682              : 
    3683              :     /// Wait until the background initial logical size calculation is complete, or
    3684              :     /// this Timeline is shut down.  Calling this function will cause the initial
    3685              :     /// logical size calculation to skip waiting for the background jobs barrier.
    3686            0 :     pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
    3687            0 :         if let Some(await_bg_cancel) = self
    3688            0 :             .current_logical_size
    3689            0 :             .cancel_wait_for_background_loop_concurrency_limit_semaphore
    3690            0 :             .get()
    3691            0 :         {
    3692            0 :             await_bg_cancel.cancel();
    3693            0 :         } else {
    3694              :             // We should not wait if we were not able to explicitly instruct
    3695              :             // the logical size cancellation to skip the concurrency limit semaphore.
    3696              :             // TODO: this is an unexpected case.  We should restructure so that it
    3697              :             // can't happen.
    3698            0 :             tracing::info!(
    3699            0 :                 "await_initial_logical_size: can't get semaphore cancel token, skipping"
    3700            0 :             );
    3701              :         }
    3702              : 
    3703            0 :         tokio::select!(
    3704            0 :             _ = self.current_logical_size.initialized.acquire() => {},
    3705            0 :             _ = self.cancel.cancelled() => {}
    3706            0 :         )
    3707            0 :     }
    3708              : }
    3709              : 
    3710          378 : #[derive(Default)]
    3711              : struct CompactLevel0Phase1Result {
    3712              :     new_layers: Vec<ResidentLayer>,
    3713              :     deltas_to_compact: Vec<Layer>,
    3714              : }
    3715              : 
    3716              : /// Top-level failure to compact.
    3717            0 : #[derive(Debug, thiserror::Error)]
    3718              : pub(crate) enum CompactionError {
    3719              :     #[error("The timeline or pageserver is shutting down")]
    3720              :     ShuttingDown,
    3721              :     /// Compaction cannot be done right now; page reconstruction and so on.
    3722              :     #[error(transparent)]
    3723              :     Other(#[from] anyhow::Error),
    3724              : }
    3725              : 
    3726              : #[serde_as]
    3727          420 : #[derive(serde::Serialize)]
    3728              : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
    3729              : 
    3730         2856 : #[derive(Default)]
    3731              : enum DurationRecorder {
    3732              :     #[default]
    3733              :     NotStarted,
    3734              :     Recorded(RecordedDuration, tokio::time::Instant),
    3735              : }
    3736              : 
    3737              : impl DurationRecorder {
    3738          558 :     fn till_now(&self) -> DurationRecorder {
    3739          558 :         match self {
    3740              :             DurationRecorder::NotStarted => {
    3741            0 :                 panic!("must only call on recorded measurements")
    3742              :             }
    3743          558 :             DurationRecorder::Recorded(_, ended) => {
    3744          558 :                 let now = tokio::time::Instant::now();
    3745          558 :                 DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
    3746          558 :             }
    3747          558 :         }
    3748          558 :     }
    3749          210 :     fn into_recorded(self) -> Option<RecordedDuration> {
    3750          210 :         match self {
    3751            0 :             DurationRecorder::NotStarted => None,
    3752          210 :             DurationRecorder::Recorded(recorded, _) => Some(recorded),
    3753              :         }
    3754          210 :     }
    3755              : }
    3756              : 
    3757          408 : #[derive(Default)]
    3758              : struct CompactLevel0Phase1StatsBuilder {
    3759              :     version: Option<u64>,
    3760              :     tenant_id: Option<TenantShardId>,
    3761              :     timeline_id: Option<TimelineId>,
    3762              :     read_lock_acquisition_micros: DurationRecorder,
    3763              :     read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
    3764              :     read_lock_held_key_sort_micros: DurationRecorder,
    3765              :     read_lock_held_prerequisites_micros: DurationRecorder,
    3766              :     read_lock_held_compute_holes_micros: DurationRecorder,
    3767              :     read_lock_drop_micros: DurationRecorder,
    3768              :     write_layer_files_micros: DurationRecorder,
    3769              :     level0_deltas_count: Option<usize>,
    3770              :     new_deltas_count: Option<usize>,
    3771              :     new_deltas_size: Option<u64>,
    3772              : }
    3773              : 
    3774           30 : #[derive(serde::Serialize)]
    3775              : struct CompactLevel0Phase1Stats {
    3776              :     version: u64,
    3777              :     tenant_id: TenantShardId,
    3778              :     timeline_id: TimelineId,
    3779              :     read_lock_acquisition_micros: RecordedDuration,
    3780              :     read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
    3781              :     read_lock_held_key_sort_micros: RecordedDuration,
    3782              :     read_lock_held_prerequisites_micros: RecordedDuration,
    3783              :     read_lock_held_compute_holes_micros: RecordedDuration,
    3784              :     read_lock_drop_micros: RecordedDuration,
    3785              :     write_layer_files_micros: RecordedDuration,
    3786              :     level0_deltas_count: usize,
    3787              :     new_deltas_count: usize,
    3788              :     new_deltas_size: u64,
    3789              : }
    3790              : 
    3791              : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
    3792              :     type Error = anyhow::Error;
    3793              : 
    3794           30 :     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
    3795           30 :         Ok(Self {
    3796           30 :             version: value.version.ok_or_else(|| anyhow!("version not set"))?,
    3797           30 :             tenant_id: value
    3798           30 :                 .tenant_id
    3799           30 :                 .ok_or_else(|| anyhow!("tenant_id not set"))?,
    3800           30 :             timeline_id: value
    3801           30 :                 .timeline_id
    3802           30 :                 .ok_or_else(|| anyhow!("timeline_id not set"))?,
    3803           30 :             read_lock_acquisition_micros: value
    3804           30 :                 .read_lock_acquisition_micros
    3805           30 :                 .into_recorded()
    3806           30 :                 .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
    3807           30 :             read_lock_held_spawn_blocking_startup_micros: value
    3808           30 :                 .read_lock_held_spawn_blocking_startup_micros
    3809           30 :                 .into_recorded()
    3810           30 :                 .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
    3811           30 :             read_lock_held_key_sort_micros: value
    3812           30 :                 .read_lock_held_key_sort_micros
    3813           30 :                 .into_recorded()
    3814           30 :                 .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
    3815           30 :             read_lock_held_prerequisites_micros: value
    3816           30 :                 .read_lock_held_prerequisites_micros
    3817           30 :                 .into_recorded()
    3818           30 :                 .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
    3819           30 :             read_lock_held_compute_holes_micros: value
    3820           30 :                 .read_lock_held_compute_holes_micros
    3821           30 :                 .into_recorded()
    3822           30 :                 .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
    3823           30 :             read_lock_drop_micros: value
    3824           30 :                 .read_lock_drop_micros
    3825           30 :                 .into_recorded()
    3826           30 :                 .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
    3827           30 :             write_layer_files_micros: value
    3828           30 :                 .write_layer_files_micros
    3829           30 :                 .into_recorded()
    3830           30 :                 .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
    3831           30 :             level0_deltas_count: value
    3832           30 :                 .level0_deltas_count
    3833           30 :                 .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
    3834           30 :             new_deltas_count: value
    3835           30 :                 .new_deltas_count
    3836           30 :                 .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
    3837           30 :             new_deltas_size: value
    3838           30 :                 .new_deltas_size
    3839           30 :                 .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
    3840              :         })
    3841           30 :     }
    3842              : }
    3843              : 
    3844              : impl Timeline {
    3845              :     /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
    3846          408 :     async fn compact_level0_phase1(
    3847          408 :         self: &Arc<Self>,
    3848          408 :         guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
    3849          408 :         mut stats: CompactLevel0Phase1StatsBuilder,
    3850          408 :         target_file_size: u64,
    3851          408 :         ctx: &RequestContext,
    3852          408 :     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
    3853          408 :         stats.read_lock_held_spawn_blocking_startup_micros =
    3854          408 :             stats.read_lock_acquisition_micros.till_now(); // set by caller
    3855          408 :         let layers = guard.layer_map();
    3856          408 :         let level0_deltas = layers.get_level0_deltas()?;
    3857          408 :         let mut level0_deltas = level0_deltas
    3858          408 :             .into_iter()
    3859         1770 :             .map(|x| guard.get_from_desc(&x))
    3860          408 :             .collect_vec();
    3861          408 :         stats.level0_deltas_count = Some(level0_deltas.len());
    3862          408 :         // Only compact if enough layers have accumulated.
    3863          408 :         let threshold = self.get_compaction_threshold();
    3864          408 :         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
    3865            0 :             debug!(
    3866            0 :                 level0_deltas = level0_deltas.len(),
    3867            0 :                 threshold, "too few deltas to compact"
    3868            0 :             );
    3869          378 :             return Ok(CompactLevel0Phase1Result::default());
    3870           30 :         }
    3871           30 : 
    3872           30 :         // This failpoint is used together with `test_duplicate_layers` integration test.
    3873           30 :         // It returns the compaction result exactly the same layers as input to compaction.
    3874           30 :         // We want to ensure that this will not cause any problem when updating the layer map
    3875           30 :         // after the compaction is finished.
    3876           30 :         //
    3877           30 :         // Currently, there are two rare edge cases that will cause duplicated layers being
    3878           30 :         // inserted.
    3879           30 :         // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
    3880           30 :         //    is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
    3881           30 :         //    map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
    3882           30 :         //    point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
    3883           30 :         //    and this causes an overwrite. This is acceptable because the content is the same, and we should do a
    3884           30 :         //    layer replace instead of the normal remove / upload process.
    3885           30 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
    3886           30 :         //    size length. Compaction will likely create the same set of n files afterwards.
    3887           30 :         //
    3888           30 :         // This failpoint is a superset of both of the cases.
    3889           30 :         if cfg!(feature = "testing") {
    3890           30 :             let active = (|| {
    3891           30 :                 ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
    3892           30 :                 false
    3893           30 :             })();
    3894           30 : 
    3895           30 :             if active {
    3896            0 :                 let mut new_layers = Vec::with_capacity(level0_deltas.len());
    3897            0 :                 for delta in &level0_deltas {
    3898              :                     // we are just faking these layers as being produced again for this failpoint
    3899            0 :                     new_layers.push(
    3900            0 :                         delta
    3901            0 :                             .download_and_keep_resident()
    3902            0 :                             .await
    3903            0 :                             .context("download layer for failpoint")?,
    3904              :                     );
    3905              :                 }
    3906            0 :                 tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
    3907            0 :                 return Ok(CompactLevel0Phase1Result {
    3908            0 :                     new_layers,
    3909            0 :                     deltas_to_compact: level0_deltas,
    3910            0 :                 });
    3911           30 :             }
    3912            0 :         }
    3913              : 
    3914              :         // Gather the files to compact in this iteration.
    3915              :         //
    3916              :         // Start with the oldest Level 0 delta file, and collect any other
    3917              :         // level 0 files that form a contiguous sequence, such that the end
    3918              :         // LSN of previous file matches the start LSN of the next file.
    3919              :         //
    3920              :         // Note that if the files don't form such a sequence, we might
    3921              :         // "compact" just a single file. That's a bit pointless, but it allows
    3922              :         // us to get rid of the level 0 file, and compact the other files on
    3923              :         // the next iteration. This could probably made smarter, but such
    3924              :         // "gaps" in the sequence of level 0 files should only happen in case
    3925              :         // of a crash, partial download from cloud storage, or something like
    3926              :         // that, so it's not a big deal in practice.
    3927          540 :         level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
    3928           30 :         let mut level0_deltas_iter = level0_deltas.iter();
    3929           30 : 
    3930           30 :         let first_level0_delta = level0_deltas_iter.next().unwrap();
    3931           30 :         let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
    3932           30 :         let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
    3933           30 : 
    3934           30 :         deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
    3935          300 :         for l in level0_deltas_iter {
    3936          270 :             let lsn_range = &l.layer_desc().lsn_range;
    3937          270 : 
    3938          270 :             if lsn_range.start != prev_lsn_end {
    3939            0 :                 break;
    3940          270 :             }
    3941          270 :             deltas_to_compact.push(l.download_and_keep_resident().await?);
    3942          270 :             prev_lsn_end = lsn_range.end;
    3943              :         }
    3944           30 :         let lsn_range = Range {
    3945           30 :             start: deltas_to_compact
    3946           30 :                 .first()
    3947           30 :                 .unwrap()
    3948           30 :                 .layer_desc()
    3949           30 :                 .lsn_range
    3950           30 :                 .start,
    3951           30 :             end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
    3952           30 :         };
    3953              : 
    3954           30 :         info!(
    3955           30 :             "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
    3956           30 :             lsn_range.start,
    3957           30 :             lsn_range.end,
    3958           30 :             deltas_to_compact.len(),
    3959           30 :             level0_deltas.len()
    3960           30 :         );
    3961              : 
    3962          300 :         for l in deltas_to_compact.iter() {
    3963          300 :             info!("compact includes {l}");
    3964              :         }
    3965              : 
    3966              :         // We don't need the original list of layers anymore. Drop it so that
    3967              :         // we don't accidentally use it later in the function.
    3968           30 :         drop(level0_deltas);
    3969           30 : 
    3970           30 :         stats.read_lock_held_prerequisites_micros = stats
    3971           30 :             .read_lock_held_spawn_blocking_startup_micros
    3972           30 :             .till_now();
    3973           30 : 
    3974           30 :         // Determine N largest holes where N is number of compacted layers.
    3975           30 :         let max_holes = deltas_to_compact.len();
    3976           30 :         let last_record_lsn = self.get_last_record_lsn();
    3977           30 :         let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
    3978           30 :         let min_hole_coverage_size = 3; // TODO: something more flexible?
    3979           30 : 
    3980           30 :         // min-heap (reserve space for one more element added before eviction)
    3981           30 :         let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
    3982           30 :         let mut prev: Option<Key> = None;
    3983           30 : 
    3984           30 :         let mut all_keys = Vec::new();
    3985              : 
    3986          300 :         for l in deltas_to_compact.iter() {
    3987         2422 :             all_keys.extend(l.load_keys(ctx).await?);
    3988              :         }
    3989              : 
    3990              :         // FIXME: should spawn_blocking the rest of this function
    3991              : 
    3992              :         // The current stdlib sorting implementation is designed in a way where it is
    3993              :         // particularly fast where the slice is made up of sorted sub-ranges.
    3994      4931318 :         all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
    3995           30 : 
    3996           30 :         stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
    3997              : 
    3998      2102000 :         for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
    3999      2102000 :             if let Some(prev_key) = prev {
    4000              :                 // just first fast filter
    4001      2101970 :                 if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
    4002            0 :                     let key_range = prev_key..next_key;
    4003            0 :                     // Measuring hole by just subtraction of i128 representation of key range boundaries
    4004            0 :                     // has not so much sense, because largest holes will corresponds field1/field2 changes.
    4005            0 :                     // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
    4006            0 :                     // That is why it is better to measure size of hole as number of covering image layers.
    4007            0 :                     let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
    4008            0 :                     if coverage_size >= min_hole_coverage_size {
    4009            0 :                         heap.push(Hole {
    4010            0 :                             key_range,
    4011            0 :                             coverage_size,
    4012            0 :                         });
    4013            0 :                         if heap.len() > max_holes {
    4014            0 :                             heap.pop(); // remove smallest hole
    4015            0 :                         }
    4016            0 :                     }
    4017      2101970 :                 }
    4018           30 :             }
    4019      2102000 :             prev = Some(next_key.next());
    4020              :         }
    4021           30 :         stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
    4022           30 :         drop_rlock(guard);
    4023           30 :         stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
    4024           30 :         let mut holes = heap.into_vec();
    4025           30 :         holes.sort_unstable_by_key(|hole| hole.key_range.start);
    4026           30 :         let mut next_hole = 0; // index of next hole in holes vector
    4027           30 : 
    4028           30 :         // This iterator walks through all key-value pairs from all the layers
    4029           30 :         // we're compacting, in key, LSN order.
    4030           30 :         let all_values_iter = all_keys.iter();
    4031           30 : 
    4032           30 :         // This iterator walks through all keys and is needed to calculate size used by each key
    4033           30 :         let mut all_keys_iter = all_keys
    4034           30 :             .iter()
    4035      2102000 :             .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
    4036      2101970 :             .coalesce(|mut prev, cur| {
    4037      2101970 :                 // Coalesce keys that belong to the same key pair.
    4038      2101970 :                 // This ensures that compaction doesn't put them
    4039      2101970 :                 // into different layer files.
    4040      2101970 :                 // Still limit this by the target file size,
    4041      2101970 :                 // so that we keep the size of the files in
    4042      2101970 :                 // check.
    4043      2101970 :                 if prev.0 == cur.0 && prev.2 < target_file_size {
    4044        92001 :                     prev.2 += cur.2;
    4045        92001 :                     Ok(prev)
    4046              :                 } else {
    4047      2009969 :                     Err((prev, cur))
    4048              :                 }
    4049      2101970 :             });
    4050           30 : 
    4051           30 :         // Merge the contents of all the input delta layers into a new set
    4052           30 :         // of delta layers, based on the current partitioning.
    4053           30 :         //
    4054           30 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
    4055           30 :         // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
    4056           30 :         // would be too large. In that case, we also split on the LSN dimension.
    4057           30 :         //
    4058           30 :         // LSN
    4059           30 :         //  ^
    4060           30 :         //  |
    4061           30 :         //  | +-----------+            +--+--+--+--+
    4062           30 :         //  | |           |            |  |  |  |  |
    4063           30 :         //  | +-----------+            |  |  |  |  |
    4064           30 :         //  | |           |            |  |  |  |  |
    4065           30 :         //  | +-----------+     ==>    |  |  |  |  |
    4066           30 :         //  | |           |            |  |  |  |  |
    4067           30 :         //  | +-----------+            |  |  |  |  |
    4068           30 :         //  | |           |            |  |  |  |  |
    4069           30 :         //  | +-----------+            +--+--+--+--+
    4070           30 :         //  |
    4071           30 :         //  +--------------> key
    4072           30 :         //
    4073           30 :         //
    4074           30 :         // If one key (X) has a lot of page versions:
    4075           30 :         //
    4076           30 :         // LSN
    4077           30 :         //  ^
    4078           30 :         //  |                                 (X)
    4079           30 :         //  | +-----------+            +--+--+--+--+
    4080           30 :         //  | |           |            |  |  |  |  |
    4081           30 :         //  | +-----------+            |  |  +--+  |
    4082           30 :         //  | |           |            |  |  |  |  |
    4083           30 :         //  | +-----------+     ==>    |  |  |  |  |
    4084           30 :         //  | |           |            |  |  +--+  |
    4085           30 :         //  | +-----------+            |  |  |  |  |
    4086           30 :         //  | |           |            |  |  |  |  |
    4087           30 :         //  | +-----------+            +--+--+--+--+
    4088           30 :         //  |
    4089           30 :         //  +--------------> key
    4090           30 :         // TODO: this actually divides the layers into fixed-size chunks, not
    4091           30 :         // based on the partitioning.
    4092           30 :         //
    4093           30 :         // TODO: we should also opportunistically materialize and
    4094           30 :         // garbage collect what we can.
    4095           30 :         let mut new_layers = Vec::new();
    4096           30 :         let mut prev_key: Option<Key> = None;
    4097           30 :         let mut writer: Option<DeltaLayerWriter> = None;
    4098           30 :         let mut key_values_total_size = 0u64;
    4099           30 :         let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
    4100           30 :         let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
    4101              : 
    4102              :         for &DeltaEntry {
    4103      2102000 :             key, lsn, ref val, ..
    4104      2102030 :         } in all_values_iter
    4105              :         {
    4106      2102000 :             let value = val.load(ctx).await?;
    4107      2102000 :             let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
    4108      2102000 :             // We need to check key boundaries once we reach next key or end of layer with the same key
    4109      2102000 :             if !same_key || lsn == dup_end_lsn {
    4110      2009999 :                 let mut next_key_size = 0u64;
    4111      2009999 :                 let is_dup_layer = dup_end_lsn.is_valid();
    4112      2009999 :                 dup_start_lsn = Lsn::INVALID;
    4113      2009999 :                 if !same_key {
    4114      2009999 :                     dup_end_lsn = Lsn::INVALID;
    4115      2009999 :                 }
    4116              :                 // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
    4117      2009999 :                 for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
    4118      2009999 :                     next_key_size = next_size;
    4119      2009999 :                     if key != next_key {
    4120      2009969 :                         if dup_end_lsn.is_valid() {
    4121            0 :                             // We are writting segment with duplicates:
    4122            0 :                             // place all remaining values of this key in separate segment
    4123            0 :                             dup_start_lsn = dup_end_lsn; // new segments starts where old stops
    4124            0 :                             dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
    4125      2009969 :                         }
    4126      2009969 :                         break;
    4127           30 :                     }
    4128           30 :                     key_values_total_size += next_size;
    4129           30 :                     // Check if it is time to split segment: if total keys size is larger than target file size.
    4130           30 :                     // We need to avoid generation of empty segments if next_size > target_file_size.
    4131           30 :                     if key_values_total_size > target_file_size && lsn != next_lsn {
    4132              :                         // Split key between multiple layers: such layer can contain only single key
    4133            0 :                         dup_start_lsn = if dup_end_lsn.is_valid() {
    4134            0 :                             dup_end_lsn // new segment with duplicates starts where old one stops
    4135              :                         } else {
    4136            0 :                             lsn // start with the first LSN for this key
    4137              :                         };
    4138            0 :                         dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
    4139            0 :                         break;
    4140           30 :                     }
    4141              :                 }
    4142              :                 // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
    4143      2009999 :                 if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
    4144            0 :                     dup_start_lsn = dup_end_lsn;
    4145            0 :                     dup_end_lsn = lsn_range.end;
    4146      2009999 :                 }
    4147      2009999 :                 if writer.is_some() {
    4148      2009969 :                     let written_size = writer.as_mut().unwrap().size();
    4149      2009969 :                     let contains_hole =
    4150      2009969 :                         next_hole < holes.len() && key >= holes[next_hole].key_range.end;
    4151              :                     // check if key cause layer overflow or contains hole...
    4152      2009969 :                     if is_dup_layer
    4153      2009969 :                         || dup_end_lsn.is_valid()
    4154      2009969 :                         || written_size + key_values_total_size > target_file_size
    4155      2009969 :                         || contains_hole
    4156              :                     {
    4157              :                         // ... if so, flush previous layer and prepare to write new one
    4158            0 :                         new_layers.push(
    4159            0 :                             writer
    4160            0 :                                 .take()
    4161            0 :                                 .unwrap()
    4162            0 :                                 .finish(prev_key.unwrap().next(), self)
    4163            0 :                                 .await?,
    4164              :                         );
    4165            0 :                         writer = None;
    4166            0 : 
    4167            0 :                         if contains_hole {
    4168            0 :                             // skip hole
    4169            0 :                             next_hole += 1;
    4170            0 :                         }
    4171      2009969 :                     }
    4172           30 :                 }
    4173              :                 // Remember size of key value because at next iteration we will access next item
    4174      2009999 :                 key_values_total_size = next_key_size;
    4175        92001 :             }
    4176      2102000 :             fail_point!("delta-layer-writer-fail-before-finish", |_| {
    4177            0 :                 Err(CompactionError::Other(anyhow::anyhow!(
    4178            0 :                     "failpoint delta-layer-writer-fail-before-finish"
    4179            0 :                 )))
    4180      2102000 :             });
    4181              : 
    4182      2102000 :             if !self.shard_identity.is_key_disposable(&key) {
    4183      2102000 :                 if writer.is_none() {
    4184           30 :                     // Create writer if not initiaized yet
    4185           30 :                     writer = Some(
    4186              :                         DeltaLayerWriter::new(
    4187           30 :                             self.conf,
    4188           30 :                             self.timeline_id,
    4189           30 :                             self.tenant_shard_id,
    4190           30 :                             key,
    4191           30 :                             if dup_end_lsn.is_valid() {
    4192              :                                 // this is a layer containing slice of values of the same key
    4193            0 :                                 debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
    4194            0 :                                 dup_start_lsn..dup_end_lsn
    4195              :                             } else {
    4196            0 :                                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
    4197           30 :                                 lsn_range.clone()
    4198              :                             },
    4199              :                         )
    4200           15 :                         .await?,
    4201              :                     );
    4202      2101970 :                 }
    4203              : 
    4204      2102000 :                 writer.as_mut().unwrap().put_value(key, lsn, value).await?;
    4205              :             } else {
    4206            0 :                 debug!(
    4207            0 :                     "Dropping key {} during compaction (it belongs on shard {:?})",
    4208            0 :                     key,
    4209            0 :                     self.shard_identity.get_shard_number(&key)
    4210            0 :                 );
    4211              :             }
    4212              : 
    4213      2102000 :             if !new_layers.is_empty() {
    4214            0 :                 fail_point!("after-timeline-compacted-first-L1");
    4215      2102000 :             }
    4216              : 
    4217      2102000 :             prev_key = Some(key);
    4218              :         }
    4219           30 :         if let Some(writer) = writer {
    4220           61 :             new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
    4221            0 :         }
    4222              : 
    4223              :         // Sync layers
    4224           30 :         if !new_layers.is_empty() {
    4225              :             // Print a warning if the created layer is larger than double the target size
    4226              :             // Add two pages for potential overhead. This should in theory be already
    4227              :             // accounted for in the target calculation, but for very small targets,
    4228              :             // we still might easily hit the limit otherwise.
    4229           30 :             let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
    4230           30 :             for layer in new_layers.iter() {
    4231           30 :                 if layer.layer_desc().file_size > warn_limit {
    4232            0 :                     warn!(
    4233            0 :                         %layer,
    4234            0 :                         "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
    4235            0 :                     );
    4236           30 :                 }
    4237              :             }
    4238              : 
    4239              :             // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
    4240           30 :             let layer_paths: Vec<Utf8PathBuf> = new_layers
    4241           30 :                 .iter()
    4242           30 :                 .map(|l| l.local_path().to_owned())
    4243           30 :                 .collect();
    4244           30 : 
    4245           30 :             // Fsync all the layer files and directory using multiple threads to
    4246           30 :             // minimize latency.
    4247           30 :             par_fsync::par_fsync_async(&layer_paths)
    4248           30 :                 .await
    4249           30 :                 .context("fsync all new layers")?;
    4250              : 
    4251           30 :             let timeline_dir = self
    4252           30 :                 .conf
    4253           30 :                 .timeline_path(&self.tenant_shard_id, &self.timeline_id);
    4254           30 : 
    4255           30 :             par_fsync::par_fsync_async(&[timeline_dir])
    4256           30 :                 .await
    4257           30 :                 .context("fsync of timeline dir")?;
    4258            0 :         }
    4259              : 
    4260           30 :         stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
    4261           30 :         stats.new_deltas_count = Some(new_layers.len());
    4262           30 :         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
    4263           30 : 
    4264           30 :         match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
    4265           30 :             .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
    4266              :         {
    4267           30 :             Ok(stats_json) => {
    4268           30 :                 info!(
    4269           30 :                     stats_json = stats_json.as_str(),
    4270           30 :                     "compact_level0_phase1 stats available"
    4271           30 :                 )
    4272              :             }
    4273            0 :             Err(e) => {
    4274            0 :                 warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
    4275              :             }
    4276              :         }
    4277              : 
    4278           30 :         Ok(CompactLevel0Phase1Result {
    4279           30 :             new_layers,
    4280           30 :             deltas_to_compact: deltas_to_compact
    4281           30 :                 .into_iter()
    4282          300 :                 .map(|x| x.drop_eviction_guard())
    4283           30 :                 .collect::<Vec<_>>(),
    4284           30 :         })
    4285          408 :     }
    4286              : 
    4287              :     ///
    4288              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
    4289              :     /// as Level 1 files.
    4290              :     ///
    4291          408 :     async fn compact_level0(
    4292          408 :         self: &Arc<Self>,
    4293          408 :         target_file_size: u64,
    4294          408 :         ctx: &RequestContext,
    4295          408 :     ) -> Result<(), CompactionError> {
    4296              :         let CompactLevel0Phase1Result {
    4297          408 :             new_layers,
    4298          408 :             deltas_to_compact,
    4299              :         } = {
    4300          408 :             let phase1_span = info_span!("compact_level0_phase1");
    4301          408 :             let ctx = ctx.attached_child();
    4302          408 :             let mut stats = CompactLevel0Phase1StatsBuilder {
    4303          408 :                 version: Some(2),
    4304          408 :                 tenant_id: Some(self.tenant_shard_id),
    4305          408 :                 timeline_id: Some(self.timeline_id),
    4306          408 :                 ..Default::default()
    4307          408 :             };
    4308          408 : 
    4309          408 :             let begin = tokio::time::Instant::now();
    4310          408 :             let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
    4311          408 :             let now = tokio::time::Instant::now();
    4312          408 :             stats.read_lock_acquisition_micros =
    4313          408 :                 DurationRecorder::Recorded(RecordedDuration(now - begin), now);
    4314          408 :             self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
    4315          408 :                 .instrument(phase1_span)
    4316        39551 :                 .await?
    4317              :         };
    4318              : 
    4319          408 :         if new_layers.is_empty() && deltas_to_compact.is_empty() {
    4320              :             // nothing to do
    4321          378 :             return Ok(());
    4322           30 :         }
    4323              : 
    4324           30 :         let mut guard = self.layers.write().await;
    4325              : 
    4326           30 :         let mut duplicated_layers = HashSet::new();
    4327           30 : 
    4328           30 :         let mut insert_layers = Vec::with_capacity(new_layers.len());
    4329              : 
    4330           60 :         for l in &new_layers {
    4331           30 :             if guard.contains(l.as_ref()) {
    4332              :                 // expected in tests
    4333            0 :                 tracing::error!(layer=%l, "duplicated L1 layer");
    4334              : 
    4335              :                 // good ways to cause a duplicate: we repeatedly error after taking the writelock
    4336              :                 // `guard`  on self.layers. as of writing this, there are no error returns except
    4337              :                 // for compact_level0_phase1 creating an L0, which does not happen in practice
    4338              :                 // because we have not implemented L0 => L0 compaction.
    4339            0 :                 duplicated_layers.insert(l.layer_desc().key());
    4340           30 :             } else if LayerMap::is_l0(l.layer_desc()) {
    4341            0 :                 return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
    4342           30 :             } else {
    4343           30 :                 insert_layers.push(l.clone());
    4344           30 :             }
    4345              :         }
    4346              : 
    4347           30 :         let remove_layers = {
    4348           30 :             let mut deltas_to_compact = deltas_to_compact;
    4349           30 :             // only remove those inputs which were not outputs
    4350          300 :             deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
    4351           30 :             deltas_to_compact
    4352           30 :         };
    4353           30 : 
    4354           30 :         // deletion will happen later, the layer file manager calls garbage_collect_on_drop
    4355           30 :         guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
    4356              : 
    4357           30 :         if let Some(remote_client) = self.remote_client.as_ref() {
    4358           30 :             remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
    4359            0 :         }
    4360              : 
    4361           30 :         drop_wlock(guard);
    4362           30 : 
    4363           30 :         Ok(())
    4364          408 :     }
    4365              : 
    4366              :     /// Update information about which layer files need to be retained on
    4367              :     /// garbage collection. This is separate from actually performing the GC,
    4368              :     /// and is updated more frequently, so that compaction can remove obsolete
    4369              :     /// page versions more aggressively.
    4370              :     ///
    4371              :     /// TODO: that's wishful thinking, compaction doesn't actually do that
    4372              :     /// currently.
    4373              :     ///
    4374              :     /// The caller specifies how much history is needed with the 3 arguments:
    4375              :     ///
    4376              :     /// retain_lsns: keep a version of each page at these LSNs
    4377              :     /// cutoff_horizon: also keep everything newer than this LSN
    4378              :     /// pitr: the time duration required to keep data for PITR
    4379              :     ///
    4380              :     /// The 'retain_lsns' list is currently used to prevent removing files that
    4381              :     /// are needed by child timelines. In the future, the user might be able to
    4382              :     /// name additional points in time to retain. The caller is responsible for
    4383              :     /// collecting that information.
    4384              :     ///
    4385              :     /// The 'cutoff_horizon' point is used to retain recent versions that might still be
    4386              :     /// needed by read-only nodes. (As of this writing, the caller just passes
    4387              :     /// the latest LSN subtracted by a constant, and doesn't do anything smart
    4388              :     /// to figure out what read-only nodes might actually need.)
    4389              :     ///
    4390              :     /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
    4391              :     /// whether a record is needed for PITR.
    4392              :     ///
    4393              :     /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
    4394              :     /// field, so that the three values passed as argument are stored
    4395              :     /// atomically. But the caller is responsible for ensuring that no new
    4396              :     /// branches are created that would need to be included in 'retain_lsns',
    4397              :     /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
    4398              :     /// that.
    4399              :     ///
    4400          816 :     #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
    4401              :     pub(super) async fn update_gc_info(
    4402              :         &self,
    4403              :         retain_lsns: Vec<Lsn>,
    4404              :         cutoff_horizon: Lsn,
    4405              :         pitr: Duration,
    4406              :         cancel: &CancellationToken,
    4407              :         ctx: &RequestContext,
    4408              :     ) -> anyhow::Result<()> {
    4409              :         // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
    4410              :         //
    4411              :         // Some unit tests depend on garbage-collection working even when
    4412              :         // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
    4413              :         // work, so avoid calling it altogether if time-based retention is not
    4414              :         // configured. It would be pointless anyway.
    4415              :         let pitr_cutoff = if pitr != Duration::ZERO {
    4416              :             let now = SystemTime::now();
    4417              :             if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
    4418              :                 let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
    4419              : 
    4420              :                 match self
    4421              :                     .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
    4422              :                     .await?
    4423              :                 {
    4424              :                     LsnForTimestamp::Present(lsn) => lsn,
    4425              :                     LsnForTimestamp::Future(lsn) => {
    4426              :                         // The timestamp is in the future. That sounds impossible,
    4427              :                         // but what it really means is that there hasn't been
    4428              :                         // any commits since the cutoff timestamp.
    4429            0 :                         debug!("future({})", lsn);
    4430              :                         cutoff_horizon
    4431              :                     }
    4432              :                     LsnForTimestamp::Past(lsn) => {
    4433            0 :                         debug!("past({})", lsn);
    4434              :                         // conservative, safe default is to remove nothing, when we
    4435              :                         // have no commit timestamp data available
    4436              :                         *self.get_latest_gc_cutoff_lsn()
    4437              :                     }
    4438              :                     LsnForTimestamp::NoData(lsn) => {
    4439            0 :                         debug!("nodata({})", lsn);
    4440              :                         // conservative, safe default is to remove nothing, when we
    4441              :                         // have no commit timestamp data available
    4442              :                         *self.get_latest_gc_cutoff_lsn()
    4443              :                     }
    4444              :                 }
    4445              :             } else {
    4446              :                 // If we don't have enough data to convert to LSN,
    4447              :                 // play safe and don't remove any layers.
    4448              :                 *self.get_latest_gc_cutoff_lsn()
    4449              :             }
    4450              :         } else {
    4451              :             // No time-based retention was configured. Set time-based cutoff to
    4452              :             // same as LSN based.
    4453              :             cutoff_horizon
    4454              :         };
    4455              : 
    4456              :         // Grab the lock and update the values
    4457              :         *self.gc_info.write().unwrap() = GcInfo {
    4458              :             retain_lsns,
    4459              :             horizon_cutoff: cutoff_horizon,
    4460              :             pitr_cutoff,
    4461              :         };
    4462              : 
    4463              :         Ok(())
    4464              :     }
    4465              : 
    4466              :     /// Garbage collect layer files on a timeline that are no longer needed.
    4467              :     ///
    4468              :     /// Currently, we don't make any attempt at removing unneeded page versions
    4469              :     /// within a layer file. We can only remove the whole file if it's fully
    4470              :     /// obsolete.
    4471          408 :     pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
    4472          408 :         // this is most likely the background tasks, but it might be the spawned task from
    4473          408 :         // immediate_gc
    4474          408 :         let cancel = crate::task_mgr::shutdown_token();
    4475          408 :         let _g = tokio::select! {
    4476          408 :             guard = self.gc_lock.lock() => guard,
    4477              :             _ = self.cancel.cancelled() => return Ok(GcResult::default()),
    4478              :             _ = cancel.cancelled() => return Ok(GcResult::default()),
    4479              :         };
    4480          408 :         let timer = self.metrics.garbage_collect_histo.start_timer();
    4481              : 
    4482            0 :         fail_point!("before-timeline-gc");
    4483              : 
    4484              :         // Is the timeline being deleted?
    4485          408 :         if self.is_stopping() {
    4486            0 :             anyhow::bail!("timeline is Stopping");
    4487          408 :         }
    4488          408 : 
    4489          408 :         let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
    4490          408 :             let gc_info = self.gc_info.read().unwrap();
    4491          408 : 
    4492          408 :             let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
    4493          408 :             let pitr_cutoff = gc_info.pitr_cutoff;
    4494          408 :             let retain_lsns = gc_info.retain_lsns.clone();
    4495          408 :             (horizon_cutoff, pitr_cutoff, retain_lsns)
    4496          408 :         };
    4497          408 : 
    4498          408 :         let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
    4499              : 
    4500          408 :         let res = self
    4501          408 :             .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
    4502          408 :             .instrument(
    4503          408 :                 info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
    4504              :             )
    4505            1 :             .await?;
    4506              : 
    4507              :         // only record successes
    4508          408 :         timer.stop_and_record();
    4509          408 : 
    4510          408 :         Ok(res)
    4511          408 :     }
    4512              : 
    4513          408 :     async fn gc_timeline(
    4514          408 :         &self,
    4515          408 :         horizon_cutoff: Lsn,
    4516          408 :         pitr_cutoff: Lsn,
    4517          408 :         retain_lsns: Vec<Lsn>,
    4518          408 :         new_gc_cutoff: Lsn,
    4519          408 :     ) -> anyhow::Result<GcResult> {
    4520          408 :         let now = SystemTime::now();
    4521          408 :         let mut result: GcResult = GcResult::default();
    4522          408 : 
    4523          408 :         // Nothing to GC. Return early.
    4524          408 :         let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
    4525          408 :         if latest_gc_cutoff >= new_gc_cutoff {
    4526            0 :             info!(
    4527            0 :                 "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
    4528            0 :             );
    4529            0 :             return Ok(result);
    4530          408 :         }
    4531              : 
    4532              :         // We need to ensure that no one tries to read page versions or create
    4533              :         // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
    4534              :         // for details. This will block until the old value is no longer in use.
    4535              :         //
    4536              :         // The GC cutoff should only ever move forwards.
    4537          408 :         let waitlist = {
    4538          408 :             let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
    4539          408 :             ensure!(
    4540          408 :                 *write_guard <= new_gc_cutoff,
    4541            0 :                 "Cannot move GC cutoff LSN backwards (was {}, new {})",
    4542            0 :                 *write_guard,
    4543              :                 new_gc_cutoff
    4544              :             );
    4545          408 :             write_guard.store_and_unlock(new_gc_cutoff)
    4546          408 :         };
    4547          408 :         waitlist.wait().await;
    4548              : 
    4549          408 :         info!("GC starting");
    4550              : 
    4551            0 :         debug!("retain_lsns: {:?}", retain_lsns);
    4552              : 
    4553          408 :         let mut layers_to_remove = Vec::new();
    4554          408 :         let mut wanted_image_layers = KeySpaceRandomAccum::default();
    4555              : 
    4556              :         // Scan all layers in the timeline (remote or on-disk).
    4557              :         //
    4558              :         // Garbage collect the layer if all conditions are satisfied:
    4559              :         // 1. it is older than cutoff LSN;
    4560              :         // 2. it is older than PITR interval;
    4561              :         // 3. it doesn't need to be retained for 'retain_lsns';
    4562              :         // 4. newer on-disk image layers cover the layer's whole key range
    4563              :         //
    4564              :         // TODO holding a write lock is too agressive and avoidable
    4565          408 :         let mut guard = self.layers.write().await;
    4566          408 :         let layers = guard.layer_map();
    4567         2404 :         'outer: for l in layers.iter_historic_layers() {
    4568         2404 :             result.layers_total += 1;
    4569         2404 : 
    4570         2404 :             // 1. Is it newer than GC horizon cutoff point?
    4571         2404 :             if l.get_lsn_range().end > horizon_cutoff {
    4572            0 :                 debug!(
    4573            0 :                     "keeping {} because it's newer than horizon_cutoff {}",
    4574            0 :                     l.filename(),
    4575            0 :                     horizon_cutoff,
    4576            0 :                 );
    4577          408 :                 result.layers_needed_by_cutoff += 1;
    4578          408 :                 continue 'outer;
    4579         1996 :             }
    4580         1996 : 
    4581         1996 :             // 2. It is newer than PiTR cutoff point?
    4582         1996 :             if l.get_lsn_range().end > pitr_cutoff {
    4583            0 :                 debug!(
    4584            0 :                     "keeping {} because it's newer than pitr_cutoff {}",
    4585            0 :                     l.filename(),
    4586            0 :                     pitr_cutoff,
    4587            0 :                 );
    4588            0 :                 result.layers_needed_by_pitr += 1;
    4589            0 :                 continue 'outer;
    4590         1996 :             }
    4591              : 
    4592              :             // 3. Is it needed by a child branch?
    4593              :             // NOTE With that we would keep data that
    4594              :             // might be referenced by child branches forever.
    4595              :             // We can track this in child timeline GC and delete parent layers when
    4596              :             // they are no longer needed. This might be complicated with long inheritance chains.
    4597              :             //
    4598              :             // TODO Vec is not a great choice for `retain_lsns`
    4599         1996 :             for retain_lsn in &retain_lsns {
    4600              :                 // start_lsn is inclusive
    4601           12 :                 if &l.get_lsn_range().start <= retain_lsn {
    4602            0 :                     debug!(
    4603            0 :                         "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
    4604            0 :                         l.filename(),
    4605            0 :                         retain_lsn,
    4606            0 :                         l.is_incremental(),
    4607            0 :                     );
    4608           12 :                     result.layers_needed_by_branches += 1;
    4609           12 :                     continue 'outer;
    4610            0 :                 }
    4611              :             }
    4612              : 
    4613              :             // 4. Is there a later on-disk layer for this relation?
    4614              :             //
    4615              :             // The end-LSN is exclusive, while disk_consistent_lsn is
    4616              :             // inclusive. For example, if disk_consistent_lsn is 100, it is
    4617              :             // OK for a delta layer to have end LSN 101, but if the end LSN
    4618              :             // is 102, then it might not have been fully flushed to disk
    4619              :             // before crash.
    4620              :             //
    4621              :             // For example, imagine that the following layers exist:
    4622              :             //
    4623              :             // 1000      - image (A)
    4624              :             // 1000-2000 - delta (B)
    4625              :             // 2000      - image (C)
    4626              :             // 2000-3000 - delta (D)
    4627              :             // 3000      - image (E)
    4628              :             //
    4629              :             // If GC horizon is at 2500, we can remove layers A and B, but
    4630              :             // we cannot remove C, even though it's older than 2500, because
    4631              :             // the delta layer 2000-3000 depends on it.
    4632         1984 :             if !layers
    4633         1984 :                 .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
    4634              :             {
    4635            0 :                 debug!("keeping {} because it is the latest layer", l.filename());
    4636              :                 // Collect delta key ranges that need image layers to allow garbage
    4637              :                 // collecting the layers.
    4638              :                 // It is not so obvious whether we need to propagate information only about
    4639              :                 // delta layers. Image layers can form "stairs" preventing old image from been deleted.
    4640              :                 // But image layers are in any case less sparse than delta layers. Also we need some
    4641              :                 // protection from replacing recent image layers with new one after each GC iteration.
    4642         1984 :                 if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
    4643            0 :                     wanted_image_layers.add_range(l.get_key_range());
    4644         1984 :                 }
    4645         1984 :                 result.layers_not_updated += 1;
    4646         1984 :                 continue 'outer;
    4647            0 :             }
    4648              : 
    4649              :             // We didn't find any reason to keep this file, so remove it.
    4650            0 :             debug!(
    4651            0 :                 "garbage collecting {} is_dropped: xx is_incremental: {}",
    4652            0 :                 l.filename(),
    4653            0 :                 l.is_incremental(),
    4654            0 :             );
    4655            0 :             layers_to_remove.push(l);
    4656              :         }
    4657          408 :         self.wanted_image_layers
    4658          408 :             .lock()
    4659          408 :             .unwrap()
    4660          408 :             .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
    4661          408 : 
    4662          408 :         if !layers_to_remove.is_empty() {
    4663              :             // Persist the new GC cutoff value in the metadata file, before
    4664              :             // we actually remove anything.
    4665              :             //
    4666              :             // This does not in fact have any effect as we no longer consider local metadata unless
    4667              :             // running without remote storage.
    4668              :             //
    4669              :             // This unconditionally schedules also an index_part.json update, even though, we will
    4670              :             // be doing one a bit later with the unlinked gc'd layers.
    4671              :             //
    4672              :             // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
    4673            0 :             self.update_metadata_file(self.disk_consistent_lsn.load(), None)
    4674            0 :                 .await?;
    4675              : 
    4676            0 :             let gc_layers = layers_to_remove
    4677            0 :                 .iter()
    4678            0 :                 .map(|x| guard.get_from_desc(x))
    4679            0 :                 .collect::<Vec<Layer>>();
    4680            0 : 
    4681            0 :             result.layers_removed = gc_layers.len() as u64;
    4682              : 
    4683            0 :             if let Some(remote_client) = self.remote_client.as_ref() {
    4684            0 :                 remote_client.schedule_gc_update(&gc_layers)?;
    4685            0 :             }
    4686              : 
    4687            0 :             guard.finish_gc_timeline(&gc_layers);
    4688            0 : 
    4689            0 :             #[cfg(feature = "testing")]
    4690            0 :             {
    4691            0 :                 result.doomed_layers = gc_layers;
    4692            0 :             }
    4693          408 :         }
    4694              : 
    4695          408 :         info!(
    4696          408 :             "GC completed removing {} layers, cutoff {}",
    4697          408 :             result.layers_removed, new_gc_cutoff
    4698          408 :         );
    4699              : 
    4700          408 :         result.elapsed = now.elapsed()?;
    4701          408 :         Ok(result)
    4702          408 :     }
    4703              : 
    4704              :     /// Reconstruct a value, using the given base image and WAL records in 'data'.
    4705       502698 :     async fn reconstruct_value(
    4706       502698 :         &self,
    4707       502698 :         key: Key,
    4708       502698 :         request_lsn: Lsn,
    4709       502698 :         mut data: ValueReconstructState,
    4710       502698 :     ) -> Result<Bytes, PageReconstructError> {
    4711       502698 :         // Perform WAL redo if needed
    4712       502698 :         data.records.reverse();
    4713       502698 : 
    4714       502698 :         // If we have a page image, and no WAL, we're all set
    4715       502698 :         if data.records.is_empty() {
    4716       502690 :             if let Some((img_lsn, img)) = &data.img {
    4717            0 :                 trace!(
    4718            0 :                     "found page image for key {} at {}, no WAL redo required, req LSN {}",
    4719            0 :                     key,
    4720            0 :                     img_lsn,
    4721            0 :                     request_lsn,
    4722            0 :                 );
    4723       502690 :                 Ok(img.clone())
    4724              :             } else {
    4725            0 :                 Err(PageReconstructError::from(anyhow!(
    4726            0 :                     "base image for {key} at {request_lsn} not found"
    4727            0 :                 )))
    4728              :             }
    4729              :         } else {
    4730              :             // We need to do WAL redo.
    4731              :             //
    4732              :             // If we don't have a base image, then the oldest WAL record better initialize
    4733              :             // the page
    4734            8 :             if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
    4735            0 :                 Err(PageReconstructError::from(anyhow!(
    4736            0 :                     "Base image for {} at {} not found, but got {} WAL records",
    4737            0 :                     key,
    4738            0 :                     request_lsn,
    4739            0 :                     data.records.len()
    4740            0 :                 )))
    4741              :             } else {
    4742            8 :                 if data.img.is_some() {
    4743            0 :                     trace!(
    4744            0 :                         "found {} WAL records and a base image for {} at {}, performing WAL redo",
    4745            0 :                         data.records.len(),
    4746            0 :                         key,
    4747            0 :                         request_lsn
    4748            0 :                     );
    4749              :                 } else {
    4750            0 :                     trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
    4751              :                 };
    4752              : 
    4753            8 :                 let last_rec_lsn = data.records.last().unwrap().0;
    4754              : 
    4755            8 :                 let img = match self
    4756            8 :                     .walredo_mgr
    4757            8 :                     .as_ref()
    4758            8 :                     .context("timeline has no walredo manager")
    4759            8 :                     .map_err(PageReconstructError::WalRedo)?
    4760            8 :                     .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
    4761            0 :                     .await
    4762            8 :                     .context("reconstruct a page image")
    4763              :                 {
    4764            8 :                     Ok(img) => img,
    4765            0 :                     Err(e) => return Err(PageReconstructError::WalRedo(e)),
    4766              :                 };
    4767              : 
    4768            8 :                 if img.len() == page_cache::PAGE_SZ {
    4769            0 :                     let cache = page_cache::get();
    4770            0 :                     if let Err(e) = cache
    4771            0 :                         .memorize_materialized_page(
    4772            0 :                             self.tenant_shard_id,
    4773            0 :                             self.timeline_id,
    4774            0 :                             key,
    4775            0 :                             last_rec_lsn,
    4776            0 :                             &img,
    4777            0 :                         )
    4778            0 :                         .await
    4779            0 :                         .context("Materialized page memoization failed")
    4780              :                     {
    4781            0 :                         return Err(PageReconstructError::from(e));
    4782            0 :                     }
    4783            8 :                 }
    4784              : 
    4785            8 :                 Ok(img)
    4786              :             }
    4787              :         }
    4788       502698 :     }
    4789              : 
    4790            0 :     pub(crate) async fn spawn_download_all_remote_layers(
    4791            0 :         self: Arc<Self>,
    4792            0 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4793            0 :     ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
    4794            0 :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4795            0 : 
    4796            0 :         // this is not really needed anymore; it has tests which really check the return value from
    4797            0 :         // http api. it would be better not to maintain this anymore.
    4798            0 : 
    4799            0 :         let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
    4800            0 :         if let Some(st) = &*status_guard {
    4801            0 :             match &st.state {
    4802              :                 DownloadRemoteLayersTaskState::Running => {
    4803            0 :                     return Err(st.clone());
    4804              :                 }
    4805              :                 DownloadRemoteLayersTaskState::ShutDown
    4806            0 :                 | DownloadRemoteLayersTaskState::Completed => {
    4807            0 :                     *status_guard = None;
    4808            0 :                 }
    4809              :             }
    4810            0 :         }
    4811              : 
    4812            0 :         let self_clone = Arc::clone(&self);
    4813            0 :         let task_id = task_mgr::spawn(
    4814            0 :             task_mgr::BACKGROUND_RUNTIME.handle(),
    4815            0 :             task_mgr::TaskKind::DownloadAllRemoteLayers,
    4816            0 :             Some(self.tenant_shard_id),
    4817            0 :             Some(self.timeline_id),
    4818            0 :             "download all remote layers task",
    4819              :             false,
    4820            0 :             async move {
    4821            0 :                 self_clone.download_all_remote_layers(request).await;
    4822            0 :                 let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
    4823            0 :                  match &mut *status_guard {
    4824              :                     None => {
    4825            0 :                         warn!("tasks status is supposed to be Some(), since we are running");
    4826              :                     }
    4827            0 :                     Some(st) => {
    4828            0 :                         let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
    4829            0 :                         if st.task_id != exp_task_id {
    4830            0 :                             warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
    4831            0 :                         } else {
    4832            0 :                             st.state = DownloadRemoteLayersTaskState::Completed;
    4833            0 :                         }
    4834              :                     }
    4835              :                 };
    4836            0 :                 Ok(())
    4837            0 :             }
    4838            0 :             .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
    4839              :         );
    4840              : 
    4841            0 :         let initial_info = DownloadRemoteLayersTaskInfo {
    4842            0 :             task_id: format!("{task_id}"),
    4843            0 :             state: DownloadRemoteLayersTaskState::Running,
    4844            0 :             total_layer_count: 0,
    4845            0 :             successful_download_count: 0,
    4846            0 :             failed_download_count: 0,
    4847            0 :         };
    4848            0 :         *status_guard = Some(initial_info.clone());
    4849            0 : 
    4850            0 :         Ok(initial_info)
    4851            0 :     }
    4852              : 
    4853            0 :     async fn download_all_remote_layers(
    4854            0 :         self: &Arc<Self>,
    4855            0 :         request: DownloadRemoteLayersTaskSpawnRequest,
    4856            0 :     ) {
    4857              :         use pageserver_api::models::DownloadRemoteLayersTaskState;
    4858              : 
    4859            0 :         let remaining = {
    4860            0 :             let guard = self.layers.read().await;
    4861            0 :             guard
    4862            0 :                 .layer_map()
    4863            0 :                 .iter_historic_layers()
    4864            0 :                 .map(|desc| guard.get_from_desc(&desc))
    4865            0 :                 .collect::<Vec<_>>()
    4866            0 :         };
    4867            0 :         let total_layer_count = remaining.len();
    4868            0 : 
    4869            0 :         macro_rules! lock_status {
    4870            0 :             ($st:ident) => {
    4871            0 :                 let mut st = self.download_all_remote_layers_task_info.write().unwrap();
    4872            0 :                 let st = st
    4873            0 :                     .as_mut()
    4874            0 :                     .expect("this function is only called after the task has been spawned");
    4875            0 :                 assert_eq!(
    4876            0 :                     st.task_id,
    4877            0 :                     format!(
    4878            0 :                         "{}",
    4879            0 :                         task_mgr::current_task_id().expect("we run inside a task_mgr task")
    4880            0 :                     )
    4881            0 :                 );
    4882            0 :                 let $st = st;
    4883            0 :             };
    4884            0 :         }
    4885            0 : 
    4886            0 :         {
    4887            0 :             lock_status!(st);
    4888            0 :             st.total_layer_count = total_layer_count as u64;
    4889            0 :         }
    4890            0 : 
    4891            0 :         let mut remaining = remaining.into_iter();
    4892            0 :         let mut have_remaining = true;
    4893            0 :         let mut js = tokio::task::JoinSet::new();
    4894            0 : 
    4895            0 :         let cancel = task_mgr::shutdown_token();
    4896            0 : 
    4897            0 :         let limit = request.max_concurrent_downloads;
    4898              : 
    4899              :         loop {
    4900            0 :             while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
    4901            0 :                 let Some(next) = remaining.next() else {
    4902            0 :                     have_remaining = false;
    4903            0 :                     break;
    4904              :                 };
    4905              : 
    4906            0 :                 let span = tracing::info_span!("download", layer = %next);
    4907              : 
    4908            0 :                 js.spawn(
    4909            0 :                     async move {
    4910            0 :                         let res = next.download().await;
    4911            0 :                         (next, res)
    4912            0 :                     }
    4913            0 :                     .instrument(span),
    4914            0 :                 );
    4915              :             }
    4916              : 
    4917            0 :             while let Some(res) = js.join_next().await {
    4918            0 :                 match res {
    4919              :                     Ok((_, Ok(_))) => {
    4920            0 :                         lock_status!(st);
    4921            0 :                         st.successful_download_count += 1;
    4922              :                     }
    4923            0 :                     Ok((layer, Err(e))) => {
    4924            0 :                         tracing::error!(%layer, "download failed: {e:#}");
    4925            0 :                         lock_status!(st);
    4926            0 :                         st.failed_download_count += 1;
    4927              :                     }
    4928            0 :                     Err(je) if je.is_cancelled() => unreachable!("not used here"),
    4929            0 :                     Err(je) if je.is_panic() => {
    4930            0 :                         lock_status!(st);
    4931            0 :                         st.failed_download_count += 1;
    4932              :                     }
    4933            0 :                     Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
    4934              :                 }
    4935              :             }
    4936              : 
    4937            0 :             if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
    4938            0 :                 break;
    4939            0 :             }
    4940              :         }
    4941              : 
    4942              :         {
    4943            0 :             lock_status!(st);
    4944            0 :             st.state = DownloadRemoteLayersTaskState::Completed;
    4945            0 :         }
    4946            0 :     }
    4947              : 
    4948            0 :     pub(crate) fn get_download_all_remote_layers_task_info(
    4949            0 :         &self,
    4950            0 :     ) -> Option<DownloadRemoteLayersTaskInfo> {
    4951            0 :         self.download_all_remote_layers_task_info
    4952            0 :             .read()
    4953            0 :             .unwrap()
    4954            0 :             .clone()
    4955            0 :     }
    4956              : }
    4957              : 
    4958              : impl Timeline {
    4959              :     /// Returns non-remote layers for eviction.
    4960            0 :     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
    4961            0 :         let guard = self.layers.read().await;
    4962            0 :         let mut max_layer_size: Option<u64> = None;
    4963              : 
    4964            0 :         let resident_layers = guard
    4965            0 :             .resident_layers()
    4966            0 :             .map(|layer| {
    4967            0 :                 let file_size = layer.layer_desc().file_size;
    4968            0 :                 max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
    4969            0 : 
    4970            0 :                 let last_activity_ts = layer.access_stats().latest_activity_or_now();
    4971            0 : 
    4972            0 :                 EvictionCandidate {
    4973            0 :                     layer: layer.into(),
    4974            0 :                     last_activity_ts,
    4975            0 :                     relative_last_activity: finite_f32::FiniteF32::ZERO,
    4976            0 :                 }
    4977            0 :             })
    4978            0 :             .collect()
    4979            0 :             .await;
    4980              : 
    4981            0 :         DiskUsageEvictionInfo {
    4982            0 :             max_layer_size,
    4983            0 :             resident_layers,
    4984            0 :         }
    4985            0 :     }
    4986              : 
    4987          556 :     pub(crate) fn get_shard_index(&self) -> ShardIndex {
    4988          556 :         ShardIndex {
    4989          556 :             shard_number: self.tenant_shard_id.shard_number,
    4990          556 :             shard_count: self.tenant_shard_id.shard_count,
    4991          556 :         }
    4992          556 :     }
    4993              : }
    4994              : 
    4995              : type TraversalPathItem = (
    4996              :     ValueReconstructResult,
    4997              :     Lsn,
    4998              :     Box<dyn Send + FnOnce() -> TraversalId>,
    4999              : );
    5000              : 
    5001              : /// Helper function for get_reconstruct_data() to add the path of layers traversed
    5002              : /// to an error, as anyhow context information.
    5003          106 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
    5004          106 :     // We want the original 'msg' to be the outermost context. The outermost context
    5005          106 :     // is the most high-level information, which also gets propagated to the client.
    5006          106 :     let mut msg_iter = path
    5007          106 :         .into_iter()
    5008          108 :         .map(|(r, c, l)| {
    5009          108 :             format!(
    5010          108 :                 "layer traversal: result {:?}, cont_lsn {}, layer: {}",
    5011          108 :                 r,
    5012          108 :                 c,
    5013          108 :                 l(),
    5014          108 :             )
    5015          108 :         })
    5016          106 :         .chain(std::iter::once(msg));
    5017          106 :     // Construct initial message from the first traversed layer
    5018          106 :     let err = anyhow!(msg_iter.next().unwrap());
    5019          106 : 
    5020          106 :     // Append all subsequent traversals, and the error message 'msg', as contexts.
    5021          108 :     let msg = msg_iter.fold(err, |err, msg| err.context(msg));
    5022          106 :     PageReconstructError::from(msg)
    5023          106 : }
    5024              : 
    5025              : struct TimelineWriterState {
    5026              :     open_layer: Arc<InMemoryLayer>,
    5027              :     current_size: u64,
    5028              :     // Previous Lsn which passed through
    5029              :     prev_lsn: Option<Lsn>,
    5030              :     // Largest Lsn which passed through the current writer
    5031              :     max_lsn: Option<Lsn>,
    5032              :     // Cached details of the last freeze. Avoids going trough the atomic/lock on every put.
    5033              :     cached_last_freeze_at: Lsn,
    5034              :     cached_last_freeze_ts: Instant,
    5035              : }
    5036              : 
    5037              : impl TimelineWriterState {
    5038      2627992 :     fn new(
    5039      2627992 :         open_layer: Arc<InMemoryLayer>,
    5040      2627992 :         current_size: u64,
    5041      2627992 :         last_freeze_at: Lsn,
    5042      2627992 :         last_freeze_ts: Instant,
    5043      2627992 :     ) -> Self {
    5044      2627992 :         Self {
    5045      2627992 :             open_layer,
    5046      2627992 :             current_size,
    5047      2627992 :             prev_lsn: None,
    5048      2627992 :             max_lsn: None,
    5049      2627992 :             cached_last_freeze_at: last_freeze_at,
    5050      2627992 :             cached_last_freeze_ts: last_freeze_ts,
    5051      2627992 :         }
    5052      2627992 :     }
    5053              : }
    5054              : 
    5055              : /// Various functions to mutate the timeline.
    5056              : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
    5057              : // This is probably considered a bad practice in Rust and should be fixed eventually,
    5058              : // but will cause large code changes.
    5059              : pub(crate) struct TimelineWriter<'a> {
    5060              :     tl: &'a Timeline,
    5061              :     write_guard: tokio::sync::MutexGuard<'a, Option<TimelineWriterState>>,
    5062              : }
    5063              : 
    5064              : impl Deref for TimelineWriter<'_> {
    5065              :     type Target = Timeline;
    5066              : 
    5067      5258576 :     fn deref(&self) -> &Self::Target {
    5068      5258576 :         self.tl
    5069      5258576 :     }
    5070              : }
    5071              : 
    5072              : impl Drop for TimelineWriter<'_> {
    5073      2957008 :     fn drop(&mut self) {
    5074      2957008 :         self.write_guard.take();
    5075      2957008 :     }
    5076              : }
    5077              : 
    5078              : enum OpenLayerAction {
    5079              :     Roll,
    5080              :     Open,
    5081              :     None,
    5082              : }
    5083              : 
    5084              : impl<'a> TimelineWriter<'a> {
    5085              :     /// Put a new page version that can be constructed from a WAL record
    5086              :     ///
    5087              :     /// This will implicitly extend the relation, if the page is beyond the
    5088              :     /// current end-of-file.
    5089      2913766 :     pub(crate) async fn put(
    5090      2913766 :         &mut self,
    5091      2913766 :         key: Key,
    5092      2913766 :         lsn: Lsn,
    5093      2913766 :         value: &Value,
    5094      2913766 :         ctx: &RequestContext,
    5095      2913766 :     ) -> anyhow::Result<()> {
    5096      2913766 :         // Avoid doing allocations for "small" values.
    5097      2913766 :         // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
    5098      2913766 :         // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
    5099      2913766 :         let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
    5100      2913766 :         buf.clear();
    5101      2913766 :         value.ser_into(&mut buf)?;
    5102      2913766 :         let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
    5103      2913766 : 
    5104      2913766 :         let action = self.get_open_layer_action(lsn, buf_size);
    5105      2913766 :         let layer = self.handle_open_layer_action(lsn, action).await?;
    5106      2913766 :         let res = layer.put_value(key, lsn, &buf, ctx).await;
    5107              : 
    5108      2913766 :         if res.is_ok() {
    5109      2913766 :             // Update the current size only when the entire write was ok.
    5110      2913766 :             // In case of failures, we may have had partial writes which
    5111      2913766 :             // render the size tracking out of sync. That's ok because
    5112      2913766 :             // the checkpoint distance should be significantly smaller
    5113      2913766 :             // than the S3 single shot upload limit of 5GiB.
    5114      2913766 :             let state = self.write_guard.as_mut().unwrap();
    5115      2913766 : 
    5116      2913766 :             state.current_size += buf_size;
    5117      2913766 :             state.prev_lsn = Some(lsn);
    5118      2913766 :             state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
    5119      2913766 :         }
    5120              : 
    5121      2913766 :         res
    5122      2913766 :     }
    5123              : 
    5124      2913768 :     async fn handle_open_layer_action(
    5125      2913768 :         &mut self,
    5126      2913768 :         at: Lsn,
    5127      2913768 :         action: OpenLayerAction,
    5128      2913768 :     ) -> anyhow::Result<&Arc<InMemoryLayer>> {
    5129      2913768 :         match action {
    5130              :             OpenLayerAction::Roll => {
    5131            0 :                 let max_lsn = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
    5132            0 :                 self.tl.freeze_inmem_layer_at(max_lsn).await;
    5133              : 
    5134            0 :                 let now = Instant::now();
    5135            0 :                 *(self.last_freeze_ts.write().unwrap()) = now;
    5136            0 : 
    5137            0 :                 self.tl.flush_frozen_layers();
    5138            0 : 
    5139            0 :                 let current_size = self.write_guard.as_ref().unwrap().current_size;
    5140            0 :                 if current_size > self.get_checkpoint_distance() {
    5141            0 :                     warn!("Flushed oversized open layer with size {}", current_size)
    5142            0 :                 }
    5143              : 
    5144            0 :                 assert!(self.write_guard.is_some());
    5145              : 
    5146            0 :                 let layer = self.tl.get_layer_for_write(at).await?;
    5147            0 :                 let initial_size = layer.size().await?;
    5148            0 :                 self.write_guard.replace(TimelineWriterState::new(
    5149            0 :                     layer,
    5150            0 :                     initial_size,
    5151            0 :                     Lsn(max_lsn.0 + 1),
    5152            0 :                     now,
    5153            0 :                 ));
    5154              :             }
    5155              :             OpenLayerAction::Open => {
    5156      2627992 :                 assert!(self.write_guard.is_none());
    5157              : 
    5158      2627992 :                 let layer = self.tl.get_layer_for_write(at).await?;
    5159      2627992 :                 let initial_size = layer.size().await?;
    5160              : 
    5161      2627992 :                 let last_freeze_at = self.last_freeze_at.load();
    5162      2627992 :                 let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
    5163      2627992 :                 self.write_guard.replace(TimelineWriterState::new(
    5164      2627992 :                     layer,
    5165      2627992 :                     initial_size,
    5166      2627992 :                     last_freeze_at,
    5167      2627992 :                     last_freeze_ts,
    5168      2627992 :                 ));
    5169              :             }
    5170              :             OpenLayerAction::None => {
    5171       285776 :                 assert!(self.write_guard.is_some());
    5172              :             }
    5173              :         }
    5174              : 
    5175      2913768 :         Ok(&self.write_guard.as_ref().unwrap().open_layer)
    5176      2913768 :     }
    5177              : 
    5178      2913768 :     fn get_open_layer_action(&self, lsn: Lsn, new_value_size: u64) -> OpenLayerAction {
    5179      2913768 :         let state = &*self.write_guard;
    5180      2913768 :         let Some(state) = &state else {
    5181      2627992 :             return OpenLayerAction::Open;
    5182              :         };
    5183              : 
    5184       285776 :         if state.prev_lsn == Some(lsn) {
    5185              :             // Rolling mid LSN is not supported by downstream code.
    5186              :             // Hence, only roll at LSN boundaries.
    5187       285728 :             return OpenLayerAction::None;
    5188           48 :         }
    5189           48 : 
    5190           48 :         let distance = lsn.widening_sub(state.cached_last_freeze_at);
    5191           48 :         let proposed_open_layer_size = state.current_size + new_value_size;
    5192           48 : 
    5193           48 :         // Rolling the open layer can be triggered by:
    5194           48 :         // 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
    5195           48 :         //    the safekeepers need to store.  For sharded tenants, we multiply by shard count to
    5196           48 :         //    account for how writes are distributed across shards: we expect each node to consume
    5197           48 :         //    1/count of the LSN on average.
    5198           48 :         // 2. The size of the currently open layer.
    5199           48 :         // 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
    5200           48 :         //    up and suspend activity.
    5201           48 :         if distance
    5202           48 :             >= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128
    5203              :         {
    5204            0 :             info!(
    5205            0 :                 "Will roll layer at {} with layer size {} due to LSN distance ({})",
    5206            0 :                 lsn, state.current_size, distance
    5207            0 :             );
    5208              : 
    5209            0 :             OpenLayerAction::Roll
    5210           48 :         } else if state.current_size > 0
    5211           48 :             && proposed_open_layer_size >= self.get_checkpoint_distance()
    5212              :         {
    5213            0 :             info!(
    5214            0 :                 "Will roll layer at {} with layer size {} due to layer size ({})",
    5215            0 :                 lsn, state.current_size, proposed_open_layer_size
    5216            0 :             );
    5217              : 
    5218            0 :             OpenLayerAction::Roll
    5219           48 :         } else if distance > 0
    5220           48 :             && state.cached_last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()
    5221              :         {
    5222            0 :             info!(
    5223            0 :                 "Will roll layer at {} with layer size {} due to time since last flush ({:?})",
    5224            0 :                 lsn,
    5225            0 :                 state.current_size,
    5226            0 :                 state.cached_last_freeze_ts.elapsed()
    5227            0 :             );
    5228              : 
    5229            0 :             OpenLayerAction::Roll
    5230              :         } else {
    5231           48 :             OpenLayerAction::None
    5232              :         }
    5233      2913768 :     }
    5234              : 
    5235              :     /// Put a batch keys at the specified Lsns.
    5236              :     ///
    5237              :     /// The batch should be sorted by Lsn such that it's safe
    5238              :     /// to roll the open layer mid batch.
    5239       413936 :     pub(crate) async fn put_batch(
    5240       413936 :         &mut self,
    5241       413936 :         batch: Vec<(Key, Lsn, Value)>,
    5242       413936 :         ctx: &RequestContext,
    5243       413936 :     ) -> anyhow::Result<()> {
    5244      1113600 :         for (key, lsn, val) in batch {
    5245       699664 :             self.put(key, lsn, &val, ctx).await?
    5246              :         }
    5247              : 
    5248       413936 :         Ok(())
    5249       413936 :     }
    5250              : 
    5251            2 :     pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
    5252            2 :         if let Some((_, lsn)) = batch.first() {
    5253            2 :             let action = self.get_open_layer_action(*lsn, 0);
    5254            2 :             let layer = self.handle_open_layer_action(*lsn, action).await?;
    5255            2 :             layer.put_tombstones(batch).await?;
    5256            0 :         }
    5257              : 
    5258            2 :         Ok(())
    5259            2 :     }
    5260              : 
    5261              :     /// Track the end of the latest digested WAL record.
    5262              :     /// Remember the (end of) last valid WAL record remembered in the timeline.
    5263              :     ///
    5264              :     /// Call this after you have finished writing all the WAL up to 'lsn'.
    5265              :     ///
    5266              :     /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
    5267              :     /// the 'lsn' or anything older. The previous last record LSN is stored alongside
    5268              :     /// the latest and can be read.
    5269      3102908 :     pub(crate) fn finish_write(&self, new_lsn: Lsn) {
    5270      3102908 :         self.tl.finish_write(new_lsn);
    5271      3102908 :     }
    5272              : 
    5273       270570 :     pub(crate) fn update_current_logical_size(&self, delta: i64) {
    5274       270570 :         self.tl.update_current_logical_size(delta)
    5275       270570 :     }
    5276              : }
    5277              : 
    5278              : // We need TimelineWriter to be send in upcoming conversion of
    5279              : // Timeline::layers to tokio::sync::RwLock.
    5280            2 : #[test]
    5281            2 : fn is_send() {
    5282            2 :     fn _assert_send<T: Send>() {}
    5283            2 :     _assert_send::<TimelineWriter<'_>>();
    5284            2 : }
    5285              : 
    5286              : /// Add a suffix to a layer file's name: .{num}.old
    5287              : /// Uses the first available num (starts at 0)
    5288            0 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
    5289            0 :     let filename = path
    5290            0 :         .file_name()
    5291            0 :         .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
    5292            0 :     let mut new_path = path.to_owned();
    5293              : 
    5294            0 :     for i in 0u32.. {
    5295            0 :         new_path.set_file_name(format!("{filename}.{i}.old"));
    5296            0 :         if !new_path.exists() {
    5297            0 :             std::fs::rename(path, &new_path)
    5298            0 :                 .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
    5299            0 :             return Ok(());
    5300            0 :         }
    5301              :     }
    5302              : 
    5303            0 :     bail!("couldn't find an unused backup number for {:?}", path)
    5304            0 : }
    5305              : 
    5306              : #[cfg(test)]
    5307              : mod tests {
    5308              :     use utils::{id::TimelineId, lsn::Lsn};
    5309              : 
    5310              :     use crate::tenant::{
    5311              :         harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
    5312              :     };
    5313              : 
    5314            2 :     #[tokio::test]
    5315            2 :     async fn two_layer_eviction_attempts_at_the_same_time() {
    5316            2 :         let harness =
    5317            2 :             TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
    5318            2 : 
    5319            2 :         let ctx = any_context();
    5320            2 :         let tenant = harness.do_try_load(&ctx).await.unwrap();
    5321            2 :         let timeline = tenant
    5322            2 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
    5323            6 :             .await
    5324            2 :             .unwrap();
    5325            2 : 
    5326            2 :         let layer = find_some_layer(&timeline).await;
    5327            2 :         let layer = layer
    5328            2 :             .keep_resident()
    5329            2 :             .await
    5330            2 :             .expect("no download => no downloading errors")
    5331            2 :             .expect("should had been resident")
    5332            2 :             .drop_eviction_guard();
    5333            2 : 
    5334            2 :         let first = async { layer.evict_and_wait().await };
    5335            2 :         let second = async { layer.evict_and_wait().await };
    5336            2 : 
    5337            4 :         let (first, second) = tokio::join!(first, second);
    5338            2 : 
    5339            2 :         let res = layer.keep_resident().await;
    5340            2 :         assert!(matches!(res, Ok(None)), "{res:?}");
    5341            2 : 
    5342            2 :         match (first, second) {
    5343            2 :             (Ok(()), Ok(())) => {
    5344            0 :                 // because there are no more timeline locks being taken on eviction path, we can
    5345            0 :                 // witness all three outcomes here.
    5346            0 :             }
    5347            2 :             (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
    5348            2 :                 // if one completes before the other, this is fine just as well.
    5349            2 :             }
    5350            2 :             other => unreachable!("unexpected {:?}", other),
    5351            2 :         }
    5352            2 :     }
    5353              : 
    5354            2 :     fn any_context() -> crate::context::RequestContext {
    5355            2 :         use crate::context::*;
    5356            2 :         use crate::task_mgr::*;
    5357            2 :         RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
    5358            2 :     }
    5359              : 
    5360            2 :     async fn find_some_layer(timeline: &Timeline) -> Layer {
    5361            2 :         let layers = timeline.layers.read().await;
    5362            2 :         let desc = layers
    5363            2 :             .layer_map()
    5364            2 :             .iter_historic_layers()
    5365            2 :             .next()
    5366            2 :             .expect("must find one layer to evict");
    5367            2 : 
    5368            2 :         layers.get_from_desc(&desc)
    5369            2 :     }
    5370              : }
        

Generated by: LCOV version 2.1-beta