Line data Source code
1 : mod compaction;
2 : pub mod delete;
3 : mod eviction_task;
4 : mod init;
5 : pub mod layer_manager;
6 : pub(crate) mod logical_size;
7 : pub mod span;
8 : pub mod uninit;
9 : mod walreceiver;
10 :
11 : use anyhow::{anyhow, bail, ensure, Context, Result};
12 : use bytes::Bytes;
13 : use camino::{Utf8Path, Utf8PathBuf};
14 : use enumset::EnumSet;
15 : use fail::fail_point;
16 : use futures::stream::StreamExt;
17 : use itertools::Itertools;
18 : use once_cell::sync::Lazy;
19 : use pageserver_api::{
20 : keyspace::KeySpaceAccum,
21 : models::{
22 : CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
23 : EvictionPolicy, LayerMapInfo, TimelineState,
24 : },
25 : reltag::BlockNumber,
26 : shard::{ShardIdentity, TenantShardId},
27 : };
28 : use rand::Rng;
29 : use serde_with::serde_as;
30 : use std::ops::{Deref, Range};
31 : use std::pin::pin;
32 : use std::sync::atomic::Ordering as AtomicOrdering;
33 : use std::sync::{Arc, Mutex, RwLock, Weak};
34 : use std::time::{Duration, Instant, SystemTime};
35 : use std::{
36 : array,
37 : collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
38 : sync::atomic::AtomicU64,
39 : };
40 : use std::{
41 : cmp::{max, min, Ordering},
42 : ops::ControlFlow,
43 : };
44 : use storage_broker::BrokerClientChannel;
45 : use tokio::{
46 : runtime::Handle,
47 : sync::{oneshot, watch},
48 : };
49 : use tokio_util::sync::CancellationToken;
50 : use tracing::*;
51 : use utils::sync::gate::{Gate, GateGuard};
52 :
53 : use crate::pgdatadir_mapping::{AuxFilesDirectory, DirectoryKind};
54 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
55 : use crate::tenant::{
56 : layer_map::{LayerMap, SearchResult},
57 : metadata::TimelineMetadata,
58 : par_fsync,
59 : };
60 : use crate::{
61 : context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
62 : disk_usage_eviction_task::DiskUsageEvictionInfo,
63 : pgdatadir_mapping::CollectKeySpaceError,
64 : };
65 : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
66 : use crate::{
67 : disk_usage_eviction_task::finite_f32,
68 : tenant::storage_layer::{
69 : AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
70 : LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
71 : ValueReconstructState, ValuesReconstructState,
72 : },
73 : };
74 : use crate::{
75 : disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
76 : };
77 : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
78 :
79 : use crate::config::PageServerConf;
80 : use crate::keyspace::{KeyPartitioning, KeySpace};
81 : use crate::metrics::{
82 : TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
83 : };
84 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
85 : use crate::tenant::config::TenantConfOpt;
86 : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
87 : use pageserver_api::reltag::RelTag;
88 : use pageserver_api::shard::ShardIndex;
89 :
90 : use postgres_connection::PgConnectionConfig;
91 : use postgres_ffi::to_pg_timestamp;
92 : use utils::{
93 : completion,
94 : generation::Generation,
95 : id::TimelineId,
96 : lsn::{AtomicLsn, Lsn, RecordLsn},
97 : seqwait::SeqWait,
98 : simple_rcu::{Rcu, RcuReadGuard},
99 : };
100 :
101 : use crate::page_cache;
102 : use crate::repository::GcResult;
103 : use crate::repository::{Key, Value};
104 : use crate::task_mgr;
105 : use crate::task_mgr::TaskKind;
106 : use crate::ZERO_PAGE;
107 :
108 : use self::delete::DeleteTimelineFlow;
109 : pub(super) use self::eviction_task::EvictionTaskTenantState;
110 : use self::eviction_task::EvictionTaskTimelineState;
111 : use self::layer_manager::LayerManager;
112 : use self::logical_size::LogicalSize;
113 : use self::walreceiver::{WalReceiver, WalReceiverConf};
114 :
115 : use super::remote_timeline_client::RemoteTimelineClient;
116 : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
117 : use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
118 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
119 : use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
120 :
121 2 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
122 : pub(super) enum FlushLoopState {
123 : NotStarted,
124 : Running {
125 : #[cfg(test)]
126 : expect_initdb_optimization: bool,
127 : #[cfg(test)]
128 : initdb_optimization_count: usize,
129 : },
130 : Exited,
131 : }
132 :
133 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
134 0 : #[derive(Debug, Clone, PartialEq, Eq)]
135 : pub(crate) struct Hole {
136 : key_range: Range<Key>,
137 : coverage_size: usize,
138 : }
139 :
140 : impl Ord for Hole {
141 0 : fn cmp(&self, other: &Self) -> Ordering {
142 0 : other.coverage_size.cmp(&self.coverage_size) // inverse order
143 0 : }
144 : }
145 :
146 : impl PartialOrd for Hole {
147 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
148 0 : Some(self.cmp(other))
149 0 : }
150 : }
151 :
152 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
153 : /// Can be removed after all refactors are done.
154 30 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
155 30 : drop(rlock)
156 30 : }
157 :
158 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
159 : /// Can be removed after all refactors are done.
160 512 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
161 512 : drop(rlock)
162 512 : }
163 :
164 : /// The outward-facing resources required to build a Timeline
165 : pub struct TimelineResources {
166 : pub remote_client: Option<RemoteTimelineClient>,
167 : pub deletion_queue_client: DeletionQueueClient,
168 : pub timeline_get_throttle: Arc<
169 : crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
170 : >,
171 : }
172 :
173 : pub(crate) struct AuxFilesState {
174 : pub(crate) dir: Option<AuxFilesDirectory>,
175 : pub(crate) n_deltas: usize,
176 : }
177 :
178 : pub struct Timeline {
179 : conf: &'static PageServerConf,
180 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
181 :
182 : myself: Weak<Self>,
183 :
184 : pub(crate) tenant_shard_id: TenantShardId,
185 : pub timeline_id: TimelineId,
186 :
187 : /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
188 : /// Never changes for the lifetime of this [`Timeline`] object.
189 : ///
190 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
191 : /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
192 : pub(crate) generation: Generation,
193 :
194 : /// The detailed sharding information from our parent Tenant. This enables us to map keys
195 : /// to shards, and is constant through the lifetime of this Timeline.
196 : shard_identity: ShardIdentity,
197 :
198 : pub pg_version: u32,
199 :
200 : /// The tuple has two elements.
201 : /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
202 : /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
203 : ///
204 : /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
205 : /// We describe these rectangles through the `PersistentLayerDesc` struct.
206 : ///
207 : /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
208 : /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
209 : /// `PersistentLayerDesc`'s.
210 : ///
211 : /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
212 : /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
213 : /// runtime, e.g., during page reconstruction.
214 : ///
215 : /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
216 : /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
217 : pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
218 :
219 : last_freeze_at: AtomicLsn,
220 : // Atomic would be more appropriate here.
221 : last_freeze_ts: RwLock<Instant>,
222 :
223 : // WAL redo manager. `None` only for broken tenants.
224 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
225 :
226 : /// Remote storage client.
227 : /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
228 : pub remote_client: Option<Arc<RemoteTimelineClient>>,
229 :
230 : // What page versions do we hold in the repository? If we get a
231 : // request > last_record_lsn, we need to wait until we receive all
232 : // the WAL up to the request. The SeqWait provides functions for
233 : // that. TODO: If we get a request for an old LSN, such that the
234 : // versions have already been garbage collected away, we should
235 : // throw an error, but we don't track that currently.
236 : //
237 : // last_record_lsn.load().last points to the end of last processed WAL record.
238 : //
239 : // We also remember the starting point of the previous record in
240 : // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
241 : // first WAL record when the node is started up. But here, we just
242 : // keep track of it.
243 : last_record_lsn: SeqWait<RecordLsn, Lsn>,
244 :
245 : // All WAL records have been processed and stored durably on files on
246 : // local disk, up to this LSN. On crash and restart, we need to re-process
247 : // the WAL starting from this point.
248 : //
249 : // Some later WAL records might have been processed and also flushed to disk
250 : // already, so don't be surprised to see some, but there's no guarantee on
251 : // them yet.
252 : disk_consistent_lsn: AtomicLsn,
253 :
254 : // Parent timeline that this timeline was branched from, and the LSN
255 : // of the branch point.
256 : ancestor_timeline: Option<Arc<Timeline>>,
257 : ancestor_lsn: Lsn,
258 :
259 : pub(super) metrics: TimelineMetrics,
260 :
261 : // `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
262 : // in `crate::page_service` writes these metrics.
263 : pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
264 :
265 : directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
266 :
267 : /// Ensures layers aren't frozen by checkpointer between
268 : /// [`Timeline::get_layer_for_write`] and layer reads.
269 : /// Locked automatically by [`TimelineWriter`] and checkpointer.
270 : /// Must always be acquired before the layer map/individual layer lock
271 : /// to avoid deadlock.
272 : write_lock: tokio::sync::Mutex<()>,
273 :
274 : /// Used to avoid multiple `flush_loop` tasks running
275 : pub(super) flush_loop_state: Mutex<FlushLoopState>,
276 :
277 : /// layer_flush_start_tx can be used to wake up the layer-flushing task.
278 : /// The value is a counter, incremented every time a new flush cycle is requested.
279 : /// The flush cycle counter is sent back on the layer_flush_done channel when
280 : /// the flush finishes. You can use that to wait for the flush to finish.
281 : layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
282 : /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
283 : layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
284 :
285 : // Needed to ensure that we can't create a branch at a point that was already garbage collected
286 : pub latest_gc_cutoff_lsn: Rcu<Lsn>,
287 :
288 : // List of child timelines and their branch points. This is needed to avoid
289 : // garbage collecting data that is still needed by the child timelines.
290 : pub gc_info: std::sync::RwLock<GcInfo>,
291 :
292 : // It may change across major versions so for simplicity
293 : // keep it after running initdb for a timeline.
294 : // It is needed in checks when we want to error on some operations
295 : // when they are requested for pre-initdb lsn.
296 : // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
297 : // though let's keep them both for better error visibility.
298 : pub initdb_lsn: Lsn,
299 :
300 : /// When did we last calculate the partitioning?
301 : partitioning: tokio::sync::Mutex<(KeyPartitioning, Lsn)>,
302 :
303 : /// Configuration: how often should the partitioning be recalculated.
304 : repartition_threshold: u64,
305 :
306 : /// Current logical size of the "datadir", at the last LSN.
307 : current_logical_size: LogicalSize,
308 :
309 : /// Information about the last processed message by the WAL receiver,
310 : /// or None if WAL receiver has not received anything for this timeline
311 : /// yet.
312 : pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
313 : pub walreceiver: Mutex<Option<WalReceiver>>,
314 :
315 : /// Relation size cache
316 : pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
317 :
318 : download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
319 :
320 : state: watch::Sender<TimelineState>,
321 :
322 : /// Prevent two tasks from deleting the timeline at the same time. If held, the
323 : /// timeline is being deleted. If 'true', the timeline has already been deleted.
324 : pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
325 :
326 : eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
327 :
328 : /// Load or creation time information about the disk_consistent_lsn and when the loading
329 : /// happened. Used for consumption metrics.
330 : pub(crate) loaded_at: (Lsn, SystemTime),
331 :
332 : /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
333 : pub(crate) gate: Gate,
334 :
335 : /// Cancellation token scoped to this timeline: anything doing long-running work relating
336 : /// to the timeline should drop out when this token fires.
337 : pub(crate) cancel: CancellationToken,
338 :
339 : /// Make sure we only have one running compaction at a time in tests.
340 : ///
341 : /// Must only be taken in two places:
342 : /// - [`Timeline::compact`] (this file)
343 : /// - [`delete::delete_local_timeline_directory`]
344 : ///
345 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
346 : compaction_lock: tokio::sync::Mutex<()>,
347 :
348 : /// Make sure we only have one running gc at a time.
349 : ///
350 : /// Must only be taken in two places:
351 : /// - [`Timeline::gc`] (this file)
352 : /// - [`delete::delete_local_timeline_directory`]
353 : ///
354 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
355 : gc_lock: tokio::sync::Mutex<()>,
356 :
357 : /// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
358 : timeline_get_throttle: Arc<
359 : crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
360 : >,
361 :
362 : /// Keep aux directory cache to avoid it's reconstruction on each update
363 : pub(crate) aux_files: tokio::sync::Mutex<AuxFilesState>,
364 : }
365 :
366 : pub struct WalReceiverInfo {
367 : pub wal_source_connconf: PgConnectionConfig,
368 : pub last_received_msg_lsn: Lsn,
369 : pub last_received_msg_ts: u128,
370 : }
371 :
372 : ///
373 : /// Information about how much history needs to be retained, needed by
374 : /// Garbage Collection.
375 : ///
376 : pub struct GcInfo {
377 : /// Specific LSNs that are needed.
378 : ///
379 : /// Currently, this includes all points where child branches have
380 : /// been forked off from. In the future, could also include
381 : /// explicit user-defined snapshot points.
382 : pub retain_lsns: Vec<Lsn>,
383 :
384 : /// In addition to 'retain_lsns', keep everything newer than this
385 : /// point.
386 : ///
387 : /// This is calculated by subtracting 'gc_horizon' setting from
388 : /// last-record LSN
389 : ///
390 : /// FIXME: is this inclusive or exclusive?
391 : pub horizon_cutoff: Lsn,
392 :
393 : /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
394 : /// point.
395 : ///
396 : /// This is calculated by finding a number such that a record is needed for PITR
397 : /// if only if its LSN is larger than 'pitr_cutoff'.
398 : pub pitr_cutoff: Lsn,
399 : }
400 :
401 : /// An error happened in a get() operation.
402 2 : #[derive(thiserror::Error, Debug)]
403 : pub(crate) enum PageReconstructError {
404 : #[error(transparent)]
405 : Other(#[from] anyhow::Error),
406 :
407 : #[error("Ancestor LSN wait error: {0}")]
408 : AncestorLsnTimeout(#[from] WaitLsnError),
409 :
410 : #[error("timeline shutting down")]
411 : Cancelled,
412 :
413 : /// The ancestor of this is being stopped
414 : #[error("ancestor timeline {0} is being stopped")]
415 : AncestorStopping(TimelineId),
416 :
417 : /// An error happened replaying WAL records
418 : #[error(transparent)]
419 : WalRedo(anyhow::Error),
420 : }
421 :
422 : impl PageReconstructError {
423 : /// Returns true if this error indicates a tenant/timeline shutdown alike situation
424 0 : pub(crate) fn is_stopping(&self) -> bool {
425 0 : use PageReconstructError::*;
426 0 : match self {
427 0 : Other(_) => false,
428 0 : AncestorLsnTimeout(_) => false,
429 0 : Cancelled | AncestorStopping(_) => true,
430 0 : WalRedo(_) => false,
431 : }
432 0 : }
433 : }
434 :
435 0 : #[derive(thiserror::Error, Debug)]
436 : enum CreateImageLayersError {
437 : #[error("timeline shutting down")]
438 : Cancelled,
439 :
440 : #[error(transparent)]
441 : GetVectoredError(GetVectoredError),
442 :
443 : #[error(transparent)]
444 : PageReconstructError(PageReconstructError),
445 :
446 : #[error(transparent)]
447 : Other(#[from] anyhow::Error),
448 : }
449 :
450 0 : #[derive(thiserror::Error, Debug)]
451 : enum FlushLayerError {
452 : /// Timeline cancellation token was cancelled
453 : #[error("timeline shutting down")]
454 : Cancelled,
455 :
456 : #[error(transparent)]
457 : CreateImageLayersError(CreateImageLayersError),
458 :
459 : #[error(transparent)]
460 : Other(#[from] anyhow::Error),
461 : }
462 :
463 0 : #[derive(thiserror::Error, Debug)]
464 : pub(crate) enum GetVectoredError {
465 : #[error("timeline shutting down")]
466 : Cancelled,
467 :
468 : #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
469 : Oversized(u64),
470 :
471 : #[error("Requested at invalid LSN: {0}")]
472 : InvalidLsn(Lsn),
473 :
474 : #[error("Requested key {0} not found")]
475 : MissingKey(Key),
476 :
477 : #[error(transparent)]
478 : GetReadyAncestorError(GetReadyAncestorError),
479 :
480 : #[error(transparent)]
481 : Other(#[from] anyhow::Error),
482 : }
483 :
484 0 : #[derive(thiserror::Error, Debug)]
485 : pub(crate) enum GetReadyAncestorError {
486 : #[error("ancestor timeline {0} is being stopped")]
487 : AncestorStopping(TimelineId),
488 :
489 : #[error("Ancestor LSN wait error: {0}")]
490 : AncestorLsnTimeout(#[from] WaitLsnError),
491 :
492 : #[error("Cancelled")]
493 : Cancelled,
494 :
495 : #[error(transparent)]
496 : Other(#[from] anyhow::Error),
497 : }
498 :
499 0 : #[derive(Clone, Copy)]
500 : pub enum LogicalSizeCalculationCause {
501 : Initial,
502 : ConsumptionMetricsSyntheticSize,
503 : EvictionTaskImitation,
504 : TenantSizeHandler,
505 : }
506 :
507 : pub enum GetLogicalSizePriority {
508 : User,
509 : Background,
510 : }
511 :
512 0 : #[derive(enumset::EnumSetType)]
513 : pub(crate) enum CompactFlags {
514 : ForceRepartition,
515 : ForceImageLayerCreation,
516 : }
517 :
518 : impl std::fmt::Debug for Timeline {
519 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
520 0 : write!(f, "Timeline<{}>", self.timeline_id)
521 0 : }
522 : }
523 :
524 0 : #[derive(thiserror::Error, Debug)]
525 : pub(crate) enum WaitLsnError {
526 : // Called on a timeline which is shutting down
527 : #[error("Shutdown")]
528 : Shutdown,
529 :
530 : // Called on an timeline not in active state or shutting down
531 : #[error("Bad state (not active)")]
532 : BadState,
533 :
534 : // Timeout expired while waiting for LSN to catch up with goal.
535 : #[error("{0}")]
536 : Timeout(String),
537 : }
538 :
539 : // The impls below achieve cancellation mapping for errors.
540 : // Perhaps there's a way of achieving this with less cruft.
541 :
542 : impl From<CreateImageLayersError> for CompactionError {
543 0 : fn from(e: CreateImageLayersError) -> Self {
544 0 : match e {
545 0 : CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
546 0 : _ => CompactionError::Other(e.into()),
547 : }
548 0 : }
549 : }
550 :
551 : impl From<CreateImageLayersError> for FlushLayerError {
552 0 : fn from(e: CreateImageLayersError) -> Self {
553 0 : match e {
554 0 : CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
555 0 : any => FlushLayerError::CreateImageLayersError(any),
556 : }
557 0 : }
558 : }
559 :
560 : impl From<PageReconstructError> for CreateImageLayersError {
561 0 : fn from(e: PageReconstructError) -> Self {
562 0 : match e {
563 0 : PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
564 0 : _ => CreateImageLayersError::PageReconstructError(e),
565 : }
566 0 : }
567 : }
568 :
569 : impl From<GetVectoredError> for CreateImageLayersError {
570 0 : fn from(e: GetVectoredError) -> Self {
571 0 : match e {
572 0 : GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
573 0 : _ => CreateImageLayersError::GetVectoredError(e),
574 : }
575 0 : }
576 : }
577 :
578 : impl From<GetReadyAncestorError> for PageReconstructError {
579 2 : fn from(e: GetReadyAncestorError) -> Self {
580 2 : use GetReadyAncestorError::*;
581 2 : match e {
582 0 : AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
583 0 : AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
584 0 : Cancelled => PageReconstructError::Cancelled,
585 2 : Other(other) => PageReconstructError::Other(other),
586 : }
587 2 : }
588 : }
589 :
590 : #[derive(
591 : Eq,
592 4 : PartialEq,
593 0 : Debug,
594 : Copy,
595 0 : Clone,
596 119 : strum_macros::EnumString,
597 0 : strum_macros::Display,
598 0 : serde_with::DeserializeFromStr,
599 0 : serde_with::SerializeDisplay,
600 : )]
601 : #[strum(serialize_all = "kebab-case")]
602 : pub enum GetVectoredImpl {
603 : Sequential,
604 : Vectored,
605 : }
606 :
607 : /// Public interface functions
608 : impl Timeline {
609 : /// Get the LSN where this branch was created
610 6 : pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
611 6 : self.ancestor_lsn
612 6 : }
613 :
614 : /// Get the ancestor's timeline id
615 14 : pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
616 14 : self.ancestor_timeline
617 14 : .as_ref()
618 14 : .map(|ancestor| ancestor.timeline_id)
619 14 : }
620 :
621 : /// Lock and get timeline's GC cutoff
622 622 : pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
623 622 : self.latest_gc_cutoff_lsn.read()
624 622 : }
625 :
626 : /// Look up given page version.
627 : ///
628 : /// If a remote layer file is needed, it is downloaded as part of this
629 : /// call.
630 : ///
631 : /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
632 : /// abstraction above this needs to store suitable metadata to track what
633 : /// data exists with what keys, in separate metadata entries. If a
634 : /// non-existent key is requested, we may incorrectly return a value from
635 : /// an ancestor branch, for example, or waste a lot of cycles chasing the
636 : /// non-existing key.
637 : ///
638 : /// # Cancel-Safety
639 : ///
640 : /// This method is cancellation-safe.
641 502524 : pub(crate) async fn get(
642 502524 : &self,
643 502524 : key: Key,
644 502524 : lsn: Lsn,
645 502524 : ctx: &RequestContext,
646 502524 : ) -> Result<Bytes, PageReconstructError> {
647 502524 : if !lsn.is_valid() {
648 0 : return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
649 502524 : }
650 502524 :
651 502524 : self.timeline_get_throttle.throttle(ctx, 1).await;
652 :
653 : // This check is debug-only because of the cost of hashing, and because it's a double-check: we
654 : // already checked the key against the shard_identity when looking up the Timeline from
655 : // page_service.
656 502524 : debug_assert!(!self.shard_identity.is_key_disposable(&key));
657 :
658 : // XXX: structured stats collection for layer eviction here.
659 0 : trace!(
660 0 : "get page request for {}@{} from task kind {:?}",
661 0 : key,
662 0 : lsn,
663 0 : ctx.task_kind()
664 0 : );
665 :
666 : // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
667 : // The cached image can be returned directly if there is no WAL between the cached image
668 : // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
669 : // for redo.
670 502524 : let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
671 0 : Some((cached_lsn, cached_img)) => {
672 0 : match cached_lsn.cmp(&lsn) {
673 0 : Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
674 : Ordering::Equal => {
675 0 : MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
676 0 : return Ok(cached_img); // exact LSN match, return the image
677 : }
678 : Ordering::Greater => {
679 0 : unreachable!("the returned lsn should never be after the requested lsn")
680 : }
681 : }
682 0 : Some((cached_lsn, cached_img))
683 : }
684 502524 : None => None,
685 : };
686 :
687 502524 : let mut reconstruct_state = ValueReconstructState {
688 502524 : records: Vec::new(),
689 502524 : img: cached_page_img,
690 502524 : };
691 502524 :
692 502524 : let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
693 502524 : let path = self
694 502524 : .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
695 36291 : .await?;
696 502416 : timer.stop_and_record();
697 502416 :
698 502416 : let start = Instant::now();
699 502416 : let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
700 502416 : let elapsed = start.elapsed();
701 502416 : crate::metrics::RECONSTRUCT_TIME
702 502416 : .for_result(&res)
703 502416 : .observe(elapsed.as_secs_f64());
704 502416 :
705 502416 : if cfg!(feature = "testing") && res.is_err() {
706 : // it can only be walredo issue
707 : use std::fmt::Write;
708 :
709 0 : let mut msg = String::new();
710 0 :
711 0 : path.into_iter().for_each(|(res, cont_lsn, layer)| {
712 0 : writeln!(
713 0 : msg,
714 0 : "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
715 0 : layer(),
716 0 : )
717 0 : .expect("string grows")
718 0 : });
719 :
720 : // this is to rule out or provide evidence that we could in some cases read a duplicate
721 : // walrecord
722 0 : tracing::info!("walredo failed, path:\n{msg}");
723 502416 : }
724 :
725 502416 : res
726 502524 : }
727 :
728 : pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
729 :
730 : /// Look up multiple page versions at a given LSN
731 : ///
732 : /// This naive implementation will be replaced with a more efficient one
733 : /// which actually vectorizes the read path.
734 444 : pub(crate) async fn get_vectored(
735 444 : &self,
736 444 : keyspace: KeySpace,
737 444 : lsn: Lsn,
738 444 : ctx: &RequestContext,
739 444 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
740 444 : if !lsn.is_valid() {
741 0 : return Err(GetVectoredError::InvalidLsn(lsn));
742 444 : }
743 444 :
744 444 : let key_count = keyspace.total_size().try_into().unwrap();
745 444 : if key_count > Timeline::MAX_GET_VECTORED_KEYS {
746 0 : return Err(GetVectoredError::Oversized(key_count));
747 444 : }
748 444 :
749 444 : self.timeline_get_throttle
750 444 : .throttle(ctx, key_count as usize)
751 0 : .await;
752 :
753 888 : for range in &keyspace.ranges {
754 444 : let mut key = range.start;
755 1036 : while key != range.end {
756 592 : assert!(!self.shard_identity.is_key_disposable(&key));
757 592 : key = key.next();
758 : }
759 : }
760 :
761 0 : trace!(
762 0 : "get vectored request for {:?}@{} from task kind {:?} will use {} implementation",
763 0 : keyspace,
764 0 : lsn,
765 0 : ctx.task_kind(),
766 0 : self.conf.get_vectored_impl
767 0 : );
768 :
769 444 : let _timer = crate::metrics::GET_VECTORED_LATENCY
770 444 : .for_task_kind(ctx.task_kind())
771 444 : .map(|t| t.start_timer());
772 444 :
773 444 : match self.conf.get_vectored_impl {
774 : GetVectoredImpl::Sequential => {
775 444 : self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
776 : }
777 : GetVectoredImpl::Vectored => {
778 0 : let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
779 :
780 0 : if self.conf.validate_vectored_get {
781 0 : self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
782 0 : .await;
783 0 : }
784 :
785 0 : vectored_res
786 : }
787 : }
788 444 : }
789 :
790 454 : pub(super) async fn get_vectored_sequential_impl(
791 454 : &self,
792 454 : keyspace: KeySpace,
793 454 : lsn: Lsn,
794 454 : ctx: &RequestContext,
795 454 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
796 454 : let mut values = BTreeMap::new();
797 908 : for range in keyspace.ranges {
798 454 : let mut key = range.start;
799 1366 : while key != range.end {
800 912 : let block = self.get(key, lsn, ctx).await;
801 :
802 : use PageReconstructError::*;
803 0 : match block {
804 : Err(Cancelled | AncestorStopping(_)) => {
805 0 : return Err(GetVectoredError::Cancelled)
806 : }
807 0 : Err(Other(err)) if err.to_string().contains("could not find data for key") => {
808 0 : return Err(GetVectoredError::MissingKey(key))
809 : }
810 912 : _ => {
811 912 : values.insert(key, block);
812 912 : key = key.next();
813 912 : }
814 : }
815 : }
816 : }
817 :
818 454 : Ok(values)
819 454 : }
820 :
821 10 : pub(super) async fn get_vectored_impl(
822 10 : &self,
823 10 : keyspace: KeySpace,
824 10 : lsn: Lsn,
825 10 : ctx: &RequestContext,
826 10 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
827 10 : let mut reconstruct_state = ValuesReconstructState::new();
828 10 :
829 10 : self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
830 25 : .await?;
831 :
832 10 : let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
833 330 : for (key, res) in reconstruct_state.keys {
834 320 : match res {
835 0 : Err(err) => {
836 0 : results.insert(key, Err(err));
837 0 : }
838 320 : Ok(state) => {
839 320 : let state = ValueReconstructState::from(state);
840 :
841 320 : let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
842 320 : results.insert(key, reconstruct_res);
843 : }
844 : }
845 : }
846 :
847 10 : Ok(results)
848 10 : }
849 :
850 10 : pub(super) async fn validate_get_vectored_impl(
851 10 : &self,
852 10 : vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
853 10 : keyspace: KeySpace,
854 10 : lsn: Lsn,
855 10 : ctx: &RequestContext,
856 10 : ) {
857 10 : let sequential_res = self
858 10 : .get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
859 20 : .await;
860 :
861 0 : fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
862 0 : use GetVectoredError::*;
863 0 : match (lhs, rhs) {
864 0 : (Cancelled, Cancelled) => true,
865 0 : (_, Cancelled) => true,
866 0 : (Oversized(l), Oversized(r)) => l == r,
867 0 : (InvalidLsn(l), InvalidLsn(r)) => l == r,
868 0 : (MissingKey(l), MissingKey(r)) => l == r,
869 0 : (GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
870 0 : (Other(_), Other(_)) => true,
871 0 : _ => false,
872 : }
873 0 : }
874 :
875 10 : match (&sequential_res, vectored_res) {
876 0 : (Err(seq_err), Ok(_)) => {
877 0 : panic!(concat!("Sequential get failed with {}, but vectored get did not",
878 0 : " - keyspace={:?} lsn={}"),
879 0 : seq_err, keyspace, lsn) },
880 0 : (Ok(_), Err(vec_err)) => {
881 0 : panic!(concat!("Vectored get failed with {}, but sequential get did not",
882 0 : " - keyspace={:?} lsn={}"),
883 0 : vec_err, keyspace, lsn) },
884 0 : (Err(seq_err), Err(vec_err)) => {
885 0 : assert!(errors_match(seq_err, vec_err),
886 0 : "Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
887 10 : (Ok(seq_values), Ok(vec_values)) => {
888 320 : seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
889 320 : assert_eq!(seq_key, vec_key);
890 320 : match (seq_res, vec_res) {
891 320 : (Ok(seq_blob), Ok(vec_blob)) => {
892 320 : assert_eq!(seq_blob, vec_blob,
893 0 : "Image mismatch for key {seq_key} - keyspace={keyspace:?} lsn={lsn}");
894 : },
895 0 : (Err(err), Ok(_)) => {
896 0 : panic!(
897 0 : concat!("Sequential get failed with {} for key {}, but vectored get did not",
898 0 : " - keyspace={:?} lsn={}"),
899 0 : err, seq_key, keyspace, lsn) },
900 0 : (Ok(_), Err(err)) => {
901 0 : panic!(
902 0 : concat!("Vectored get failed with {} for key {}, but sequential get did not",
903 0 : " - keyspace={:?} lsn={}"),
904 0 : err, seq_key, keyspace, lsn) },
905 0 : (Err(_), Err(_)) => {}
906 : }
907 320 : })
908 : }
909 : }
910 10 : }
911 :
912 : /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
913 2900534 : pub(crate) fn get_last_record_lsn(&self) -> Lsn {
914 2900534 : self.last_record_lsn.load().last
915 2900534 : }
916 :
917 0 : pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
918 0 : self.last_record_lsn.load().prev
919 0 : }
920 :
921 : /// Atomically get both last and prev.
922 210 : pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
923 210 : self.last_record_lsn.load()
924 210 : }
925 :
926 698 : pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
927 698 : self.disk_consistent_lsn.load()
928 698 : }
929 :
930 : /// remote_consistent_lsn from the perspective of the tenant's current generation,
931 : /// not validated with control plane yet.
932 : /// See [`Self::get_remote_consistent_lsn_visible`].
933 0 : pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
934 0 : if let Some(remote_client) = &self.remote_client {
935 0 : remote_client.remote_consistent_lsn_projected()
936 : } else {
937 0 : None
938 : }
939 0 : }
940 :
941 : /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
942 : /// i.e. a value of remote_consistent_lsn_projected which has undergone
943 : /// generation validation in the deletion queue.
944 0 : pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
945 0 : if let Some(remote_client) = &self.remote_client {
946 0 : remote_client.remote_consistent_lsn_visible()
947 : } else {
948 0 : None
949 : }
950 0 : }
951 :
952 : /// The sum of the file size of all historic layers in the layer map.
953 : /// This method makes no distinction between local and remote layers.
954 : /// Hence, the result **does not represent local filesystem usage**.
955 0 : pub(crate) async fn layer_size_sum(&self) -> u64 {
956 0 : let guard = self.layers.read().await;
957 0 : let layer_map = guard.layer_map();
958 0 : let mut size = 0;
959 0 : for l in layer_map.iter_historic_layers() {
960 0 : size += l.file_size();
961 0 : }
962 0 : size
963 0 : }
964 :
965 0 : pub(crate) fn resident_physical_size(&self) -> u64 {
966 0 : self.metrics.resident_physical_size_get()
967 0 : }
968 :
969 0 : pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
970 0 : array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
971 0 : }
972 :
973 : ///
974 : /// Wait until WAL has been received and processed up to this LSN.
975 : ///
976 : /// You should call this before any of the other get_* or list_* functions. Calling
977 : /// those functions with an LSN that has been processed yet is an error.
978 : ///
979 226767 : pub(crate) async fn wait_lsn(
980 226767 : &self,
981 226767 : lsn: Lsn,
982 226767 : _ctx: &RequestContext, /* Prepare for use by cancellation */
983 226767 : ) -> Result<(), WaitLsnError> {
984 226767 : if self.cancel.is_cancelled() {
985 0 : return Err(WaitLsnError::Shutdown);
986 226767 : } else if !self.is_active() {
987 0 : return Err(WaitLsnError::BadState);
988 226767 : }
989 :
990 : // This should never be called from the WAL receiver, because that could lead
991 : // to a deadlock.
992 : debug_assert!(
993 226767 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
994 0 : "wait_lsn cannot be called in WAL receiver"
995 : );
996 : debug_assert!(
997 226767 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
998 0 : "wait_lsn cannot be called in WAL receiver"
999 : );
1000 : debug_assert!(
1001 226767 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
1002 0 : "wait_lsn cannot be called in WAL receiver"
1003 : );
1004 :
1005 226767 : let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
1006 226767 :
1007 226767 : match self
1008 226767 : .last_record_lsn
1009 226767 : .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
1010 0 : .await
1011 : {
1012 226767 : Ok(()) => Ok(()),
1013 0 : Err(e) => {
1014 0 : use utils::seqwait::SeqWaitError::*;
1015 0 : match e {
1016 0 : Shutdown => Err(WaitLsnError::Shutdown),
1017 : Timeout => {
1018 : // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
1019 0 : drop(_timer);
1020 0 : let walreceiver_status = self.walreceiver_status();
1021 0 : Err(WaitLsnError::Timeout(format!(
1022 0 : "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
1023 0 : lsn,
1024 0 : self.get_last_record_lsn(),
1025 0 : self.get_disk_consistent_lsn(),
1026 0 : walreceiver_status,
1027 0 : )))
1028 : }
1029 : }
1030 : }
1031 : }
1032 226767 : }
1033 :
1034 0 : pub(crate) fn walreceiver_status(&self) -> String {
1035 0 : match &*self.walreceiver.lock().unwrap() {
1036 0 : None => "stopping or stopped".to_string(),
1037 0 : Some(walreceiver) => match walreceiver.status() {
1038 0 : Some(status) => status.to_human_readable_string(),
1039 0 : None => "Not active".to_string(),
1040 : },
1041 : }
1042 0 : }
1043 :
1044 : /// Check that it is valid to request operations with that lsn.
1045 214 : pub(crate) fn check_lsn_is_in_scope(
1046 214 : &self,
1047 214 : lsn: Lsn,
1048 214 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1049 214 : ) -> anyhow::Result<()> {
1050 214 : ensure!(
1051 214 : lsn >= **latest_gc_cutoff_lsn,
1052 4 : "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
1053 4 : lsn,
1054 4 : **latest_gc_cutoff_lsn,
1055 : );
1056 210 : Ok(())
1057 214 : }
1058 :
1059 : /// Flush to disk all data that was written with the put_* functions
1060 1060 : #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
1061 : pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
1062 : self.freeze_inmem_layer(false).await;
1063 : self.flush_frozen_layers_and_wait().await
1064 : }
1065 :
1066 : /// Outermost timeline compaction operation; downloads needed layers.
1067 408 : pub(crate) async fn compact(
1068 408 : self: &Arc<Self>,
1069 408 : cancel: &CancellationToken,
1070 408 : flags: EnumSet<CompactFlags>,
1071 408 : ctx: &RequestContext,
1072 408 : ) -> Result<(), CompactionError> {
1073 408 : // most likely the cancellation token is from background task, but in tests it could be the
1074 408 : // request task as well.
1075 408 :
1076 408 : let prepare = async move {
1077 408 : let guard = self.compaction_lock.lock().await;
1078 :
1079 408 : let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
1080 408 : BackgroundLoopKind::Compaction,
1081 408 : ctx,
1082 408 : )
1083 0 : .await;
1084 :
1085 408 : (guard, permit)
1086 408 : };
1087 :
1088 : // this wait probably never needs any "long time spent" logging, because we already nag if
1089 : // compaction task goes over it's period (20s) which is quite often in production.
1090 408 : let (_guard, _permit) = tokio::select! {
1091 408 : tuple = prepare => { tuple },
1092 : _ = self.cancel.cancelled() => return Ok(()),
1093 : _ = cancel.cancelled() => return Ok(()),
1094 : };
1095 :
1096 408 : let last_record_lsn = self.get_last_record_lsn();
1097 408 :
1098 408 : // Last record Lsn could be zero in case the timeline was just created
1099 408 : if !last_record_lsn.is_valid() {
1100 0 : warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
1101 0 : return Ok(());
1102 408 : }
1103 408 :
1104 408 : match self.get_compaction_algorithm() {
1105 0 : CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
1106 52874 : CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
1107 : }
1108 408 : }
1109 :
1110 : /// TODO: cancellation
1111 408 : async fn compact_legacy(
1112 408 : self: &Arc<Self>,
1113 408 : _cancel: &CancellationToken,
1114 408 : flags: EnumSet<CompactFlags>,
1115 408 : ctx: &RequestContext,
1116 408 : ) -> Result<(), CompactionError> {
1117 408 : // High level strategy for compaction / image creation:
1118 408 : //
1119 408 : // 1. First, calculate the desired "partitioning" of the
1120 408 : // currently in-use key space. The goal is to partition the
1121 408 : // key space into roughly fixed-size chunks, but also take into
1122 408 : // account any existing image layers, and try to align the
1123 408 : // chunk boundaries with the existing image layers to avoid
1124 408 : // too much churn. Also try to align chunk boundaries with
1125 408 : // relation boundaries. In principle, we don't know about
1126 408 : // relation boundaries here, we just deal with key-value
1127 408 : // pairs, and the code in pgdatadir_mapping.rs knows how to
1128 408 : // map relations into key-value pairs. But in practice we know
1129 408 : // that 'field6' is the block number, and the fields 1-5
1130 408 : // identify a relation. This is just an optimization,
1131 408 : // though.
1132 408 : //
1133 408 : // 2. Once we know the partitioning, for each partition,
1134 408 : // decide if it's time to create a new image layer. The
1135 408 : // criteria is: there has been too much "churn" since the last
1136 408 : // image layer? The "churn" is fuzzy concept, it's a
1137 408 : // combination of too many delta files, or too much WAL in
1138 408 : // total in the delta file. Or perhaps: if creating an image
1139 408 : // file would allow to delete some older files.
1140 408 : //
1141 408 : // 3. After that, we compact all level0 delta files if there
1142 408 : // are too many of them. While compacting, we also garbage
1143 408 : // collect any page versions that are no longer needed because
1144 408 : // of the new image layers we created in step 2.
1145 408 : //
1146 408 : // TODO: This high level strategy hasn't been implemented yet.
1147 408 : // Below are functions compact_level0() and create_image_layers()
1148 408 : // but they are a bit ad hoc and don't quite work like it's explained
1149 408 : // above. Rewrite it.
1150 408 :
1151 408 : // Is the timeline being deleted?
1152 408 : if self.is_stopping() {
1153 0 : trace!("Dropping out of compaction on timeline shutdown");
1154 0 : return Err(CompactionError::ShuttingDown);
1155 408 : }
1156 408 :
1157 408 : let target_file_size = self.get_checkpoint_distance();
1158 408 :
1159 408 : // Define partitioning schema if needed
1160 408 :
1161 408 : // FIXME: the match should only cover repartitioning, not the next steps
1162 408 : match self
1163 408 : .repartition(
1164 408 : self.get_last_record_lsn(),
1165 408 : self.get_compaction_target_size(),
1166 408 : flags,
1167 408 : ctx,
1168 408 : )
1169 13313 : .await
1170 : {
1171 408 : Ok((partitioning, lsn)) => {
1172 408 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
1173 408 : let image_ctx = RequestContextBuilder::extend(ctx)
1174 408 : .access_stats_behavior(AccessStatsBehavior::Skip)
1175 408 : .build();
1176 408 :
1177 408 : // 2. Compact
1178 408 : let timer = self.metrics.compact_time_histo.start_timer();
1179 39561 : self.compact_level0(target_file_size, ctx).await?;
1180 408 : timer.stop_and_record();
1181 :
1182 : // 3. Create new image layers for partitions that have been modified
1183 : // "enough".
1184 408 : let layers = self
1185 408 : .create_image_layers(
1186 408 : &partitioning,
1187 408 : lsn,
1188 408 : flags.contains(CompactFlags::ForceImageLayerCreation),
1189 408 : &image_ctx,
1190 408 : )
1191 0 : .await
1192 408 : .map_err(anyhow::Error::from)?;
1193 408 : if let Some(remote_client) = &self.remote_client {
1194 408 : for layer in layers {
1195 0 : remote_client.schedule_layer_file_upload(layer)?;
1196 : }
1197 0 : }
1198 :
1199 408 : if let Some(remote_client) = &self.remote_client {
1200 : // should any new image layer been created, not uploading index_part will
1201 : // result in a mismatch between remote_physical_size and layermap calculated
1202 : // size, which will fail some tests, but should not be an issue otherwise.
1203 408 : remote_client.schedule_index_upload_for_file_changes()?;
1204 0 : }
1205 : }
1206 0 : Err(err) => {
1207 0 : // no partitioning? This is normal, if the timeline was just created
1208 0 : // as an empty timeline. Also in unit tests, when we use the timeline
1209 0 : // as a simple key-value store, ignoring the datadir layout. Log the
1210 0 : // error but continue.
1211 0 : //
1212 0 : // Suppress error when it's due to cancellation
1213 0 : if !self.cancel.is_cancelled() {
1214 0 : error!("could not compact, repartitioning keyspace failed: {err:?}");
1215 0 : }
1216 : }
1217 : };
1218 :
1219 408 : Ok(())
1220 408 : }
1221 :
1222 : /// Mutate the timeline with a [`TimelineWriter`].
1223 2957012 : pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
1224 2957012 : TimelineWriter {
1225 2957012 : tl: self,
1226 2957012 : _write_guard: self.write_lock.lock().await,
1227 : }
1228 2957012 : }
1229 :
1230 : /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
1231 : /// the in-memory layer, and initiate flushing it if so.
1232 : ///
1233 : /// Also flush after a period of time without new data -- it helps
1234 : /// safekeepers to regard pageserver as caught up and suspend activity.
1235 0 : pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
1236 0 : let last_lsn = self.get_last_record_lsn();
1237 0 : let open_layer_size = {
1238 0 : let guard = self.layers.read().await;
1239 0 : let layers = guard.layer_map();
1240 0 : let Some(open_layer) = layers.open_layer.as_ref() else {
1241 0 : return Ok(());
1242 : };
1243 0 : open_layer.size().await?
1244 : };
1245 0 : let last_freeze_at = self.last_freeze_at.load();
1246 0 : let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
1247 0 : let distance = last_lsn.widening_sub(last_freeze_at);
1248 0 : // Rolling the open layer can be triggered by:
1249 0 : // 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
1250 0 : // the safekeepers need to store. For sharded tenants, we multiply by shard count to
1251 0 : // account for how writes are distributed across shards: we expect each node to consume
1252 0 : // 1/count of the LSN on average.
1253 0 : // 2. The size of the currently open layer.
1254 0 : // 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
1255 0 : // up and suspend activity.
1256 0 : if (distance
1257 0 : >= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128)
1258 0 : || open_layer_size > self.get_checkpoint_distance()
1259 0 : || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
1260 : {
1261 0 : info!(
1262 0 : "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
1263 0 : distance,
1264 0 : open_layer_size,
1265 0 : last_freeze_ts.elapsed()
1266 0 : );
1267 :
1268 0 : self.freeze_inmem_layer(true).await;
1269 0 : self.last_freeze_at.store(last_lsn);
1270 0 : *(self.last_freeze_ts.write().unwrap()) = Instant::now();
1271 0 :
1272 0 : // Wake up the layer flusher
1273 0 : self.flush_frozen_layers();
1274 0 : }
1275 0 : Ok(())
1276 0 : }
1277 :
1278 0 : pub(crate) fn activate(
1279 0 : self: &Arc<Self>,
1280 0 : broker_client: BrokerClientChannel,
1281 0 : background_jobs_can_start: Option<&completion::Barrier>,
1282 0 : ctx: &RequestContext,
1283 0 : ) {
1284 0 : if self.tenant_shard_id.is_zero() {
1285 0 : // Logical size is only maintained accurately on shard zero.
1286 0 : self.spawn_initial_logical_size_computation_task(ctx);
1287 0 : }
1288 0 : self.launch_wal_receiver(ctx, broker_client);
1289 0 : self.set_state(TimelineState::Active);
1290 0 : self.launch_eviction_task(background_jobs_can_start);
1291 0 : }
1292 :
1293 : /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
1294 : /// also to remote storage. This method can easily take multiple seconds for a busy timeline.
1295 : ///
1296 : /// While we are flushing, we continue to accept read I/O.
1297 6 : pub(crate) async fn flush_and_shutdown(&self) {
1298 6 : debug_assert_current_span_has_tenant_and_timeline_id();
1299 :
1300 : // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
1301 : // trying to flush
1302 0 : tracing::debug!("Waiting for WalReceiverManager...");
1303 6 : task_mgr::shutdown_tasks(
1304 6 : Some(TaskKind::WalReceiverManager),
1305 6 : Some(self.tenant_shard_id),
1306 6 : Some(self.timeline_id),
1307 6 : )
1308 0 : .await;
1309 :
1310 : // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
1311 6 : self.last_record_lsn.shutdown();
1312 6 :
1313 6 : // now all writers to InMemory layer are gone, do the final flush if requested
1314 6 : match self.freeze_and_flush().await {
1315 : Ok(_) => {
1316 : // drain the upload queue
1317 6 : if let Some(client) = self.remote_client.as_ref() {
1318 : // if we did not wait for completion here, it might be our shutdown process
1319 : // didn't wait for remote uploads to complete at all, as new tasks can forever
1320 : // be spawned.
1321 : //
1322 : // what is problematic is the shutting down of RemoteTimelineClient, because
1323 : // obviously it does not make sense to stop while we wait for it, but what
1324 : // about corner cases like s3 suddenly hanging up?
1325 6 : if let Err(e) = client.shutdown().await {
1326 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1327 : // we have some extra WAL replay to do next time the timeline starts.
1328 0 : warn!("failed to flush to remote storage: {e:#}");
1329 6 : }
1330 0 : }
1331 : }
1332 0 : Err(e) => {
1333 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1334 : // we have some extra WAL replay to do next time the timeline starts.
1335 0 : warn!("failed to freeze and flush: {e:#}");
1336 : }
1337 : }
1338 :
1339 6 : self.shutdown().await;
1340 6 : }
1341 :
1342 : /// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
1343 : /// the graceful [`Timeline::flush_and_shutdown`] function.
1344 8 : pub(crate) async fn shutdown(&self) {
1345 8 : debug_assert_current_span_has_tenant_and_timeline_id();
1346 :
1347 : // Signal any subscribers to our cancellation token to drop out
1348 0 : tracing::debug!("Cancelling CancellationToken");
1349 8 : self.cancel.cancel();
1350 8 :
1351 8 : // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
1352 8 : // while doing so.
1353 8 : self.last_record_lsn.shutdown();
1354 8 :
1355 8 : // Shut down the layer flush task before the remote client, as one depends on the other
1356 8 : task_mgr::shutdown_tasks(
1357 8 : Some(TaskKind::LayerFlushTask),
1358 8 : Some(self.tenant_shard_id),
1359 8 : Some(self.timeline_id),
1360 8 : )
1361 6 : .await;
1362 :
1363 : // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
1364 : // case our caller wants to use that for a deletion
1365 8 : if let Some(remote_client) = self.remote_client.as_ref() {
1366 8 : match remote_client.stop() {
1367 8 : Ok(()) => {}
1368 0 : Err(StopError::QueueUninitialized) => {
1369 0 : // Shutting down during initialization is legal
1370 0 : }
1371 : }
1372 0 : }
1373 :
1374 0 : tracing::debug!("Waiting for tasks...");
1375 :
1376 8 : task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
1377 :
1378 : // Finally wait until any gate-holders are complete
1379 8 : self.gate.close().await;
1380 8 : }
1381 :
1382 298 : pub(crate) fn set_state(&self, new_state: TimelineState) {
1383 298 : match (self.current_state(), new_state) {
1384 298 : (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
1385 2 : info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
1386 : }
1387 0 : (st, TimelineState::Loading) => {
1388 0 : error!("ignoring transition from {st:?} into Loading state");
1389 : }
1390 0 : (TimelineState::Broken { .. }, new_state) => {
1391 0 : error!("Ignoring state update {new_state:?} for broken timeline");
1392 : }
1393 : (TimelineState::Stopping, TimelineState::Active) => {
1394 0 : error!("Not activating a Stopping timeline");
1395 : }
1396 296 : (_, new_state) => {
1397 296 : self.state.send_replace(new_state);
1398 296 : }
1399 : }
1400 298 : }
1401 :
1402 2 : pub(crate) fn set_broken(&self, reason: String) {
1403 2 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
1404 2 : let broken_state = TimelineState::Broken {
1405 2 : reason,
1406 2 : backtrace: backtrace_str,
1407 2 : };
1408 2 : self.set_state(broken_state);
1409 2 :
1410 2 : // Although the Broken state is not equivalent to shutdown() (shutdown will be called
1411 2 : // later when this tenant is detach or the process shuts down), firing the cancellation token
1412 2 : // here avoids the need for other tasks to watch for the Broken state explicitly.
1413 2 : self.cancel.cancel();
1414 2 : }
1415 :
1416 228097 : pub(crate) fn current_state(&self) -> TimelineState {
1417 228097 : self.state.borrow().clone()
1418 228097 : }
1419 :
1420 6 : pub(crate) fn is_broken(&self) -> bool {
1421 6 : matches!(&*self.state.borrow(), TimelineState::Broken { .. })
1422 6 : }
1423 :
1424 226983 : pub(crate) fn is_active(&self) -> bool {
1425 226983 : self.current_state() == TimelineState::Active
1426 226983 : }
1427 :
1428 816 : pub(crate) fn is_stopping(&self) -> bool {
1429 816 : self.current_state() == TimelineState::Stopping
1430 816 : }
1431 :
1432 0 : pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
1433 0 : self.state.subscribe()
1434 0 : }
1435 :
1436 226769 : pub(crate) async fn wait_to_become_active(
1437 226769 : &self,
1438 226769 : _ctx: &RequestContext, // Prepare for use by cancellation
1439 226769 : ) -> Result<(), TimelineState> {
1440 226769 : let mut receiver = self.state.subscribe();
1441 226769 : loop {
1442 226769 : let current_state = receiver.borrow().clone();
1443 226769 : match current_state {
1444 : TimelineState::Loading => {
1445 0 : receiver
1446 0 : .changed()
1447 0 : .await
1448 0 : .expect("holding a reference to self");
1449 : }
1450 : TimelineState::Active { .. } => {
1451 226767 : return Ok(());
1452 : }
1453 : TimelineState::Broken { .. } | TimelineState::Stopping => {
1454 : // There's no chance the timeline can transition back into ::Active
1455 2 : return Err(current_state);
1456 : }
1457 : }
1458 : }
1459 226769 : }
1460 :
1461 0 : pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
1462 0 : let guard = self.layers.read().await;
1463 0 : let layer_map = guard.layer_map();
1464 0 : let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
1465 0 : if let Some(open_layer) = &layer_map.open_layer {
1466 0 : in_memory_layers.push(open_layer.info());
1467 0 : }
1468 0 : for frozen_layer in &layer_map.frozen_layers {
1469 0 : in_memory_layers.push(frozen_layer.info());
1470 0 : }
1471 :
1472 0 : let mut historic_layers = Vec::new();
1473 0 : for historic_layer in layer_map.iter_historic_layers() {
1474 0 : let historic_layer = guard.get_from_desc(&historic_layer);
1475 0 : historic_layers.push(historic_layer.info(reset));
1476 0 : }
1477 :
1478 0 : LayerMapInfo {
1479 0 : in_memory_layers,
1480 0 : historic_layers,
1481 0 : }
1482 0 : }
1483 :
1484 0 : #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
1485 : pub(crate) async fn download_layer(
1486 : &self,
1487 : layer_file_name: &str,
1488 : ) -> anyhow::Result<Option<bool>> {
1489 : let Some(layer) = self.find_layer(layer_file_name).await else {
1490 : return Ok(None);
1491 : };
1492 :
1493 : if self.remote_client.is_none() {
1494 : return Ok(Some(false));
1495 : }
1496 :
1497 : layer.download().await?;
1498 :
1499 : Ok(Some(true))
1500 : }
1501 :
1502 : /// Evict just one layer.
1503 : ///
1504 : /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
1505 0 : pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1506 0 : let _gate = self
1507 0 : .gate
1508 0 : .enter()
1509 0 : .map_err(|_| anyhow::anyhow!("Shutting down"))?;
1510 :
1511 0 : let Some(local_layer) = self.find_layer(layer_file_name).await else {
1512 0 : return Ok(None);
1513 : };
1514 :
1515 0 : match local_layer.evict_and_wait().await {
1516 0 : Ok(()) => Ok(Some(true)),
1517 0 : Err(EvictionError::NotFound) => Ok(Some(false)),
1518 0 : Err(EvictionError::Downloaded) => Ok(Some(false)),
1519 : }
1520 0 : }
1521 : }
1522 :
1523 : /// Number of times we will compute partition within a checkpoint distance.
1524 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
1525 :
1526 : // Private functions
1527 : impl Timeline {
1528 0 : pub(crate) fn get_lazy_slru_download(&self) -> bool {
1529 0 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1530 0 : tenant_conf
1531 0 : .lazy_slru_download
1532 0 : .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
1533 0 : }
1534 :
1535 704 : fn get_checkpoint_distance(&self) -> u64 {
1536 704 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1537 704 : tenant_conf
1538 704 : .checkpoint_distance
1539 704 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
1540 704 : }
1541 :
1542 0 : fn get_checkpoint_timeout(&self) -> Duration {
1543 0 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1544 0 : tenant_conf
1545 0 : .checkpoint_timeout
1546 0 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
1547 0 : }
1548 :
1549 482 : fn get_compaction_target_size(&self) -> u64 {
1550 482 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1551 482 : tenant_conf
1552 482 : .compaction_target_size
1553 482 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
1554 482 : }
1555 :
1556 408 : fn get_compaction_threshold(&self) -> usize {
1557 408 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1558 408 : tenant_conf
1559 408 : .compaction_threshold
1560 408 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
1561 408 : }
1562 :
1563 408 : fn get_image_creation_threshold(&self) -> usize {
1564 408 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1565 408 : tenant_conf
1566 408 : .image_creation_threshold
1567 408 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
1568 408 : }
1569 :
1570 408 : fn get_compaction_algorithm(&self) -> CompactionAlgorithm {
1571 408 : let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
1572 408 : tenant_conf
1573 408 : .compaction_algorithm
1574 408 : .unwrap_or(self.conf.default_tenant_conf.compaction_algorithm)
1575 408 : }
1576 :
1577 0 : fn get_eviction_policy(&self) -> EvictionPolicy {
1578 0 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1579 0 : tenant_conf
1580 0 : .eviction_policy
1581 0 : .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
1582 0 : }
1583 :
1584 296 : fn get_evictions_low_residence_duration_metric_threshold(
1585 296 : tenant_conf: &TenantConfOpt,
1586 296 : default_tenant_conf: &TenantConf,
1587 296 : ) -> Duration {
1588 296 : tenant_conf
1589 296 : .evictions_low_residence_duration_metric_threshold
1590 296 : .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
1591 296 : }
1592 :
1593 0 : pub(super) fn tenant_conf_updated(&self) {
1594 0 : // NB: Most tenant conf options are read by background loops, so,
1595 0 : // changes will automatically be picked up.
1596 0 :
1597 0 : // The threshold is embedded in the metric. So, we need to update it.
1598 0 : {
1599 0 : let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
1600 0 : &self.tenant_conf.read().unwrap().tenant_conf,
1601 0 : &self.conf.default_tenant_conf,
1602 0 : );
1603 0 :
1604 0 : let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
1605 0 : let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
1606 0 :
1607 0 : let timeline_id_str = self.timeline_id.to_string();
1608 0 : self.metrics
1609 0 : .evictions_with_low_residence_duration
1610 0 : .write()
1611 0 : .unwrap()
1612 0 : .change_threshold(
1613 0 : &tenant_id_str,
1614 0 : &shard_id_str,
1615 0 : &timeline_id_str,
1616 0 : new_threshold,
1617 0 : );
1618 0 : }
1619 0 : }
1620 :
1621 : /// Open a Timeline handle.
1622 : ///
1623 : /// Loads the metadata for the timeline into memory, but not the layer map.
1624 : #[allow(clippy::too_many_arguments)]
1625 296 : pub(super) fn new(
1626 296 : conf: &'static PageServerConf,
1627 296 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
1628 296 : metadata: &TimelineMetadata,
1629 296 : ancestor: Option<Arc<Timeline>>,
1630 296 : timeline_id: TimelineId,
1631 296 : tenant_shard_id: TenantShardId,
1632 296 : generation: Generation,
1633 296 : shard_identity: ShardIdentity,
1634 296 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
1635 296 : resources: TimelineResources,
1636 296 : pg_version: u32,
1637 296 : state: TimelineState,
1638 296 : cancel: CancellationToken,
1639 296 : ) -> Arc<Self> {
1640 296 : let disk_consistent_lsn = metadata.disk_consistent_lsn();
1641 296 : let (state, _) = watch::channel(state);
1642 296 :
1643 296 : let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
1644 296 : let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
1645 296 :
1646 296 : let tenant_conf_guard = tenant_conf.read().unwrap();
1647 296 :
1648 296 : let evictions_low_residence_duration_metric_threshold =
1649 296 : Self::get_evictions_low_residence_duration_metric_threshold(
1650 296 : &tenant_conf_guard.tenant_conf,
1651 296 : &conf.default_tenant_conf,
1652 296 : );
1653 296 : drop(tenant_conf_guard);
1654 296 :
1655 296 : Arc::new_cyclic(|myself| {
1656 296 : let mut result = Timeline {
1657 296 : conf,
1658 296 : tenant_conf,
1659 296 : myself: myself.clone(),
1660 296 : timeline_id,
1661 296 : tenant_shard_id,
1662 296 : generation,
1663 296 : shard_identity,
1664 296 : pg_version,
1665 296 : layers: Default::default(),
1666 296 :
1667 296 : walredo_mgr,
1668 296 : walreceiver: Mutex::new(None),
1669 296 :
1670 296 : remote_client: resources.remote_client.map(Arc::new),
1671 296 :
1672 296 : // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
1673 296 : last_record_lsn: SeqWait::new(RecordLsn {
1674 296 : last: disk_consistent_lsn,
1675 296 : prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
1676 296 : }),
1677 296 : disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
1678 296 :
1679 296 : last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
1680 296 : last_freeze_ts: RwLock::new(Instant::now()),
1681 296 :
1682 296 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
1683 296 :
1684 296 : ancestor_timeline: ancestor,
1685 296 : ancestor_lsn: metadata.ancestor_lsn(),
1686 296 :
1687 296 : metrics: TimelineMetrics::new(
1688 296 : &tenant_shard_id,
1689 296 : &timeline_id,
1690 296 : crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
1691 296 : "mtime",
1692 296 : evictions_low_residence_duration_metric_threshold,
1693 296 : ),
1694 296 : ),
1695 296 :
1696 296 : query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
1697 296 : &tenant_shard_id,
1698 296 : &timeline_id,
1699 296 : ),
1700 296 :
1701 2072 : directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
1702 296 :
1703 296 : flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
1704 296 :
1705 296 : layer_flush_start_tx,
1706 296 : layer_flush_done_tx,
1707 296 :
1708 296 : write_lock: tokio::sync::Mutex::new(()),
1709 296 :
1710 296 : gc_info: std::sync::RwLock::new(GcInfo {
1711 296 : retain_lsns: Vec::new(),
1712 296 : horizon_cutoff: Lsn(0),
1713 296 : pitr_cutoff: Lsn(0),
1714 296 : }),
1715 296 :
1716 296 : latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
1717 296 : initdb_lsn: metadata.initdb_lsn(),
1718 296 :
1719 296 : current_logical_size: if disk_consistent_lsn.is_valid() {
1720 : // we're creating timeline data with some layer files existing locally,
1721 : // need to recalculate timeline's logical size based on data in the layers.
1722 216 : LogicalSize::deferred_initial(disk_consistent_lsn)
1723 : } else {
1724 : // we're creating timeline data without any layers existing locally,
1725 : // initial logical size is 0.
1726 80 : LogicalSize::empty_initial()
1727 : },
1728 296 : partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
1729 296 : repartition_threshold: 0,
1730 296 :
1731 296 : last_received_wal: Mutex::new(None),
1732 296 : rel_size_cache: RwLock::new(HashMap::new()),
1733 296 :
1734 296 : download_all_remote_layers_task_info: RwLock::new(None),
1735 296 :
1736 296 : state,
1737 296 :
1738 296 : eviction_task_timeline_state: tokio::sync::Mutex::new(
1739 296 : EvictionTaskTimelineState::default(),
1740 296 : ),
1741 296 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
1742 296 :
1743 296 : cancel,
1744 296 : gate: Gate::default(),
1745 296 :
1746 296 : compaction_lock: tokio::sync::Mutex::default(),
1747 296 : gc_lock: tokio::sync::Mutex::default(),
1748 296 :
1749 296 : timeline_get_throttle: resources.timeline_get_throttle,
1750 296 :
1751 296 : aux_files: tokio::sync::Mutex::new(AuxFilesState {
1752 296 : dir: None,
1753 296 : n_deltas: 0,
1754 296 : }),
1755 296 : };
1756 296 : result.repartition_threshold =
1757 296 : result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
1758 296 : result
1759 296 : .metrics
1760 296 : .last_record_gauge
1761 296 : .set(disk_consistent_lsn.0 as i64);
1762 296 : result
1763 296 : })
1764 296 : }
1765 :
1766 366 : pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
1767 366 : let Ok(guard) = self.gate.enter() else {
1768 0 : info!("cannot start flush loop when the timeline gate has already been closed");
1769 0 : return;
1770 : };
1771 366 : let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
1772 366 : match *flush_loop_state {
1773 292 : FlushLoopState::NotStarted => (),
1774 : FlushLoopState::Running { .. } => {
1775 74 : info!(
1776 74 : "skipping attempt to start flush_loop twice {}/{}",
1777 74 : self.tenant_shard_id, self.timeline_id
1778 74 : );
1779 74 : return;
1780 : }
1781 : FlushLoopState::Exited => {
1782 0 : warn!(
1783 0 : "ignoring attempt to restart exited flush_loop {}/{}",
1784 0 : self.tenant_shard_id, self.timeline_id
1785 0 : );
1786 0 : return;
1787 : }
1788 : }
1789 :
1790 292 : let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
1791 292 : let self_clone = Arc::clone(self);
1792 292 :
1793 292 : debug!("spawning flush loop");
1794 292 : *flush_loop_state = FlushLoopState::Running {
1795 292 : #[cfg(test)]
1796 292 : expect_initdb_optimization: false,
1797 292 : #[cfg(test)]
1798 292 : initdb_optimization_count: 0,
1799 292 : };
1800 292 : task_mgr::spawn(
1801 292 : task_mgr::BACKGROUND_RUNTIME.handle(),
1802 292 : task_mgr::TaskKind::LayerFlushTask,
1803 292 : Some(self.tenant_shard_id),
1804 292 : Some(self.timeline_id),
1805 292 : "layer flush task",
1806 : false,
1807 292 : async move {
1808 292 : let _guard = guard;
1809 292 : let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
1810 2186 : self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
1811 8 : let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
1812 8 : assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
1813 8 : *flush_loop_state = FlushLoopState::Exited;
1814 8 : Ok(())
1815 8 : }
1816 292 : .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
1817 : );
1818 366 : }
1819 :
1820 : /// Creates and starts the wal receiver.
1821 : ///
1822 : /// This function is expected to be called at most once per Timeline's lifecycle
1823 : /// when the timeline is activated.
1824 0 : fn launch_wal_receiver(
1825 0 : self: &Arc<Self>,
1826 0 : ctx: &RequestContext,
1827 0 : broker_client: BrokerClientChannel,
1828 0 : ) {
1829 0 : info!(
1830 0 : "launching WAL receiver for timeline {} of tenant {}",
1831 0 : self.timeline_id, self.tenant_shard_id
1832 0 : );
1833 :
1834 0 : let tenant_conf_guard = self.tenant_conf.read().unwrap();
1835 0 : let wal_connect_timeout = tenant_conf_guard
1836 0 : .tenant_conf
1837 0 : .walreceiver_connect_timeout
1838 0 : .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
1839 0 : let lagging_wal_timeout = tenant_conf_guard
1840 0 : .tenant_conf
1841 0 : .lagging_wal_timeout
1842 0 : .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
1843 0 : let max_lsn_wal_lag = tenant_conf_guard
1844 0 : .tenant_conf
1845 0 : .max_lsn_wal_lag
1846 0 : .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
1847 0 : drop(tenant_conf_guard);
1848 0 :
1849 0 : let mut guard = self.walreceiver.lock().unwrap();
1850 0 : assert!(
1851 0 : guard.is_none(),
1852 0 : "multiple launches / re-launches of WAL receiver are not supported"
1853 : );
1854 0 : *guard = Some(WalReceiver::start(
1855 0 : Arc::clone(self),
1856 0 : WalReceiverConf {
1857 0 : wal_connect_timeout,
1858 0 : lagging_wal_timeout,
1859 0 : max_lsn_wal_lag,
1860 0 : auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
1861 0 : availability_zone: self.conf.availability_zone.clone(),
1862 0 : ingest_batch_size: self.conf.ingest_batch_size,
1863 0 : },
1864 0 : broker_client,
1865 0 : ctx,
1866 0 : ));
1867 0 : }
1868 :
1869 : /// Initialize with an empty layer map. Used when creating a new timeline.
1870 290 : pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
1871 290 : let mut layers = self.layers.try_write().expect(
1872 290 : "in the context where we call this function, no other task has access to the object",
1873 290 : );
1874 290 : layers.initialize_empty(Lsn(start_lsn.0));
1875 290 : }
1876 :
1877 : /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
1878 : /// files.
1879 6 : pub(super) async fn load_layer_map(
1880 6 : &self,
1881 6 : disk_consistent_lsn: Lsn,
1882 6 : index_part: Option<IndexPart>,
1883 6 : ) -> anyhow::Result<()> {
1884 : use init::{Decision::*, Discovered, DismissedLayer};
1885 : use LayerFileName::*;
1886 :
1887 6 : let mut guard = self.layers.write().await;
1888 :
1889 6 : let timer = self.metrics.load_layer_map_histo.start_timer();
1890 6 :
1891 6 : // Scan timeline directory and create ImageFileName and DeltaFilename
1892 6 : // structs representing all files on disk
1893 6 : let timeline_path = self
1894 6 : .conf
1895 6 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
1896 6 : let conf = self.conf;
1897 6 : let span = tracing::Span::current();
1898 6 :
1899 6 : // Copy to move into the task we're about to spawn
1900 6 : let generation = self.generation;
1901 6 : let shard = self.get_shard_index();
1902 6 : let this = self.myself.upgrade().expect("&self method holds the arc");
1903 :
1904 6 : let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
1905 6 : move || {
1906 6 : let _g = span.entered();
1907 6 : let discovered = init::scan_timeline_dir(&timeline_path)?;
1908 6 : let mut discovered_layers = Vec::with_capacity(discovered.len());
1909 6 : let mut unrecognized_files = Vec::new();
1910 6 :
1911 6 : let mut path = timeline_path;
1912 :
1913 22 : for discovered in discovered {
1914 16 : let (name, kind) = match discovered {
1915 16 : Discovered::Layer(file_name, file_size) => {
1916 16 : discovered_layers.push((file_name, file_size));
1917 16 : continue;
1918 : }
1919 : Discovered::Metadata => {
1920 0 : warn!("found legacy metadata file, these should have been removed in load_tenant_config");
1921 0 : continue;
1922 : }
1923 : Discovered::IgnoredBackup => {
1924 0 : continue;
1925 : }
1926 0 : Discovered::Unknown(file_name) => {
1927 0 : // we will later error if there are any
1928 0 : unrecognized_files.push(file_name);
1929 0 : continue;
1930 : }
1931 0 : Discovered::Ephemeral(name) => (name, "old ephemeral file"),
1932 0 : Discovered::Temporary(name) => (name, "temporary timeline file"),
1933 0 : Discovered::TemporaryDownload(name) => (name, "temporary download"),
1934 : };
1935 0 : path.push(Utf8Path::new(&name));
1936 0 : init::cleanup(&path, kind)?;
1937 0 : path.pop();
1938 : }
1939 :
1940 6 : if !unrecognized_files.is_empty() {
1941 : // assume that if there are any there are many many.
1942 0 : let n = unrecognized_files.len();
1943 0 : let first = &unrecognized_files[..n.min(10)];
1944 0 : anyhow::bail!(
1945 0 : "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
1946 0 : );
1947 6 : }
1948 6 :
1949 6 : let decided = init::reconcile(
1950 6 : discovered_layers,
1951 6 : index_part.as_ref(),
1952 6 : disk_consistent_lsn,
1953 6 : generation,
1954 6 : shard,
1955 6 : );
1956 6 :
1957 6 : let mut loaded_layers = Vec::new();
1958 6 : let mut needs_cleanup = Vec::new();
1959 6 : let mut total_physical_size = 0;
1960 :
1961 22 : for (name, decision) in decided {
1962 16 : let decision = match decision {
1963 0 : Ok(UseRemote { local, remote }) => {
1964 0 : // Remote is authoritative, but we may still choose to retain
1965 0 : // the local file if the contents appear to match
1966 0 : if local.file_size() == remote.file_size() {
1967 : // Use the local file, but take the remote metadata so that we pick up
1968 : // the correct generation.
1969 0 : UseLocal(remote)
1970 : } else {
1971 0 : path.push(name.file_name());
1972 0 : init::cleanup_local_file_for_remote(&path, &local, &remote)?;
1973 0 : path.pop();
1974 0 : UseRemote { local, remote }
1975 : }
1976 : }
1977 16 : Ok(decision) => decision,
1978 0 : Err(DismissedLayer::Future { local }) => {
1979 0 : if local.is_some() {
1980 0 : path.push(name.file_name());
1981 0 : init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
1982 0 : path.pop();
1983 0 : }
1984 0 : needs_cleanup.push(name);
1985 0 : continue;
1986 : }
1987 0 : Err(DismissedLayer::LocalOnly(local)) => {
1988 0 : path.push(name.file_name());
1989 0 : init::cleanup_local_only_file(&path, &name, &local)?;
1990 0 : path.pop();
1991 0 : // this file never existed remotely, we will have to do rework
1992 0 : continue;
1993 : }
1994 : };
1995 :
1996 16 : match &name {
1997 12 : Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
1998 4 : Image(i) => assert!(i.lsn <= disk_consistent_lsn),
1999 : }
2000 :
2001 16 : tracing::debug!(layer=%name, ?decision, "applied");
2002 :
2003 16 : let layer = match decision {
2004 16 : UseLocal(m) => {
2005 16 : total_physical_size += m.file_size();
2006 16 : Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
2007 : }
2008 0 : Evicted(remote) | UseRemote { remote, .. } => {
2009 0 : Layer::for_evicted(conf, &this, name, remote)
2010 : }
2011 : };
2012 :
2013 16 : loaded_layers.push(layer);
2014 : }
2015 6 : Ok((loaded_layers, needs_cleanup, total_physical_size))
2016 6 : }
2017 6 : })
2018 6 : .await
2019 6 : .map_err(anyhow::Error::new)
2020 6 : .and_then(|x| x)?;
2021 :
2022 6 : let num_layers = loaded_layers.len();
2023 6 :
2024 6 : guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
2025 :
2026 6 : if let Some(rtc) = self.remote_client.as_ref() {
2027 6 : rtc.schedule_layer_file_deletion(&needs_cleanup)?;
2028 6 : rtc.schedule_index_upload_for_file_changes()?;
2029 : // This barrier orders above DELETEs before any later operations.
2030 : // This is critical because code executing after the barrier might
2031 : // create again objects with the same key that we just scheduled for deletion.
2032 : // For example, if we just scheduled deletion of an image layer "from the future",
2033 : // later compaction might run again and re-create the same image layer.
2034 : // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
2035 : // "same" here means same key range and LSN.
2036 : //
2037 : // Without a barrier between above DELETEs and the re-creation's PUTs,
2038 : // the upload queue may execute the PUT first, then the DELETE.
2039 : // In our example, we will end up with an IndexPart referencing a non-existent object.
2040 : //
2041 : // 1. a future image layer is created and uploaded
2042 : // 2. ps restart
2043 : // 3. the future layer from (1) is deleted during load layer map
2044 : // 4. image layer is re-created and uploaded
2045 : // 5. deletion queue would like to delete (1) but actually deletes (4)
2046 : // 6. delete by name works as expected, but it now deletes the wrong (later) version
2047 : //
2048 : // See https://github.com/neondatabase/neon/issues/5878
2049 : //
2050 : // NB: generation numbers naturally protect against this because they disambiguate
2051 : // (1) and (4)
2052 6 : rtc.schedule_barrier()?;
2053 : // Tenant::create_timeline will wait for these uploads to happen before returning, or
2054 : // on retry.
2055 0 : }
2056 :
2057 6 : info!(
2058 6 : "loaded layer map with {} layers at {}, total physical size: {}",
2059 6 : num_layers, disk_consistent_lsn, total_physical_size
2060 6 : );
2061 :
2062 6 : timer.stop_and_record();
2063 6 : Ok(())
2064 6 : }
2065 :
2066 : /// Retrieve current logical size of the timeline.
2067 : ///
2068 : /// The size could be lagging behind the actual number, in case
2069 : /// the initial size calculation has not been run (gets triggered on the first size access).
2070 : ///
2071 : /// return size and boolean flag that shows if the size is exact
2072 0 : pub(crate) fn get_current_logical_size(
2073 0 : self: &Arc<Self>,
2074 0 : priority: GetLogicalSizePriority,
2075 0 : ctx: &RequestContext,
2076 0 : ) -> logical_size::CurrentLogicalSize {
2077 0 : if !self.tenant_shard_id.is_zero() {
2078 : // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
2079 : // when HTTP API is serving a GET for timeline zero, return zero
2080 0 : return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
2081 0 : }
2082 0 :
2083 0 : let current_size = self.current_logical_size.current_size();
2084 0 : debug!("Current size: {current_size:?}");
2085 :
2086 0 : match (current_size.accuracy(), priority) {
2087 0 : (logical_size::Accuracy::Exact, _) => (), // nothing to do
2088 0 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
2089 0 : // background task will eventually deliver an exact value, we're in no rush
2090 0 : }
2091 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
2092 : // background task is not ready, but user is asking for it now;
2093 : // => make the background task skip the line
2094 : // (The alternative would be to calculate the size here, but,
2095 : // it can actually take a long time if the user has a lot of rels.
2096 : // And we'll inevitable need it again; So, let the background task do the work.)
2097 0 : match self
2098 0 : .current_logical_size
2099 0 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
2100 0 : .get()
2101 : {
2102 0 : Some(cancel) => cancel.cancel(),
2103 : None => {
2104 0 : let state = self.current_state();
2105 0 : if matches!(
2106 0 : state,
2107 : TimelineState::Broken { .. } | TimelineState::Stopping
2108 0 : ) {
2109 0 :
2110 0 : // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
2111 0 : // Don't make noise.
2112 0 : } else {
2113 0 : warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
2114 : }
2115 : }
2116 : };
2117 : }
2118 : }
2119 :
2120 0 : if let CurrentLogicalSize::Approximate(_) = ¤t_size {
2121 0 : if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
2122 0 : let first = self
2123 0 : .current_logical_size
2124 0 : .did_return_approximate_to_walreceiver
2125 0 : .compare_exchange(
2126 0 : false,
2127 0 : true,
2128 0 : AtomicOrdering::Relaxed,
2129 0 : AtomicOrdering::Relaxed,
2130 0 : )
2131 0 : .is_ok();
2132 0 : if first {
2133 0 : crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
2134 0 : }
2135 0 : }
2136 0 : }
2137 :
2138 0 : current_size
2139 0 : }
2140 :
2141 0 : fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
2142 0 : let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
2143 : // nothing to do for freshly created timelines;
2144 0 : assert_eq!(
2145 0 : self.current_logical_size.current_size().accuracy(),
2146 0 : logical_size::Accuracy::Exact,
2147 0 : );
2148 0 : self.current_logical_size.initialized.add_permits(1);
2149 0 : return;
2150 : };
2151 :
2152 0 : let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
2153 0 : let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
2154 0 : self.current_logical_size
2155 0 : .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
2156 0 : .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
2157 0 :
2158 0 : let self_clone = Arc::clone(self);
2159 0 : let background_ctx = ctx.detached_child(
2160 0 : TaskKind::InitialLogicalSizeCalculation,
2161 0 : DownloadBehavior::Download,
2162 0 : );
2163 0 : task_mgr::spawn(
2164 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
2165 0 : task_mgr::TaskKind::InitialLogicalSizeCalculation,
2166 0 : Some(self.tenant_shard_id),
2167 0 : Some(self.timeline_id),
2168 0 : "initial size calculation",
2169 : false,
2170 : // NB: don't log errors here, task_mgr will do that.
2171 0 : async move {
2172 0 : let cancel = task_mgr::shutdown_token();
2173 0 : self_clone
2174 0 : .initial_logical_size_calculation_task(
2175 0 : initial_part_end,
2176 0 : cancel_wait_for_background_loop_concurrency_limit_semaphore,
2177 0 : cancel,
2178 0 : background_ctx,
2179 0 : )
2180 0 : .await;
2181 0 : Ok(())
2182 0 : }
2183 0 : .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
2184 : );
2185 0 : }
2186 :
2187 0 : async fn initial_logical_size_calculation_task(
2188 0 : self: Arc<Self>,
2189 0 : initial_part_end: Lsn,
2190 0 : skip_concurrency_limiter: CancellationToken,
2191 0 : cancel: CancellationToken,
2192 0 : background_ctx: RequestContext,
2193 0 : ) {
2194 0 : scopeguard::defer! {
2195 0 : // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
2196 0 : self.current_logical_size.initialized.add_permits(1);
2197 0 : }
2198 :
2199 : enum BackgroundCalculationError {
2200 : Cancelled,
2201 : Other(anyhow::Error),
2202 : }
2203 :
2204 0 : let try_once = |attempt: usize| {
2205 0 : let background_ctx = &background_ctx;
2206 0 : let self_ref = &self;
2207 0 : let skip_concurrency_limiter = &skip_concurrency_limiter;
2208 0 : async move {
2209 0 : let cancel = task_mgr::shutdown_token();
2210 0 : let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
2211 0 : BackgroundLoopKind::InitialLogicalSizeCalculation,
2212 0 : background_ctx,
2213 0 : );
2214 :
2215 : use crate::metrics::initial_logical_size::StartCircumstances;
2216 0 : let (_maybe_permit, circumstances) = tokio::select! {
2217 0 : permit = wait_for_permit => {
2218 : (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
2219 : }
2220 : _ = self_ref.cancel.cancelled() => {
2221 : return Err(BackgroundCalculationError::Cancelled);
2222 : }
2223 : _ = cancel.cancelled() => {
2224 : return Err(BackgroundCalculationError::Cancelled);
2225 : },
2226 : () = skip_concurrency_limiter.cancelled() => {
2227 : // Some action that is part of a end user interaction requested logical size
2228 : // => break out of the rate limit
2229 : // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
2230 : // but then again what happens if they cancel; also, we should just be using
2231 : // one runtime across the entire process, so, let's leave this for now.
2232 : (None, StartCircumstances::SkippedConcurrencyLimiter)
2233 : }
2234 : };
2235 :
2236 0 : let metrics_guard = if attempt == 1 {
2237 0 : crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
2238 : } else {
2239 0 : crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
2240 : };
2241 :
2242 0 : match self_ref
2243 0 : .logical_size_calculation_task(
2244 0 : initial_part_end,
2245 0 : LogicalSizeCalculationCause::Initial,
2246 0 : background_ctx,
2247 0 : )
2248 0 : .await
2249 : {
2250 0 : Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
2251 : Err(CalculateLogicalSizeError::Cancelled) => {
2252 0 : Err(BackgroundCalculationError::Cancelled)
2253 : }
2254 0 : Err(CalculateLogicalSizeError::Other(err)) => {
2255 0 : if let Some(PageReconstructError::AncestorStopping(_)) =
2256 0 : err.root_cause().downcast_ref()
2257 : {
2258 0 : Err(BackgroundCalculationError::Cancelled)
2259 : } else {
2260 0 : Err(BackgroundCalculationError::Other(err))
2261 : }
2262 : }
2263 : }
2264 0 : }
2265 0 : };
2266 :
2267 0 : let retrying = async {
2268 0 : let mut attempt = 0;
2269 0 : loop {
2270 0 : attempt += 1;
2271 0 :
2272 0 : match try_once(attempt).await {
2273 0 : Ok(res) => return ControlFlow::Continue(res),
2274 0 : Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
2275 0 : Err(BackgroundCalculationError::Other(e)) => {
2276 0 : warn!(attempt, "initial size calculation failed: {e:?}");
2277 : // exponential back-off doesn't make sense at these long intervals;
2278 : // use fixed retry interval with generous jitter instead
2279 0 : let sleep_duration = Duration::from_secs(
2280 0 : u64::try_from(
2281 0 : // 1hour base
2282 0 : (60_i64 * 60_i64)
2283 0 : // 10min jitter
2284 0 : + rand::thread_rng().gen_range(-10 * 60..10 * 60),
2285 0 : )
2286 0 : .expect("10min < 1hour"),
2287 0 : );
2288 0 : tokio::time::sleep(sleep_duration).await;
2289 : }
2290 : }
2291 : }
2292 0 : };
2293 :
2294 0 : let (calculated_size, metrics_guard) = tokio::select! {
2295 0 : res = retrying => {
2296 : match res {
2297 : ControlFlow::Continue(calculated_size) => calculated_size,
2298 : ControlFlow::Break(()) => return,
2299 : }
2300 : }
2301 : _ = cancel.cancelled() => {
2302 : return;
2303 : }
2304 : };
2305 :
2306 : // we cannot query current_logical_size.current_size() to know the current
2307 : // *negative* value, only truncated to u64.
2308 0 : let added = self
2309 0 : .current_logical_size
2310 0 : .size_added_after_initial
2311 0 : .load(AtomicOrdering::Relaxed);
2312 0 :
2313 0 : let sum = calculated_size.saturating_add_signed(added);
2314 0 :
2315 0 : // set the gauge value before it can be set in `update_current_logical_size`.
2316 0 : self.metrics.current_logical_size_gauge.set(sum);
2317 0 :
2318 0 : self.current_logical_size
2319 0 : .initial_logical_size
2320 0 : .set((calculated_size, metrics_guard.calculation_result_saved()))
2321 0 : .ok()
2322 0 : .expect("only this task sets it");
2323 0 : }
2324 :
2325 0 : pub(crate) fn spawn_ondemand_logical_size_calculation(
2326 0 : self: &Arc<Self>,
2327 0 : lsn: Lsn,
2328 0 : cause: LogicalSizeCalculationCause,
2329 0 : ctx: RequestContext,
2330 0 : ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
2331 0 : let (sender, receiver) = oneshot::channel();
2332 0 : let self_clone = Arc::clone(self);
2333 0 : // XXX if our caller loses interest, i.e., ctx is cancelled,
2334 0 : // we should stop the size calculation work and return an error.
2335 0 : // That would require restructuring this function's API to
2336 0 : // return the result directly, instead of a Receiver for the result.
2337 0 : let ctx = ctx.detached_child(
2338 0 : TaskKind::OndemandLogicalSizeCalculation,
2339 0 : DownloadBehavior::Download,
2340 0 : );
2341 0 : task_mgr::spawn(
2342 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
2343 0 : task_mgr::TaskKind::OndemandLogicalSizeCalculation,
2344 0 : Some(self.tenant_shard_id),
2345 0 : Some(self.timeline_id),
2346 0 : "ondemand logical size calculation",
2347 0 : false,
2348 0 : async move {
2349 0 : let res = self_clone
2350 0 : .logical_size_calculation_task(lsn, cause, &ctx)
2351 0 : .await;
2352 0 : let _ = sender.send(res).ok();
2353 0 : Ok(()) // Receiver is responsible for handling errors
2354 0 : }
2355 0 : .in_current_span(),
2356 0 : );
2357 0 : receiver
2358 0 : }
2359 :
2360 : /// # Cancel-Safety
2361 : ///
2362 : /// This method is cancellation-safe.
2363 0 : #[instrument(skip_all)]
2364 : async fn logical_size_calculation_task(
2365 : self: &Arc<Self>,
2366 : lsn: Lsn,
2367 : cause: LogicalSizeCalculationCause,
2368 : ctx: &RequestContext,
2369 : ) -> Result<u64, CalculateLogicalSizeError> {
2370 : crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
2371 : // We should never be calculating logical sizes on shard !=0, because these shards do not have
2372 : // accurate relation sizes, and they do not emit consumption metrics.
2373 : debug_assert!(self.tenant_shard_id.is_zero());
2374 :
2375 : let guard = self
2376 : .gate
2377 : .enter()
2378 0 : .map_err(|_| CalculateLogicalSizeError::Cancelled)?;
2379 :
2380 : let self_calculation = Arc::clone(self);
2381 :
2382 0 : let mut calculation = pin!(async {
2383 0 : let ctx = ctx.attached_child();
2384 0 : self_calculation
2385 0 : .calculate_logical_size(lsn, cause, &guard, &ctx)
2386 0 : .await
2387 0 : });
2388 :
2389 0 : tokio::select! {
2390 0 : res = &mut calculation => { res }
2391 : _ = self.cancel.cancelled() => {
2392 0 : debug!("cancelling logical size calculation for timeline shutdown");
2393 : calculation.await
2394 : }
2395 : _ = task_mgr::shutdown_watcher() => {
2396 0 : debug!("cancelling logical size calculation for task shutdown");
2397 : calculation.await
2398 : }
2399 : }
2400 : }
2401 :
2402 : /// Calculate the logical size of the database at the latest LSN.
2403 : ///
2404 : /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
2405 : /// especially if we need to download remote layers.
2406 : ///
2407 : /// # Cancel-Safety
2408 : ///
2409 : /// This method is cancellation-safe.
2410 0 : async fn calculate_logical_size(
2411 0 : &self,
2412 0 : up_to_lsn: Lsn,
2413 0 : cause: LogicalSizeCalculationCause,
2414 0 : _guard: &GateGuard,
2415 0 : ctx: &RequestContext,
2416 0 : ) -> Result<u64, CalculateLogicalSizeError> {
2417 0 : info!(
2418 0 : "Calculating logical size for timeline {} at {}",
2419 0 : self.timeline_id, up_to_lsn
2420 0 : );
2421 :
2422 0 : pausable_failpoint!("timeline-calculate-logical-size-pause");
2423 :
2424 : // See if we've already done the work for initial size calculation.
2425 : // This is a short-cut for timelines that are mostly unused.
2426 0 : if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
2427 0 : return Ok(size);
2428 0 : }
2429 0 : let storage_time_metrics = match cause {
2430 : LogicalSizeCalculationCause::Initial
2431 : | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
2432 0 : | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
2433 : LogicalSizeCalculationCause::EvictionTaskImitation => {
2434 0 : &self.metrics.imitate_logical_size_histo
2435 : }
2436 : };
2437 0 : let timer = storage_time_metrics.start_timer();
2438 0 : let logical_size = self
2439 0 : .get_current_logical_size_non_incremental(up_to_lsn, ctx)
2440 0 : .await?;
2441 0 : debug!("calculated logical size: {logical_size}");
2442 0 : timer.stop_and_record();
2443 0 : Ok(logical_size)
2444 0 : }
2445 :
2446 : /// Update current logical size, adding `delta' to the old value.
2447 270570 : fn update_current_logical_size(&self, delta: i64) {
2448 270570 : let logical_size = &self.current_logical_size;
2449 270570 : logical_size.increment_size(delta);
2450 270570 :
2451 270570 : // Also set the value in the prometheus gauge. Note that
2452 270570 : // there is a race condition here: if this is is called by two
2453 270570 : // threads concurrently, the prometheus gauge might be set to
2454 270570 : // one value while current_logical_size is set to the
2455 270570 : // other.
2456 270570 : match logical_size.current_size() {
2457 270570 : CurrentLogicalSize::Exact(ref new_current_size) => self
2458 270570 : .metrics
2459 270570 : .current_logical_size_gauge
2460 270570 : .set(new_current_size.into()),
2461 0 : CurrentLogicalSize::Approximate(_) => {
2462 0 : // don't update the gauge yet, this allows us not to update the gauge back and
2463 0 : // forth between the initial size calculation task.
2464 0 : }
2465 : }
2466 270570 : }
2467 :
2468 2424 : pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
2469 2424 : self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
2470 2424 : let aux_metric =
2471 2424 : self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
2472 2424 :
2473 2424 : let sum_of_entries = self
2474 2424 : .directory_metrics
2475 2424 : .iter()
2476 16968 : .map(|v| v.load(AtomicOrdering::Relaxed))
2477 2424 : .sum();
2478 2424 : // Set a high general threshold and a lower threshold for the auxiliary files,
2479 2424 : // as we can have large numbers of relations in the db directory.
2480 2424 : const SUM_THRESHOLD: u64 = 5000;
2481 2424 : const AUX_THRESHOLD: u64 = 1000;
2482 2424 : if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
2483 0 : self.metrics
2484 0 : .directory_entries_count_gauge
2485 0 : .set(sum_of_entries);
2486 2424 : } else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
2487 0 : metric.set(sum_of_entries);
2488 2424 : }
2489 2424 : }
2490 :
2491 0 : async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
2492 0 : let guard = self.layers.read().await;
2493 0 : for historic_layer in guard.layer_map().iter_historic_layers() {
2494 0 : let historic_layer_name = historic_layer.filename().file_name();
2495 0 : if layer_file_name == historic_layer_name {
2496 0 : return Some(guard.get_from_desc(&historic_layer));
2497 0 : }
2498 : }
2499 :
2500 0 : None
2501 0 : }
2502 :
2503 : /// The timeline heatmap is a hint to secondary locations from the primary location,
2504 : /// indicating which layers are currently on-disk on the primary.
2505 : ///
2506 : /// None is returned if the Timeline is in a state where uploading a heatmap
2507 : /// doesn't make sense, such as shutting down or initializing. The caller
2508 : /// should treat this as a cue to simply skip doing any heatmap uploading
2509 : /// for this timeline.
2510 0 : pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
2511 : // no point in heatmaps without remote client
2512 0 : let _remote_client = self.remote_client.as_ref()?;
2513 :
2514 0 : if !self.is_active() {
2515 0 : return None;
2516 0 : }
2517 :
2518 0 : let guard = self.layers.read().await;
2519 :
2520 0 : let resident = guard.resident_layers().map(|layer| {
2521 0 : let last_activity_ts = layer.access_stats().latest_activity_or_now();
2522 0 :
2523 0 : HeatMapLayer::new(
2524 0 : layer.layer_desc().filename(),
2525 0 : layer.metadata().into(),
2526 0 : last_activity_ts,
2527 0 : )
2528 0 : });
2529 :
2530 0 : let layers = resident.collect().await;
2531 :
2532 0 : Some(HeatMapTimeline::new(self.timeline_id, layers))
2533 0 : }
2534 : }
2535 :
2536 : type TraversalId = String;
2537 :
2538 : trait TraversalLayerExt {
2539 : fn traversal_id(&self) -> TraversalId;
2540 : }
2541 :
2542 : impl TraversalLayerExt for Layer {
2543 104 : fn traversal_id(&self) -> TraversalId {
2544 104 : self.local_path().to_string()
2545 104 : }
2546 : }
2547 :
2548 : impl TraversalLayerExt for Arc<InMemoryLayer> {
2549 4 : fn traversal_id(&self) -> TraversalId {
2550 4 : format!("timeline {} in-memory {self}", self.get_timeline_id())
2551 4 : }
2552 : }
2553 :
2554 : impl Timeline {
2555 : ///
2556 : /// Get a handle to a Layer for reading.
2557 : ///
2558 : /// The returned Layer might be from an ancestor timeline, if the
2559 : /// segment hasn't been updated on this timeline yet.
2560 : ///
2561 : /// This function takes the current timeline's locked LayerMap as an argument,
2562 : /// so callers can avoid potential race conditions.
2563 : ///
2564 : /// # Cancel-Safety
2565 : ///
2566 : /// This method is cancellation-safe.
2567 502524 : async fn get_reconstruct_data(
2568 502524 : &self,
2569 502524 : key: Key,
2570 502524 : request_lsn: Lsn,
2571 502524 : reconstruct_state: &mut ValueReconstructState,
2572 502524 : ctx: &RequestContext,
2573 502524 : ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
2574 502524 : // Start from the current timeline.
2575 502524 : let mut timeline_owned;
2576 502524 : let mut timeline = self;
2577 502524 :
2578 502524 : let mut read_count = scopeguard::guard(0, |cnt| {
2579 502524 : crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
2580 502524 : });
2581 502524 :
2582 502524 : // For debugging purposes, collect the path of layers that we traversed
2583 502524 : // through. It's included in the error message if we fail to find the key.
2584 502524 : let mut traversal_path = Vec::<TraversalPathItem>::new();
2585 :
2586 502524 : let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
2587 0 : *cached_lsn
2588 : } else {
2589 502524 : Lsn(0)
2590 : };
2591 :
2592 : // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
2593 : // to check that each iteration make some progress, to break infinite
2594 : // looping if something goes wrong.
2595 502524 : let mut prev_lsn = Lsn(u64::MAX);
2596 502524 :
2597 502524 : let mut result = ValueReconstructResult::Continue;
2598 502524 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2599 :
2600 1356666 : 'outer: loop {
2601 1356666 : if self.cancel.is_cancelled() {
2602 0 : return Err(PageReconstructError::Cancelled);
2603 1356666 : }
2604 1356666 :
2605 1356666 : // The function should have updated 'state'
2606 1356666 : //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
2607 1356666 : match result {
2608 502416 : ValueReconstructResult::Complete => return Ok(traversal_path),
2609 : ValueReconstructResult::Continue => {
2610 : // If we reached an earlier cached page image, we're done.
2611 854244 : if cont_lsn == cached_lsn + 1 {
2612 0 : MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
2613 0 : return Ok(traversal_path);
2614 854244 : }
2615 854244 : if prev_lsn <= cont_lsn {
2616 : // Didn't make any progress in last iteration. Error out to avoid
2617 : // getting stuck in the loop.
2618 100 : return Err(layer_traversal_error(format!(
2619 100 : "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
2620 100 : key,
2621 100 : Lsn(cont_lsn.0 - 1),
2622 100 : request_lsn,
2623 100 : timeline.ancestor_lsn
2624 100 : ), traversal_path));
2625 854144 : }
2626 854144 : prev_lsn = cont_lsn;
2627 : }
2628 : ValueReconstructResult::Missing => {
2629 : return Err(layer_traversal_error(
2630 6 : if cfg!(test) {
2631 6 : format!(
2632 6 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
2633 6 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
2634 6 : )
2635 : } else {
2636 0 : format!(
2637 0 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
2638 0 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
2639 0 : )
2640 : },
2641 6 : traversal_path,
2642 : ));
2643 : }
2644 : }
2645 :
2646 : // Recurse into ancestor if needed
2647 854144 : if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
2648 0 : trace!(
2649 0 : "going into ancestor {}, cont_lsn is {}",
2650 0 : timeline.ancestor_lsn,
2651 0 : cont_lsn
2652 0 : );
2653 :
2654 226769 : timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
2655 226767 : timeline = &*timeline_owned;
2656 226767 : prev_lsn = Lsn(u64::MAX);
2657 226767 : continue 'outer;
2658 627375 : }
2659 :
2660 627375 : let guard = timeline.layers.read().await;
2661 627375 : let layers = guard.layer_map();
2662 :
2663 : // Check the open and frozen in-memory layers first, in order from newest
2664 : // to oldest.
2665 627375 : if let Some(open_layer) = &layers.open_layer {
2666 556312 : let start_lsn = open_layer.get_lsn_range().start;
2667 556312 : if cont_lsn > start_lsn {
2668 : //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
2669 : // Get all the data needed to reconstruct the page version from this layer.
2670 : // But if we have an older cached page image, no need to go past that.
2671 502209 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2672 502209 : result = match open_layer
2673 502209 : .get_value_reconstruct_data(
2674 502209 : key,
2675 502209 : lsn_floor..cont_lsn,
2676 502209 : reconstruct_state,
2677 502209 : ctx,
2678 502209 : )
2679 8526 : .await
2680 : {
2681 502209 : Ok(result) => result,
2682 0 : Err(e) => return Err(PageReconstructError::from(e)),
2683 : };
2684 502209 : cont_lsn = lsn_floor;
2685 502209 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2686 502209 : traversal_path.push((
2687 502209 : result,
2688 502209 : cont_lsn,
2689 502209 : Box::new({
2690 502209 : let open_layer = Arc::clone(open_layer);
2691 502209 : move || open_layer.traversal_id()
2692 502209 : }),
2693 502209 : ));
2694 502209 : continue 'outer;
2695 54103 : }
2696 71063 : }
2697 125166 : for frozen_layer in layers.frozen_layers.iter().rev() {
2698 1036 : let start_lsn = frozen_layer.get_lsn_range().start;
2699 1036 : if cont_lsn > start_lsn {
2700 : //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
2701 1036 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2702 1036 : result = match frozen_layer
2703 1036 : .get_value_reconstruct_data(
2704 1036 : key,
2705 1036 : lsn_floor..cont_lsn,
2706 1036 : reconstruct_state,
2707 1036 : ctx,
2708 1036 : )
2709 0 : .await
2710 : {
2711 1036 : Ok(result) => result,
2712 0 : Err(e) => return Err(PageReconstructError::from(e)),
2713 : };
2714 1036 : cont_lsn = lsn_floor;
2715 1036 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2716 1036 : traversal_path.push((
2717 1036 : result,
2718 1036 : cont_lsn,
2719 1036 : Box::new({
2720 1036 : let frozen_layer = Arc::clone(frozen_layer);
2721 1036 : move || frozen_layer.traversal_id()
2722 1036 : }),
2723 1036 : ));
2724 1036 : continue 'outer;
2725 0 : }
2726 : }
2727 :
2728 124130 : if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
2729 124028 : let layer = guard.get_from_desc(&layer);
2730 124028 : // Get all the data needed to reconstruct the page version from this layer.
2731 124028 : // But if we have an older cached page image, no need to go past that.
2732 124028 : let lsn_floor = max(cached_lsn + 1, lsn_floor);
2733 124028 : result = match layer
2734 124028 : .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
2735 23310 : .await
2736 : {
2737 124028 : Ok(result) => result,
2738 0 : Err(e) => return Err(PageReconstructError::from(e)),
2739 : };
2740 124028 : cont_lsn = lsn_floor;
2741 124028 : *read_count += 1;
2742 124028 : traversal_path.push((
2743 124028 : result,
2744 124028 : cont_lsn,
2745 124028 : Box::new({
2746 124028 : let layer = layer.to_owned();
2747 124028 : move || layer.traversal_id()
2748 124028 : }),
2749 124028 : ));
2750 124028 : continue 'outer;
2751 102 : } else if timeline.ancestor_timeline.is_some() {
2752 : // Nothing on this timeline. Traverse to parent
2753 100 : result = ValueReconstructResult::Continue;
2754 100 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2755 100 : continue 'outer;
2756 : } else {
2757 : // Nothing found
2758 2 : result = ValueReconstructResult::Missing;
2759 2 : continue 'outer;
2760 : }
2761 : }
2762 502524 : }
2763 :
2764 : /// Get the data needed to reconstruct all keys in the provided keyspace
2765 : ///
2766 : /// The algorithm is as follows:
2767 : /// 1. While some keys are still not done and there's a timeline to visit:
2768 : /// 2. Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_timeline`]:
2769 : /// 2.1: Build the fringe for the current keyspace
2770 : /// 2.2 Visit the newest layer from the fringe to collect all values for the range it
2771 : /// intersects
2772 : /// 2.3. Pop the timeline from the fringe
2773 : /// 2.4. If the fringe is empty, go back to 1
2774 10 : async fn get_vectored_reconstruct_data(
2775 10 : &self,
2776 10 : mut keyspace: KeySpace,
2777 10 : request_lsn: Lsn,
2778 10 : reconstruct_state: &mut ValuesReconstructState,
2779 10 : ctx: &RequestContext,
2780 10 : ) -> Result<(), GetVectoredError> {
2781 10 : let mut timeline_owned: Arc<Timeline>;
2782 10 : let mut timeline = self;
2783 10 :
2784 10 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2785 :
2786 : loop {
2787 10 : if self.cancel.is_cancelled() {
2788 0 : return Err(GetVectoredError::Cancelled);
2789 10 : }
2790 :
2791 10 : let completed = Self::get_vectored_reconstruct_data_timeline(
2792 10 : timeline,
2793 10 : keyspace.clone(),
2794 10 : cont_lsn,
2795 10 : reconstruct_state,
2796 10 : &self.cancel,
2797 10 : ctx,
2798 10 : )
2799 25 : .await?;
2800 :
2801 10 : keyspace.remove_overlapping_with(&completed);
2802 10 : if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
2803 10 : break;
2804 0 : }
2805 0 :
2806 0 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2807 0 : timeline_owned = timeline
2808 0 : .get_ready_ancestor_timeline(ctx)
2809 0 : .await
2810 0 : .map_err(GetVectoredError::GetReadyAncestorError)?;
2811 0 : timeline = &*timeline_owned;
2812 : }
2813 :
2814 10 : if keyspace.total_size() != 0 {
2815 0 : return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
2816 10 : }
2817 10 :
2818 10 : Ok(())
2819 10 : }
2820 :
2821 : /// Collect the reconstruct data for a ketspace from the specified timeline.
2822 : ///
2823 : /// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
2824 : /// the current keyspace. The current keyspace of the search at any given timeline
2825 : /// is the original keyspace minus all the keys that have been completed minus
2826 : /// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly,
2827 : /// but if you merge all the keyspaces in the fringe, you get the "current keyspace".
2828 : ///
2829 : /// This is basically a depth-first search visitor implementation where a vertex
2830 : /// is the (layer, lsn range, key space) tuple. The fringe acts as the stack.
2831 : ///
2832 : /// At each iteration pop the top of the fringe (the layer with the highest Lsn)
2833 : /// and get all the required reconstruct data from the layer in one go.
2834 10 : async fn get_vectored_reconstruct_data_timeline(
2835 10 : timeline: &Timeline,
2836 10 : keyspace: KeySpace,
2837 10 : mut cont_lsn: Lsn,
2838 10 : reconstruct_state: &mut ValuesReconstructState,
2839 10 : cancel: &CancellationToken,
2840 10 : ctx: &RequestContext,
2841 10 : ) -> Result<KeySpace, GetVectoredError> {
2842 10 : let mut unmapped_keyspace = keyspace.clone();
2843 10 : let mut fringe = LayerFringe::new();
2844 10 :
2845 10 : let mut completed_keyspace = KeySpace::default();
2846 :
2847 : // Hold the layer map whilst visiting the timeline to prevent
2848 : // compaction, eviction and flushes from rendering the layers unreadable.
2849 : //
2850 : // TODO: Do we actually need to do this? In theory holding on
2851 : // to [`tenant::storage_layer::Layer`] should be enough. However,
2852 : // [`Timeline::get`] also holds the lock during IO, so more investigation
2853 : // is needed.
2854 10 : let guard = timeline.layers.read().await;
2855 10 : let layers = guard.layer_map();
2856 :
2857 20 : 'outer: loop {
2858 20 : if cancel.is_cancelled() {
2859 0 : return Err(GetVectoredError::Cancelled);
2860 20 : }
2861 20 :
2862 20 : let keys_done_last_step = reconstruct_state.consume_done_keys();
2863 20 : unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
2864 20 : completed_keyspace.merge(&keys_done_last_step);
2865 20 :
2866 20 : let in_memory_layer = layers.find_in_memory_layer(|l| {
2867 0 : let start_lsn = l.get_lsn_range().start;
2868 0 : cont_lsn > start_lsn
2869 20 : });
2870 20 :
2871 20 : match in_memory_layer {
2872 0 : Some(l) => {
2873 0 : fringe.update(
2874 0 : ReadableLayerDesc::InMemory {
2875 0 : handle: l,
2876 0 : lsn_ceil: cont_lsn,
2877 0 : },
2878 0 : unmapped_keyspace.clone(),
2879 0 : );
2880 0 : }
2881 : None => {
2882 20 : for range in unmapped_keyspace.ranges.iter() {
2883 10 : let results = match layers.range_search(range.clone(), cont_lsn) {
2884 10 : Some(res) => res,
2885 : None => {
2886 0 : break 'outer;
2887 : }
2888 : };
2889 :
2890 10 : results
2891 10 : .found
2892 10 : .into_iter()
2893 10 : .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
2894 10 : (
2895 10 : ReadableLayerDesc::Persistent {
2896 10 : desc: (*layer).clone(),
2897 10 : lsn_range: lsn_floor..cont_lsn,
2898 10 : },
2899 10 : keyspace_accum.to_keyspace(),
2900 10 : )
2901 10 : })
2902 10 : .for_each(|(layer, keyspace)| fringe.update(layer, keyspace));
2903 : }
2904 : }
2905 : }
2906 :
2907 20 : if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() {
2908 10 : layer_to_read
2909 10 : .get_values_reconstruct_data(
2910 10 : &guard,
2911 10 : keyspace_to_read.clone(),
2912 10 : reconstruct_state,
2913 10 : ctx,
2914 10 : )
2915 25 : .await?;
2916 :
2917 10 : unmapped_keyspace = keyspace_to_read;
2918 10 : cont_lsn = layer_to_read.get_lsn_floor();
2919 : } else {
2920 10 : break;
2921 : }
2922 : }
2923 :
2924 10 : Ok(completed_keyspace)
2925 10 : }
2926 :
2927 : /// # Cancel-safety
2928 : ///
2929 : /// This method is cancellation-safe.
2930 502524 : async fn lookup_cached_page(
2931 502524 : &self,
2932 502524 : key: &Key,
2933 502524 : lsn: Lsn,
2934 502524 : ctx: &RequestContext,
2935 502524 : ) -> Option<(Lsn, Bytes)> {
2936 502524 : let cache = page_cache::get();
2937 :
2938 : // FIXME: It's pointless to check the cache for things that are not 8kB pages.
2939 : // We should look at the key to determine if it's a cacheable object
2940 502524 : let (lsn, read_guard) = cache
2941 502524 : .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
2942 502524 : .await?;
2943 0 : let img = Bytes::from(read_guard.to_vec());
2944 0 : Some((lsn, img))
2945 502524 : }
2946 :
2947 226769 : async fn get_ready_ancestor_timeline(
2948 226769 : &self,
2949 226769 : ctx: &RequestContext,
2950 226769 : ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
2951 226769 : let ancestor = match self.get_ancestor_timeline() {
2952 226769 : Ok(timeline) => timeline,
2953 0 : Err(e) => return Err(GetReadyAncestorError::from(e)),
2954 : };
2955 :
2956 : // It's possible that the ancestor timeline isn't active yet, or
2957 : // is active but hasn't yet caught up to the branch point. Wait
2958 : // for it.
2959 : //
2960 : // This cannot happen while the pageserver is running normally,
2961 : // because you cannot create a branch from a point that isn't
2962 : // present in the pageserver yet. However, we don't wait for the
2963 : // branch point to be uploaded to cloud storage before creating
2964 : // a branch. I.e., the branch LSN need not be remote consistent
2965 : // for the branching operation to succeed.
2966 : //
2967 : // Hence, if we try to load a tenant in such a state where
2968 : // 1. the existence of the branch was persisted (in IndexPart and/or locally)
2969 : // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
2970 : // then we will need to wait for the ancestor timeline to
2971 : // re-stream WAL up to branch_lsn before we access it.
2972 : //
2973 : // How can a tenant get in such a state?
2974 : // - ungraceful pageserver process exit
2975 : // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
2976 : //
2977 : // NB: this could be avoided by requiring
2978 : // branch_lsn >= remote_consistent_lsn
2979 : // during branch creation.
2980 226769 : match ancestor.wait_to_become_active(ctx).await {
2981 226767 : Ok(()) => {}
2982 : Err(TimelineState::Stopping) => {
2983 0 : return Err(GetReadyAncestorError::AncestorStopping(
2984 0 : ancestor.timeline_id,
2985 0 : ));
2986 : }
2987 2 : Err(state) => {
2988 2 : return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
2989 2 : "Timeline {} will not become active. Current state: {:?}",
2990 2 : ancestor.timeline_id,
2991 2 : &state,
2992 2 : )));
2993 : }
2994 : }
2995 226767 : ancestor
2996 226767 : .wait_lsn(self.ancestor_lsn, ctx)
2997 0 : .await
2998 226767 : .map_err(|e| match e {
2999 0 : e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
3000 0 : WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
3001 0 : e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
3002 226767 : })?;
3003 :
3004 226767 : Ok(ancestor)
3005 226769 : }
3006 :
3007 226769 : fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
3008 226769 : let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
3009 0 : format!(
3010 0 : "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
3011 0 : self.timeline_id,
3012 0 : self.get_ancestor_timeline_id(),
3013 0 : )
3014 226769 : })?;
3015 226769 : Ok(Arc::clone(ancestor))
3016 226769 : }
3017 :
3018 12 : pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
3019 12 : &self.shard_identity
3020 12 : }
3021 :
3022 : ///
3023 : /// Get a handle to the latest layer for appending.
3024 : ///
3025 2628044 : async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
3026 2628044 : let mut guard = self.layers.write().await;
3027 2628044 : let layer = guard
3028 2628044 : .get_layer_for_write(
3029 2628044 : lsn,
3030 2628044 : self.get_last_record_lsn(),
3031 2628044 : self.conf,
3032 2628044 : self.timeline_id,
3033 2628044 : self.tenant_shard_id,
3034 2628044 : )
3035 362 : .await?;
3036 2628044 : Ok(layer)
3037 2628044 : }
3038 :
3039 2214102 : async fn put_value(
3040 2214102 : &self,
3041 2214102 : key: Key,
3042 2214102 : lsn: Lsn,
3043 2214102 : val: &Value,
3044 2214102 : ctx: &RequestContext,
3045 2214102 : ) -> anyhow::Result<()> {
3046 : //info!("PUT: key {} at {}", key, lsn);
3047 2214102 : let layer = self.get_layer_for_write(lsn).await?;
3048 2214102 : layer.put_value(key, lsn, val, ctx).await?;
3049 2214102 : Ok(())
3050 2214102 : }
3051 :
3052 413940 : async fn put_values(
3053 413940 : &self,
3054 413940 : values: &HashMap<Key, Vec<(Lsn, Value)>>,
3055 413940 : ctx: &RequestContext,
3056 413940 : ) -> anyhow::Result<()> {
3057 : // Pick the first LSN in the batch to get the layer to write to.
3058 413940 : for lsns in values.values() {
3059 413940 : if let Some((lsn, _)) = lsns.first() {
3060 413940 : let layer = self.get_layer_for_write(*lsn).await?;
3061 413940 : layer.put_values(values, ctx).await?;
3062 413940 : break;
3063 0 : }
3064 : }
3065 413940 : Ok(())
3066 413940 : }
3067 :
3068 2 : async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
3069 2 : if let Some((_, lsn)) = tombstones.first() {
3070 2 : let layer = self.get_layer_for_write(*lsn).await?;
3071 2 : layer.put_tombstones(tombstones).await?;
3072 0 : }
3073 2 : Ok(())
3074 2 : }
3075 :
3076 3102912 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
3077 3102912 : assert!(new_lsn.is_aligned());
3078 :
3079 3102912 : self.metrics.last_record_gauge.set(new_lsn.0 as i64);
3080 3102912 : self.last_record_lsn.advance(new_lsn);
3081 3102912 : }
3082 :
3083 530 : async fn freeze_inmem_layer(&self, write_lock_held: bool) {
3084 : // Freeze the current open in-memory layer. It will be written to disk on next
3085 : // iteration.
3086 530 : let _write_guard = if write_lock_held {
3087 0 : None
3088 : } else {
3089 530 : Some(self.write_lock.lock().await)
3090 : };
3091 530 : let mut guard = self.layers.write().await;
3092 530 : guard
3093 530 : .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
3094 3 : .await;
3095 530 : }
3096 :
3097 : /// Layer flusher task's main loop.
3098 292 : async fn flush_loop(
3099 292 : self: &Arc<Self>,
3100 292 : mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
3101 292 : ctx: &RequestContext,
3102 292 : ) {
3103 292 : info!("started flush loop");
3104 : loop {
3105 822 : tokio::select! {
3106 1292 : _ = self.cancel.cancelled() => {
3107 1292 : info!("shutting down layer flush task");
3108 1292 : break;
3109 1292 : },
3110 1292 : _ = task_mgr::shutdown_watcher() => {
3111 1292 : info!("shutting down layer flush task");
3112 1292 : break;
3113 1292 : },
3114 1292 : _ = layer_flush_start_rx.changed() => {}
3115 1292 : }
3116 :
3117 0 : trace!("waking up");
3118 530 : let timer = self.metrics.flush_time_histo.start_timer();
3119 530 : let flush_counter = *layer_flush_start_rx.borrow();
3120 1054 : let result = loop {
3121 1054 : if self.cancel.is_cancelled() {
3122 0 : info!("dropping out of flush loop for timeline shutdown");
3123 : // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
3124 : // anyone waiting on that will respect self.cancel as well: they will stop
3125 : // waiting at the same time we as drop out of this loop.
3126 0 : return;
3127 1054 : }
3128 :
3129 1054 : let layer_to_flush = {
3130 1054 : let guard = self.layers.read().await;
3131 1054 : guard.layer_map().frozen_layers.front().cloned()
3132 : // drop 'layers' lock to allow concurrent reads and writes
3133 : };
3134 1054 : let Some(layer_to_flush) = layer_to_flush else {
3135 530 : break Ok(());
3136 : };
3137 1716 : match self.flush_frozen_layer(layer_to_flush, ctx).await {
3138 524 : Ok(()) => {}
3139 : Err(FlushLayerError::Cancelled) => {
3140 0 : info!("dropping out of flush loop for timeline shutdown");
3141 0 : return;
3142 : }
3143 0 : err @ Err(
3144 : FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
3145 : ) => {
3146 0 : error!("could not flush frozen layer: {err:?}");
3147 0 : break err;
3148 : }
3149 : }
3150 : };
3151 : // Notify any listeners that we're done
3152 530 : let _ = self
3153 530 : .layer_flush_done_tx
3154 530 : .send_replace((flush_counter, result));
3155 530 :
3156 530 : timer.stop_and_record();
3157 : }
3158 8 : }
3159 :
3160 530 : async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
3161 530 : let mut rx = self.layer_flush_done_tx.subscribe();
3162 530 :
3163 530 : // Increment the flush cycle counter and wake up the flush task.
3164 530 : // Remember the new value, so that when we listen for the flush
3165 530 : // to finish, we know when the flush that we initiated has
3166 530 : // finished, instead of some other flush that was started earlier.
3167 530 : let mut my_flush_request = 0;
3168 530 :
3169 530 : let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
3170 530 : if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
3171 0 : anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
3172 530 : }
3173 530 :
3174 530 : self.layer_flush_start_tx.send_modify(|counter| {
3175 530 : my_flush_request = *counter + 1;
3176 530 : *counter = my_flush_request;
3177 530 : });
3178 :
3179 1060 : loop {
3180 1060 : {
3181 1060 : let (last_result_counter, last_result) = &*rx.borrow();
3182 1060 : if *last_result_counter >= my_flush_request {
3183 530 : if let Err(_err) = last_result {
3184 : // We already logged the original error in
3185 : // flush_loop. We cannot propagate it to the caller
3186 : // here, because it might not be Cloneable
3187 0 : anyhow::bail!(
3188 0 : "Could not flush frozen layer. Request id: {}",
3189 0 : my_flush_request
3190 0 : );
3191 : } else {
3192 530 : return Ok(());
3193 : }
3194 530 : }
3195 : }
3196 0 : trace!("waiting for flush to complete");
3197 530 : tokio::select! {
3198 530 : rx_e = rx.changed() => {
3199 : rx_e?;
3200 : },
3201 : // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
3202 : // the notification from [`flush_loop`] that it completed.
3203 : _ = self.cancel.cancelled() => {
3204 0 : tracing::info!("Cancelled layer flush due on timeline shutdown");
3205 : return Ok(())
3206 : }
3207 : };
3208 0 : trace!("done")
3209 : }
3210 530 : }
3211 :
3212 0 : fn flush_frozen_layers(&self) {
3213 0 : self.layer_flush_start_tx.send_modify(|val| *val += 1);
3214 0 : }
3215 :
3216 : /// Flush one frozen in-memory layer to disk, as a new delta layer.
3217 1048 : #[instrument(skip_all, fields(layer=%frozen_layer))]
3218 : async fn flush_frozen_layer(
3219 : self: &Arc<Self>,
3220 : frozen_layer: Arc<InMemoryLayer>,
3221 : ctx: &RequestContext,
3222 : ) -> Result<(), FlushLayerError> {
3223 : debug_assert_current_span_has_tenant_and_timeline_id();
3224 : // As a special case, when we have just imported an image into the repository,
3225 : // instead of writing out a L0 delta layer, we directly write out image layer
3226 : // files instead. This is possible as long as *all* the data imported into the
3227 : // repository have the same LSN.
3228 : let lsn_range = frozen_layer.get_lsn_range();
3229 : let (layers_to_upload, delta_layer_to_add) =
3230 : if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
3231 : #[cfg(test)]
3232 : match &mut *self.flush_loop_state.lock().unwrap() {
3233 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
3234 : panic!("flush loop not running")
3235 : }
3236 : FlushLoopState::Running {
3237 : initdb_optimization_count,
3238 : ..
3239 : } => {
3240 : *initdb_optimization_count += 1;
3241 : }
3242 : }
3243 : // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
3244 : // require downloading anything during initial import.
3245 : let (partitioning, _lsn) = self
3246 : .repartition(
3247 : self.initdb_lsn,
3248 : self.get_compaction_target_size(),
3249 : EnumSet::empty(),
3250 : ctx,
3251 : )
3252 : .await?;
3253 :
3254 : if self.cancel.is_cancelled() {
3255 : return Err(FlushLayerError::Cancelled);
3256 : }
3257 :
3258 : // For image layers, we add them immediately into the layer map.
3259 : (
3260 : self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
3261 : .await?,
3262 : None,
3263 : )
3264 : } else {
3265 : #[cfg(test)]
3266 : match &mut *self.flush_loop_state.lock().unwrap() {
3267 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
3268 : panic!("flush loop not running")
3269 : }
3270 : FlushLoopState::Running {
3271 : expect_initdb_optimization,
3272 : ..
3273 : } => {
3274 : assert!(!*expect_initdb_optimization, "expected initdb optimization");
3275 : }
3276 : }
3277 : // Normal case, write out a L0 delta layer file.
3278 : // `create_delta_layer` will not modify the layer map.
3279 : // We will remove frozen layer and add delta layer in one atomic operation later.
3280 : let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
3281 : (
3282 : // FIXME: even though we have a single image and single delta layer assumption
3283 : // we push them to vec
3284 : vec![layer.clone()],
3285 : Some(layer),
3286 : )
3287 : };
3288 :
3289 524 : pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
3290 :
3291 : if self.cancel.is_cancelled() {
3292 : return Err(FlushLayerError::Cancelled);
3293 : }
3294 :
3295 : let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
3296 : let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
3297 :
3298 : // The new on-disk layers are now in the layer map. We can remove the
3299 : // in-memory layer from the map now. The flushed layer is stored in
3300 : // the mapping in `create_delta_layer`.
3301 : {
3302 : let mut guard = self.layers.write().await;
3303 :
3304 : if self.cancel.is_cancelled() {
3305 : return Err(FlushLayerError::Cancelled);
3306 : }
3307 :
3308 : guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
3309 :
3310 : if disk_consistent_lsn != old_disk_consistent_lsn {
3311 : assert!(disk_consistent_lsn > old_disk_consistent_lsn);
3312 : self.disk_consistent_lsn.store(disk_consistent_lsn);
3313 :
3314 : // Schedule remote uploads that will reflect our new disk_consistent_lsn
3315 : self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
3316 : }
3317 : // release lock on 'layers'
3318 : };
3319 :
3320 : // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
3321 : // a compaction can delete the file and then it won't be available for uploads any more.
3322 : // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
3323 : // race situation.
3324 : // See https://github.com/neondatabase/neon/issues/4526
3325 524 : pausable_failpoint!("flush-frozen-pausable");
3326 :
3327 : // This failpoint is used by another test case `test_pageserver_recovery`.
3328 0 : fail_point!("flush-frozen-exit");
3329 :
3330 : Ok(())
3331 : }
3332 :
3333 : /// Update metadata file
3334 524 : fn schedule_uploads(
3335 524 : &self,
3336 524 : disk_consistent_lsn: Lsn,
3337 524 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
3338 524 : ) -> anyhow::Result<TimelineMetadata> {
3339 524 : // We can only save a valid 'prev_record_lsn' value on disk if we
3340 524 : // flushed *all* in-memory changes to disk. We only track
3341 524 : // 'prev_record_lsn' in memory for the latest processed record, so we
3342 524 : // don't remember what the correct value that corresponds to some old
3343 524 : // LSN is. But if we flush everything, then the value corresponding
3344 524 : // current 'last_record_lsn' is correct and we can store it on disk.
3345 524 : let RecordLsn {
3346 524 : last: last_record_lsn,
3347 524 : prev: prev_record_lsn,
3348 524 : } = self.last_record_lsn.load();
3349 524 : let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
3350 524 : Some(prev_record_lsn)
3351 : } else {
3352 0 : None
3353 : };
3354 :
3355 524 : let ancestor_timeline_id = self
3356 524 : .ancestor_timeline
3357 524 : .as_ref()
3358 524 : .map(|ancestor| ancestor.timeline_id);
3359 524 :
3360 524 : let metadata = TimelineMetadata::new(
3361 524 : disk_consistent_lsn,
3362 524 : ondisk_prev_record_lsn,
3363 524 : ancestor_timeline_id,
3364 524 : self.ancestor_lsn,
3365 524 : *self.latest_gc_cutoff_lsn.read(),
3366 524 : self.initdb_lsn,
3367 524 : self.pg_version,
3368 524 : );
3369 524 :
3370 524 : fail_point!("checkpoint-before-saving-metadata", |x| bail!(
3371 0 : "{}",
3372 0 : x.unwrap()
3373 524 : ));
3374 :
3375 524 : if let Some(remote_client) = &self.remote_client {
3376 1048 : for layer in layers_to_upload {
3377 524 : remote_client.schedule_layer_file_upload(layer)?;
3378 : }
3379 524 : remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
3380 0 : }
3381 :
3382 524 : Ok(metadata)
3383 524 : }
3384 :
3385 0 : pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
3386 0 : if let Some(remote_client) = &self.remote_client {
3387 0 : remote_client
3388 0 : .preserve_initdb_archive(
3389 0 : &self.tenant_shard_id.tenant_id,
3390 0 : &self.timeline_id,
3391 0 : &self.cancel,
3392 0 : )
3393 0 : .await?;
3394 : } else {
3395 0 : bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
3396 : }
3397 0 : Ok(())
3398 0 : }
3399 :
3400 : // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
3401 : // in layer map immediately. The caller is responsible to put it into the layer map.
3402 450 : async fn create_delta_layer(
3403 450 : self: &Arc<Self>,
3404 450 : frozen_layer: &Arc<InMemoryLayer>,
3405 450 : ctx: &RequestContext,
3406 450 : ) -> anyhow::Result<ResidentLayer> {
3407 450 : let span = tracing::info_span!("blocking");
3408 450 : let new_delta: ResidentLayer = tokio::task::spawn_blocking({
3409 450 : let self_clone = Arc::clone(self);
3410 450 : let frozen_layer = Arc::clone(frozen_layer);
3411 450 : let ctx = ctx.attached_child();
3412 450 : move || {
3413 450 : // Write it out
3414 450 : // Keep this inside `spawn_blocking` and `Handle::current`
3415 450 : // as long as the write path is still sync and the read impl
3416 450 : // is still not fully async. Otherwise executor threads would
3417 450 : // be blocked.
3418 450 : let _g = span.entered();
3419 450 : let new_delta =
3420 450 : Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
3421 450 : let new_delta_path = new_delta.local_path().to_owned();
3422 450 :
3423 450 : // Sync it to disk.
3424 450 : //
3425 450 : // We must also fsync the timeline dir to ensure the directory entries for
3426 450 : // new layer files are durable.
3427 450 : //
3428 450 : // NB: timeline dir must be synced _after_ the file contents are durable.
3429 450 : // So, two separate fsyncs are required, they mustn't be batched.
3430 450 : //
3431 450 : // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
3432 450 : // files to flush, the fsync overhead can be reduces as follows:
3433 450 : // 1. write them all to temporary file names
3434 450 : // 2. fsync them
3435 450 : // 3. rename to the final name
3436 450 : // 4. fsync the parent directory.
3437 450 : // Note that (1),(2),(3) today happen inside write_to_disk().
3438 450 : //
3439 450 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3440 450 : par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
3441 450 : par_fsync::par_fsync(&[self_clone
3442 450 : .conf
3443 450 : .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
3444 450 : .context("fsync of timeline dir")?;
3445 :
3446 450 : anyhow::Ok(new_delta)
3447 450 : }
3448 450 : })
3449 450 : .await
3450 450 : .context("spawn_blocking")
3451 450 : .and_then(|x| x)?;
3452 :
3453 450 : Ok(new_delta)
3454 450 : }
3455 :
3456 482 : async fn repartition(
3457 482 : &self,
3458 482 : lsn: Lsn,
3459 482 : partition_size: u64,
3460 482 : flags: EnumSet<CompactFlags>,
3461 482 : ctx: &RequestContext,
3462 482 : ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
3463 482 : let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
3464 : // NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
3465 : // The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
3466 : // and hence before the compaction task starts.
3467 0 : anyhow::bail!("repartition() called concurrently, this should not happen");
3468 : };
3469 482 : if lsn < partitioning_guard.1 {
3470 0 : anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
3471 482 : }
3472 482 :
3473 482 : let distance = lsn.0 - partitioning_guard.1 .0;
3474 482 : if partitioning_guard.1 != Lsn(0)
3475 308 : && distance <= self.repartition_threshold
3476 308 : && !flags.contains(CompactFlags::ForceRepartition)
3477 : {
3478 0 : debug!(
3479 0 : distance,
3480 0 : threshold = self.repartition_threshold,
3481 0 : "no repartitioning needed"
3482 0 : );
3483 308 : return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
3484 174 : }
3485 :
3486 13313 : let keyspace = self.collect_keyspace(lsn, ctx).await?;
3487 174 : let partitioning = keyspace.partition(partition_size);
3488 174 :
3489 174 : *partitioning_guard = (partitioning, lsn);
3490 174 :
3491 174 : Ok((partitioning_guard.0.clone(), partitioning_guard.1))
3492 482 : }
3493 :
3494 : // Is it time to create a new image layer for the given partition?
3495 408 : async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
3496 408 : let threshold = self.get_image_creation_threshold();
3497 :
3498 408 : let guard = self.layers.read().await;
3499 408 : let layers = guard.layer_map();
3500 408 :
3501 408 : let mut max_deltas = 0;
3502 2856 : for part_range in &partition.ranges {
3503 2448 : let image_coverage = layers.image_coverage(part_range, lsn);
3504 5204 : for (img_range, last_img) in image_coverage {
3505 2756 : let img_lsn = if let Some(last_img) = last_img {
3506 2156 : last_img.get_lsn_range().end
3507 : } else {
3508 600 : Lsn(0)
3509 : };
3510 : // Let's consider an example:
3511 : //
3512 : // delta layer with LSN range 71-81
3513 : // delta layer with LSN range 81-91
3514 : // delta layer with LSN range 91-101
3515 : // image layer at LSN 100
3516 : //
3517 : // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
3518 : // there's no need to create a new one. We check this case explicitly, to avoid passing
3519 : // a bogus range to count_deltas below, with start > end. It's even possible that there
3520 : // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
3521 : // after we read last_record_lsn, which is passed here in the 'lsn' argument.
3522 2756 : if img_lsn < lsn {
3523 600 : let num_deltas =
3524 600 : layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
3525 600 :
3526 600 : max_deltas = max_deltas.max(num_deltas);
3527 600 : if num_deltas >= threshold {
3528 0 : debug!(
3529 0 : "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
3530 0 : img_range.start, img_range.end, num_deltas, img_lsn, lsn
3531 0 : );
3532 0 : return true;
3533 600 : }
3534 2156 : }
3535 : }
3536 : }
3537 :
3538 0 : debug!(
3539 0 : max_deltas,
3540 0 : "none of the partitioned ranges had >= {threshold} deltas"
3541 0 : );
3542 408 : false
3543 408 : }
3544 :
3545 964 : #[tracing::instrument(skip_all, fields(%lsn, %force))]
3546 : async fn create_image_layers(
3547 : self: &Arc<Timeline>,
3548 : partitioning: &KeyPartitioning,
3549 : lsn: Lsn,
3550 : force: bool,
3551 : ctx: &RequestContext,
3552 : ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
3553 : let timer = self.metrics.create_images_time_histo.start_timer();
3554 : let mut image_layers = Vec::new();
3555 :
3556 : // We need to avoid holes between generated image layers.
3557 : // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
3558 : // image layer with hole between them. In this case such layer can not be utilized by GC.
3559 : //
3560 : // How such hole between partitions can appear?
3561 : // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
3562 : // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
3563 : // If there is delta layer <100000000..300000000> then it never be garbage collected because
3564 : // image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
3565 : let mut start = Key::MIN;
3566 :
3567 : for partition in partitioning.parts.iter() {
3568 : let img_range = start..partition.ranges.last().unwrap().end;
3569 : if !force && !self.time_for_new_image_layer(partition, lsn).await {
3570 : start = img_range.end;
3571 : continue;
3572 : }
3573 :
3574 : let mut image_layer_writer = ImageLayerWriter::new(
3575 : self.conf,
3576 : self.timeline_id,
3577 : self.tenant_shard_id,
3578 : &img_range,
3579 : lsn,
3580 : )
3581 : .await?;
3582 :
3583 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3584 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
3585 0 : "failpoint image-layer-writer-fail-before-finish"
3586 0 : )))
3587 0 : });
3588 :
3589 : let mut wrote_keys = false;
3590 :
3591 : let mut key_request_accum = KeySpaceAccum::new();
3592 : for range in &partition.ranges {
3593 : let mut key = range.start;
3594 : while key < range.end {
3595 : // Decide whether to retain this key: usually we do, but sharded tenants may
3596 : // need to drop keys that don't belong to them. If we retain the key, add it
3597 : // to `key_request_accum` for later issuing a vectored get
3598 : if self.shard_identity.is_key_disposable(&key) {
3599 0 : debug!(
3600 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3601 0 : key,
3602 0 : self.shard_identity.get_shard_number(&key)
3603 0 : );
3604 : } else {
3605 : key_request_accum.add_key(key);
3606 : }
3607 :
3608 : let last_key_in_range = key.next() == range.end;
3609 : key = key.next();
3610 :
3611 : // Maybe flush `key_rest_accum`
3612 : if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
3613 : || last_key_in_range
3614 : {
3615 : let results = self
3616 : .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
3617 : .await?;
3618 :
3619 : for (img_key, img) in results {
3620 : let img = match img {
3621 : Ok(img) => img,
3622 : Err(err) => {
3623 : // If we fail to reconstruct a VM or FSM page, we can zero the
3624 : // page without losing any actual user data. That seems better
3625 : // than failing repeatedly and getting stuck.
3626 : //
3627 : // We had a bug at one point, where we truncated the FSM and VM
3628 : // in the pageserver, but the Postgres didn't know about that
3629 : // and continued to generate incremental WAL records for pages
3630 : // that didn't exist in the pageserver. Trying to replay those
3631 : // WAL records failed to find the previous image of the page.
3632 : // This special case allows us to recover from that situation.
3633 : // See https://github.com/neondatabase/neon/issues/2601.
3634 : //
3635 : // Unfortunately we cannot do this for the main fork, or for
3636 : // any metadata keys, keys, as that would lead to actual data
3637 : // loss.
3638 : if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key)
3639 : {
3640 0 : warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
3641 : ZERO_PAGE.clone()
3642 : } else {
3643 : return Err(CreateImageLayersError::PageReconstructError(
3644 : err,
3645 : ));
3646 : }
3647 : }
3648 : };
3649 :
3650 : // Write all the keys we just read into our new image layer.
3651 : image_layer_writer.put_image(img_key, img).await?;
3652 : wrote_keys = true;
3653 : }
3654 : }
3655 : }
3656 : }
3657 :
3658 : if wrote_keys {
3659 : // Normal path: we have written some data into the new image layer for this
3660 : // partition, so flush it to disk.
3661 : start = img_range.end;
3662 : let image_layer = image_layer_writer.finish(self).await?;
3663 : image_layers.push(image_layer);
3664 : } else {
3665 : // Special case: the image layer may be empty if this is a sharded tenant and the
3666 : // partition does not cover any keys owned by this shard. In this case, to ensure
3667 : // we don't leave gaps between image layers, leave `start` where it is, so that the next
3668 : // layer we write will cover the key range that we just scanned.
3669 0 : tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
3670 : }
3671 : }
3672 :
3673 : // Sync the new layer to disk before adding it to the layer map, to make sure
3674 : // we don't garbage collect something based on the new layer, before it has
3675 : // reached the disk.
3676 : //
3677 : // We must also fsync the timeline dir to ensure the directory entries for
3678 : // new layer files are durable
3679 : //
3680 : // Compaction creates multiple image layers. It would be better to create them all
3681 : // and fsync them all in parallel.
3682 : let all_paths = image_layers
3683 : .iter()
3684 74 : .map(|layer| layer.local_path().to_owned())
3685 : .collect::<Vec<_>>();
3686 :
3687 : par_fsync::par_fsync_async(&all_paths)
3688 : .await
3689 : .context("fsync of newly created layer files")?;
3690 :
3691 : if !all_paths.is_empty() {
3692 : par_fsync::par_fsync_async(&[self
3693 : .conf
3694 : .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
3695 : .await
3696 : .context("fsync of timeline dir")?;
3697 : }
3698 :
3699 : let mut guard = self.layers.write().await;
3700 :
3701 : // FIXME: we could add the images to be uploaded *before* returning from here, but right
3702 : // now they are being scheduled outside of write lock
3703 : guard.track_new_image_layers(&image_layers, &self.metrics);
3704 : drop_wlock(guard);
3705 : timer.stop_and_record();
3706 :
3707 : Ok(image_layers)
3708 : }
3709 :
3710 : /// Wait until the background initial logical size calculation is complete, or
3711 : /// this Timeline is shut down. Calling this function will cause the initial
3712 : /// logical size calculation to skip waiting for the background jobs barrier.
3713 0 : pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
3714 0 : if let Some(await_bg_cancel) = self
3715 0 : .current_logical_size
3716 0 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
3717 0 : .get()
3718 0 : {
3719 0 : await_bg_cancel.cancel();
3720 0 : } else {
3721 : // We should not wait if we were not able to explicitly instruct
3722 : // the logical size cancellation to skip the concurrency limit semaphore.
3723 : // TODO: this is an unexpected case. We should restructure so that it
3724 : // can't happen.
3725 0 : tracing::info!(
3726 0 : "await_initial_logical_size: can't get semaphore cancel token, skipping"
3727 0 : );
3728 : }
3729 :
3730 0 : tokio::select!(
3731 0 : _ = self.current_logical_size.initialized.acquire() => {},
3732 0 : _ = self.cancel.cancelled() => {}
3733 0 : )
3734 0 : }
3735 : }
3736 :
3737 378 : #[derive(Default)]
3738 : struct CompactLevel0Phase1Result {
3739 : new_layers: Vec<ResidentLayer>,
3740 : deltas_to_compact: Vec<Layer>,
3741 : }
3742 :
3743 : /// Top-level failure to compact.
3744 0 : #[derive(Debug, thiserror::Error)]
3745 : pub(crate) enum CompactionError {
3746 : #[error("The timeline or pageserver is shutting down")]
3747 : ShuttingDown,
3748 : /// Compaction cannot be done right now; page reconstruction and so on.
3749 : #[error(transparent)]
3750 : Other(#[from] anyhow::Error),
3751 : }
3752 :
3753 : impl From<CollectKeySpaceError> for CompactionError {
3754 0 : fn from(err: CollectKeySpaceError) -> Self {
3755 0 : match err {
3756 : CollectKeySpaceError::Cancelled
3757 : | CollectKeySpaceError::PageRead(PageReconstructError::Cancelled) => {
3758 0 : CompactionError::ShuttingDown
3759 : }
3760 0 : e => CompactionError::Other(e.into()),
3761 : }
3762 0 : }
3763 : }
3764 :
3765 : #[serde_as]
3766 420 : #[derive(serde::Serialize)]
3767 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
3768 :
3769 2856 : #[derive(Default)]
3770 : enum DurationRecorder {
3771 : #[default]
3772 : NotStarted,
3773 : Recorded(RecordedDuration, tokio::time::Instant),
3774 : }
3775 :
3776 : impl DurationRecorder {
3777 558 : fn till_now(&self) -> DurationRecorder {
3778 558 : match self {
3779 : DurationRecorder::NotStarted => {
3780 0 : panic!("must only call on recorded measurements")
3781 : }
3782 558 : DurationRecorder::Recorded(_, ended) => {
3783 558 : let now = tokio::time::Instant::now();
3784 558 : DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
3785 558 : }
3786 558 : }
3787 558 : }
3788 210 : fn into_recorded(self) -> Option<RecordedDuration> {
3789 210 : match self {
3790 0 : DurationRecorder::NotStarted => None,
3791 210 : DurationRecorder::Recorded(recorded, _) => Some(recorded),
3792 : }
3793 210 : }
3794 : }
3795 :
3796 408 : #[derive(Default)]
3797 : struct CompactLevel0Phase1StatsBuilder {
3798 : version: Option<u64>,
3799 : tenant_id: Option<TenantShardId>,
3800 : timeline_id: Option<TimelineId>,
3801 : read_lock_acquisition_micros: DurationRecorder,
3802 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
3803 : read_lock_held_key_sort_micros: DurationRecorder,
3804 : read_lock_held_prerequisites_micros: DurationRecorder,
3805 : read_lock_held_compute_holes_micros: DurationRecorder,
3806 : read_lock_drop_micros: DurationRecorder,
3807 : write_layer_files_micros: DurationRecorder,
3808 : level0_deltas_count: Option<usize>,
3809 : new_deltas_count: Option<usize>,
3810 : new_deltas_size: Option<u64>,
3811 : }
3812 :
3813 30 : #[derive(serde::Serialize)]
3814 : struct CompactLevel0Phase1Stats {
3815 : version: u64,
3816 : tenant_id: TenantShardId,
3817 : timeline_id: TimelineId,
3818 : read_lock_acquisition_micros: RecordedDuration,
3819 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
3820 : read_lock_held_key_sort_micros: RecordedDuration,
3821 : read_lock_held_prerequisites_micros: RecordedDuration,
3822 : read_lock_held_compute_holes_micros: RecordedDuration,
3823 : read_lock_drop_micros: RecordedDuration,
3824 : write_layer_files_micros: RecordedDuration,
3825 : level0_deltas_count: usize,
3826 : new_deltas_count: usize,
3827 : new_deltas_size: u64,
3828 : }
3829 :
3830 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
3831 : type Error = anyhow::Error;
3832 :
3833 30 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
3834 30 : Ok(Self {
3835 30 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
3836 30 : tenant_id: value
3837 30 : .tenant_id
3838 30 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
3839 30 : timeline_id: value
3840 30 : .timeline_id
3841 30 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
3842 30 : read_lock_acquisition_micros: value
3843 30 : .read_lock_acquisition_micros
3844 30 : .into_recorded()
3845 30 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
3846 30 : read_lock_held_spawn_blocking_startup_micros: value
3847 30 : .read_lock_held_spawn_blocking_startup_micros
3848 30 : .into_recorded()
3849 30 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
3850 30 : read_lock_held_key_sort_micros: value
3851 30 : .read_lock_held_key_sort_micros
3852 30 : .into_recorded()
3853 30 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
3854 30 : read_lock_held_prerequisites_micros: value
3855 30 : .read_lock_held_prerequisites_micros
3856 30 : .into_recorded()
3857 30 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
3858 30 : read_lock_held_compute_holes_micros: value
3859 30 : .read_lock_held_compute_holes_micros
3860 30 : .into_recorded()
3861 30 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
3862 30 : read_lock_drop_micros: value
3863 30 : .read_lock_drop_micros
3864 30 : .into_recorded()
3865 30 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
3866 30 : write_layer_files_micros: value
3867 30 : .write_layer_files_micros
3868 30 : .into_recorded()
3869 30 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
3870 30 : level0_deltas_count: value
3871 30 : .level0_deltas_count
3872 30 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
3873 30 : new_deltas_count: value
3874 30 : .new_deltas_count
3875 30 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
3876 30 : new_deltas_size: value
3877 30 : .new_deltas_size
3878 30 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
3879 : })
3880 30 : }
3881 : }
3882 :
3883 : impl Timeline {
3884 : /// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
3885 408 : async fn compact_level0_phase1(
3886 408 : self: &Arc<Self>,
3887 408 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
3888 408 : mut stats: CompactLevel0Phase1StatsBuilder,
3889 408 : target_file_size: u64,
3890 408 : ctx: &RequestContext,
3891 408 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
3892 408 : stats.read_lock_held_spawn_blocking_startup_micros =
3893 408 : stats.read_lock_acquisition_micros.till_now(); // set by caller
3894 408 : let layers = guard.layer_map();
3895 408 : let level0_deltas = layers.get_level0_deltas()?;
3896 408 : let mut level0_deltas = level0_deltas
3897 408 : .into_iter()
3898 1770 : .map(|x| guard.get_from_desc(&x))
3899 408 : .collect_vec();
3900 408 : stats.level0_deltas_count = Some(level0_deltas.len());
3901 408 : // Only compact if enough layers have accumulated.
3902 408 : let threshold = self.get_compaction_threshold();
3903 408 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
3904 0 : debug!(
3905 0 : level0_deltas = level0_deltas.len(),
3906 0 : threshold, "too few deltas to compact"
3907 0 : );
3908 378 : return Ok(CompactLevel0Phase1Result::default());
3909 30 : }
3910 30 :
3911 30 : // This failpoint is used together with `test_duplicate_layers` integration test.
3912 30 : // It returns the compaction result exactly the same layers as input to compaction.
3913 30 : // We want to ensure that this will not cause any problem when updating the layer map
3914 30 : // after the compaction is finished.
3915 30 : //
3916 30 : // Currently, there are two rare edge cases that will cause duplicated layers being
3917 30 : // inserted.
3918 30 : // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
3919 30 : // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
3920 30 : // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
3921 30 : // point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
3922 30 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
3923 30 : // layer replace instead of the normal remove / upload process.
3924 30 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
3925 30 : // size length. Compaction will likely create the same set of n files afterwards.
3926 30 : //
3927 30 : // This failpoint is a superset of both of the cases.
3928 30 : if cfg!(feature = "testing") {
3929 30 : let active = (|| {
3930 30 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
3931 30 : false
3932 30 : })();
3933 30 :
3934 30 : if active {
3935 0 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
3936 0 : for delta in &level0_deltas {
3937 : // we are just faking these layers as being produced again for this failpoint
3938 0 : new_layers.push(
3939 0 : delta
3940 0 : .download_and_keep_resident()
3941 0 : .await
3942 0 : .context("download layer for failpoint")?,
3943 : );
3944 : }
3945 0 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
3946 0 : return Ok(CompactLevel0Phase1Result {
3947 0 : new_layers,
3948 0 : deltas_to_compact: level0_deltas,
3949 0 : });
3950 30 : }
3951 0 : }
3952 :
3953 : // Gather the files to compact in this iteration.
3954 : //
3955 : // Start with the oldest Level 0 delta file, and collect any other
3956 : // level 0 files that form a contiguous sequence, such that the end
3957 : // LSN of previous file matches the start LSN of the next file.
3958 : //
3959 : // Note that if the files don't form such a sequence, we might
3960 : // "compact" just a single file. That's a bit pointless, but it allows
3961 : // us to get rid of the level 0 file, and compact the other files on
3962 : // the next iteration. This could probably made smarter, but such
3963 : // "gaps" in the sequence of level 0 files should only happen in case
3964 : // of a crash, partial download from cloud storage, or something like
3965 : // that, so it's not a big deal in practice.
3966 540 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
3967 30 : let mut level0_deltas_iter = level0_deltas.iter();
3968 30 :
3969 30 : let first_level0_delta = level0_deltas_iter.next().unwrap();
3970 30 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
3971 30 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
3972 30 :
3973 30 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
3974 300 : for l in level0_deltas_iter {
3975 270 : let lsn_range = &l.layer_desc().lsn_range;
3976 270 :
3977 270 : if lsn_range.start != prev_lsn_end {
3978 0 : break;
3979 270 : }
3980 270 : deltas_to_compact.push(l.download_and_keep_resident().await?);
3981 270 : prev_lsn_end = lsn_range.end;
3982 : }
3983 30 : let lsn_range = Range {
3984 30 : start: deltas_to_compact
3985 30 : .first()
3986 30 : .unwrap()
3987 30 : .layer_desc()
3988 30 : .lsn_range
3989 30 : .start,
3990 30 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
3991 30 : };
3992 :
3993 30 : info!(
3994 30 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
3995 30 : lsn_range.start,
3996 30 : lsn_range.end,
3997 30 : deltas_to_compact.len(),
3998 30 : level0_deltas.len()
3999 30 : );
4000 :
4001 300 : for l in deltas_to_compact.iter() {
4002 300 : info!("compact includes {l}");
4003 : }
4004 :
4005 : // We don't need the original list of layers anymore. Drop it so that
4006 : // we don't accidentally use it later in the function.
4007 30 : drop(level0_deltas);
4008 30 :
4009 30 : stats.read_lock_held_prerequisites_micros = stats
4010 30 : .read_lock_held_spawn_blocking_startup_micros
4011 30 : .till_now();
4012 30 :
4013 30 : // Determine N largest holes where N is number of compacted layers.
4014 30 : let max_holes = deltas_to_compact.len();
4015 30 : let last_record_lsn = self.get_last_record_lsn();
4016 30 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
4017 30 : let min_hole_coverage_size = 3; // TODO: something more flexible?
4018 30 :
4019 30 : // min-heap (reserve space for one more element added before eviction)
4020 30 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
4021 30 : let mut prev: Option<Key> = None;
4022 30 :
4023 30 : let mut all_keys = Vec::new();
4024 :
4025 300 : for l in deltas_to_compact.iter() {
4026 2407 : all_keys.extend(l.load_keys(ctx).await?);
4027 : }
4028 :
4029 : // FIXME: should spawn_blocking the rest of this function
4030 :
4031 : // The current stdlib sorting implementation is designed in a way where it is
4032 : // particularly fast where the slice is made up of sorted sub-ranges.
4033 4931312 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
4034 30 :
4035 30 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
4036 :
4037 2102000 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
4038 2102000 : if let Some(prev_key) = prev {
4039 : // just first fast filter
4040 2101970 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
4041 0 : let key_range = prev_key..next_key;
4042 0 : // Measuring hole by just subtraction of i128 representation of key range boundaries
4043 0 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
4044 0 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
4045 0 : // That is why it is better to measure size of hole as number of covering image layers.
4046 0 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
4047 0 : if coverage_size >= min_hole_coverage_size {
4048 0 : heap.push(Hole {
4049 0 : key_range,
4050 0 : coverage_size,
4051 0 : });
4052 0 : if heap.len() > max_holes {
4053 0 : heap.pop(); // remove smallest hole
4054 0 : }
4055 0 : }
4056 2101970 : }
4057 30 : }
4058 2102000 : prev = Some(next_key.next());
4059 : }
4060 30 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
4061 30 : drop_rlock(guard);
4062 30 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
4063 30 : let mut holes = heap.into_vec();
4064 30 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
4065 30 : let mut next_hole = 0; // index of next hole in holes vector
4066 30 :
4067 30 : // This iterator walks through all key-value pairs from all the layers
4068 30 : // we're compacting, in key, LSN order.
4069 30 : let all_values_iter = all_keys.iter();
4070 30 :
4071 30 : // This iterator walks through all keys and is needed to calculate size used by each key
4072 30 : let mut all_keys_iter = all_keys
4073 30 : .iter()
4074 2102000 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
4075 2101970 : .coalesce(|mut prev, cur| {
4076 2101970 : // Coalesce keys that belong to the same key pair.
4077 2101970 : // This ensures that compaction doesn't put them
4078 2101970 : // into different layer files.
4079 2101970 : // Still limit this by the target file size,
4080 2101970 : // so that we keep the size of the files in
4081 2101970 : // check.
4082 2101970 : if prev.0 == cur.0 && prev.2 < target_file_size {
4083 92000 : prev.2 += cur.2;
4084 92000 : Ok(prev)
4085 : } else {
4086 2009970 : Err((prev, cur))
4087 : }
4088 2101970 : });
4089 30 :
4090 30 : // Merge the contents of all the input delta layers into a new set
4091 30 : // of delta layers, based on the current partitioning.
4092 30 : //
4093 30 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
4094 30 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
4095 30 : // would be too large. In that case, we also split on the LSN dimension.
4096 30 : //
4097 30 : // LSN
4098 30 : // ^
4099 30 : // |
4100 30 : // | +-----------+ +--+--+--+--+
4101 30 : // | | | | | | | |
4102 30 : // | +-----------+ | | | | |
4103 30 : // | | | | | | | |
4104 30 : // | +-----------+ ==> | | | | |
4105 30 : // | | | | | | | |
4106 30 : // | +-----------+ | | | | |
4107 30 : // | | | | | | | |
4108 30 : // | +-----------+ +--+--+--+--+
4109 30 : // |
4110 30 : // +--------------> key
4111 30 : //
4112 30 : //
4113 30 : // If one key (X) has a lot of page versions:
4114 30 : //
4115 30 : // LSN
4116 30 : // ^
4117 30 : // | (X)
4118 30 : // | +-----------+ +--+--+--+--+
4119 30 : // | | | | | | | |
4120 30 : // | +-----------+ | | +--+ |
4121 30 : // | | | | | | | |
4122 30 : // | +-----------+ ==> | | | | |
4123 30 : // | | | | | +--+ |
4124 30 : // | +-----------+ | | | | |
4125 30 : // | | | | | | | |
4126 30 : // | +-----------+ +--+--+--+--+
4127 30 : // |
4128 30 : // +--------------> key
4129 30 : // TODO: this actually divides the layers into fixed-size chunks, not
4130 30 : // based on the partitioning.
4131 30 : //
4132 30 : // TODO: we should also opportunistically materialize and
4133 30 : // garbage collect what we can.
4134 30 : let mut new_layers = Vec::new();
4135 30 : let mut prev_key: Option<Key> = None;
4136 30 : let mut writer: Option<DeltaLayerWriter> = None;
4137 30 : let mut key_values_total_size = 0u64;
4138 30 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
4139 30 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
4140 :
4141 : for &DeltaEntry {
4142 2102000 : key, lsn, ref val, ..
4143 2102030 : } in all_values_iter
4144 : {
4145 2102000 : let value = val.load(ctx).await?;
4146 2102000 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
4147 2102000 : // We need to check key boundaries once we reach next key or end of layer with the same key
4148 2102000 : if !same_key || lsn == dup_end_lsn {
4149 2010000 : let mut next_key_size = 0u64;
4150 2010000 : let is_dup_layer = dup_end_lsn.is_valid();
4151 2010000 : dup_start_lsn = Lsn::INVALID;
4152 2010000 : if !same_key {
4153 2010000 : dup_end_lsn = Lsn::INVALID;
4154 2010000 : }
4155 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
4156 2010000 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
4157 2010000 : next_key_size = next_size;
4158 2010000 : if key != next_key {
4159 2009970 : if dup_end_lsn.is_valid() {
4160 0 : // We are writting segment with duplicates:
4161 0 : // place all remaining values of this key in separate segment
4162 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
4163 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
4164 2009970 : }
4165 2009970 : break;
4166 30 : }
4167 30 : key_values_total_size += next_size;
4168 30 : // Check if it is time to split segment: if total keys size is larger than target file size.
4169 30 : // We need to avoid generation of empty segments if next_size > target_file_size.
4170 30 : if key_values_total_size > target_file_size && lsn != next_lsn {
4171 : // Split key between multiple layers: such layer can contain only single key
4172 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
4173 0 : dup_end_lsn // new segment with duplicates starts where old one stops
4174 : } else {
4175 0 : lsn // start with the first LSN for this key
4176 : };
4177 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
4178 0 : break;
4179 30 : }
4180 : }
4181 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
4182 2010000 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
4183 0 : dup_start_lsn = dup_end_lsn;
4184 0 : dup_end_lsn = lsn_range.end;
4185 2010000 : }
4186 2010000 : if writer.is_some() {
4187 2009970 : let written_size = writer.as_mut().unwrap().size();
4188 2009970 : let contains_hole =
4189 2009970 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
4190 : // check if key cause layer overflow or contains hole...
4191 2009970 : if is_dup_layer
4192 2009970 : || dup_end_lsn.is_valid()
4193 2009970 : || written_size + key_values_total_size > target_file_size
4194 2009970 : || contains_hole
4195 : {
4196 : // ... if so, flush previous layer and prepare to write new one
4197 0 : new_layers.push(
4198 0 : writer
4199 0 : .take()
4200 0 : .unwrap()
4201 0 : .finish(prev_key.unwrap().next(), self)
4202 0 : .await?,
4203 : );
4204 0 : writer = None;
4205 0 :
4206 0 : if contains_hole {
4207 0 : // skip hole
4208 0 : next_hole += 1;
4209 0 : }
4210 2009970 : }
4211 30 : }
4212 : // Remember size of key value because at next iteration we will access next item
4213 2010000 : key_values_total_size = next_key_size;
4214 92000 : }
4215 2102000 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
4216 0 : Err(CompactionError::Other(anyhow::anyhow!(
4217 0 : "failpoint delta-layer-writer-fail-before-finish"
4218 0 : )))
4219 2102000 : });
4220 :
4221 2102000 : if !self.shard_identity.is_key_disposable(&key) {
4222 2102000 : if writer.is_none() {
4223 30 : // Create writer if not initiaized yet
4224 30 : writer = Some(
4225 : DeltaLayerWriter::new(
4226 30 : self.conf,
4227 30 : self.timeline_id,
4228 30 : self.tenant_shard_id,
4229 30 : key,
4230 30 : if dup_end_lsn.is_valid() {
4231 : // this is a layer containing slice of values of the same key
4232 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
4233 0 : dup_start_lsn..dup_end_lsn
4234 : } else {
4235 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
4236 30 : lsn_range.clone()
4237 : },
4238 : )
4239 15 : .await?,
4240 : );
4241 2101970 : }
4242 :
4243 2102000 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
4244 : } else {
4245 0 : debug!(
4246 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
4247 0 : key,
4248 0 : self.shard_identity.get_shard_number(&key)
4249 0 : );
4250 : }
4251 :
4252 2102000 : if !new_layers.is_empty() {
4253 0 : fail_point!("after-timeline-compacted-first-L1");
4254 2102000 : }
4255 :
4256 2102000 : prev_key = Some(key);
4257 : }
4258 30 : if let Some(writer) = writer {
4259 61 : new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
4260 0 : }
4261 :
4262 : // Sync layers
4263 30 : if !new_layers.is_empty() {
4264 : // Print a warning if the created layer is larger than double the target size
4265 : // Add two pages for potential overhead. This should in theory be already
4266 : // accounted for in the target calculation, but for very small targets,
4267 : // we still might easily hit the limit otherwise.
4268 30 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
4269 30 : for layer in new_layers.iter() {
4270 30 : if layer.layer_desc().file_size > warn_limit {
4271 0 : warn!(
4272 0 : %layer,
4273 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
4274 0 : );
4275 30 : }
4276 : }
4277 :
4278 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
4279 30 : let layer_paths: Vec<Utf8PathBuf> = new_layers
4280 30 : .iter()
4281 30 : .map(|l| l.local_path().to_owned())
4282 30 : .collect();
4283 30 :
4284 30 : // Fsync all the layer files and directory using multiple threads to
4285 30 : // minimize latency.
4286 30 : par_fsync::par_fsync_async(&layer_paths)
4287 30 : .await
4288 30 : .context("fsync all new layers")?;
4289 :
4290 30 : let timeline_dir = self
4291 30 : .conf
4292 30 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
4293 30 :
4294 30 : par_fsync::par_fsync_async(&[timeline_dir])
4295 30 : .await
4296 30 : .context("fsync of timeline dir")?;
4297 0 : }
4298 :
4299 30 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
4300 30 : stats.new_deltas_count = Some(new_layers.len());
4301 30 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
4302 30 :
4303 30 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
4304 30 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
4305 : {
4306 30 : Ok(stats_json) => {
4307 30 : info!(
4308 30 : stats_json = stats_json.as_str(),
4309 30 : "compact_level0_phase1 stats available"
4310 30 : )
4311 : }
4312 0 : Err(e) => {
4313 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
4314 : }
4315 : }
4316 :
4317 30 : Ok(CompactLevel0Phase1Result {
4318 30 : new_layers,
4319 30 : deltas_to_compact: deltas_to_compact
4320 30 : .into_iter()
4321 300 : .map(|x| x.drop_eviction_guard())
4322 30 : .collect::<Vec<_>>(),
4323 30 : })
4324 408 : }
4325 :
4326 : ///
4327 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
4328 : /// as Level 1 files.
4329 : ///
4330 408 : async fn compact_level0(
4331 408 : self: &Arc<Self>,
4332 408 : target_file_size: u64,
4333 408 : ctx: &RequestContext,
4334 408 : ) -> Result<(), CompactionError> {
4335 : let CompactLevel0Phase1Result {
4336 408 : new_layers,
4337 408 : deltas_to_compact,
4338 : } = {
4339 408 : let phase1_span = info_span!("compact_level0_phase1");
4340 408 : let ctx = ctx.attached_child();
4341 408 : let mut stats = CompactLevel0Phase1StatsBuilder {
4342 408 : version: Some(2),
4343 408 : tenant_id: Some(self.tenant_shard_id),
4344 408 : timeline_id: Some(self.timeline_id),
4345 408 : ..Default::default()
4346 408 : };
4347 408 :
4348 408 : let begin = tokio::time::Instant::now();
4349 408 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
4350 408 : let now = tokio::time::Instant::now();
4351 408 : stats.read_lock_acquisition_micros =
4352 408 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
4353 408 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
4354 408 : .instrument(phase1_span)
4355 39560 : .await?
4356 : };
4357 :
4358 408 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
4359 : // nothing to do
4360 378 : return Ok(());
4361 30 : }
4362 30 :
4363 30 : self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
4364 0 : .await?;
4365 30 : Ok(())
4366 408 : }
4367 :
4368 30 : async fn finish_compact_batch(
4369 30 : self: &Arc<Self>,
4370 30 : new_deltas: &[ResidentLayer],
4371 30 : new_images: &[ResidentLayer],
4372 30 : layers_to_remove: &[Layer],
4373 30 : ) -> anyhow::Result<()> {
4374 30 : let mut guard = self.layers.write().await;
4375 :
4376 30 : let mut duplicated_layers = HashSet::new();
4377 30 :
4378 30 : let mut insert_layers = Vec::with_capacity(new_deltas.len());
4379 :
4380 60 : for l in new_deltas {
4381 30 : if guard.contains(l.as_ref()) {
4382 : // expected in tests
4383 0 : tracing::error!(layer=%l, "duplicated L1 layer");
4384 :
4385 : // good ways to cause a duplicate: we repeatedly error after taking the writelock
4386 : // `guard` on self.layers. as of writing this, there are no error returns except
4387 : // for compact_level0_phase1 creating an L0, which does not happen in practice
4388 : // because we have not implemented L0 => L0 compaction.
4389 0 : duplicated_layers.insert(l.layer_desc().key());
4390 30 : } else if LayerMap::is_l0(l.layer_desc()) {
4391 0 : bail!("compaction generates a L0 layer file as output, which will cause infinite compaction.");
4392 30 : } else {
4393 30 : insert_layers.push(l.clone());
4394 30 : }
4395 : }
4396 :
4397 : // only remove those inputs which were not outputs
4398 30 : let remove_layers: Vec<Layer> = layers_to_remove
4399 30 : .iter()
4400 300 : .filter(|l| !duplicated_layers.contains(&l.layer_desc().key()))
4401 30 : .cloned()
4402 30 : .collect();
4403 30 :
4404 30 : if !new_images.is_empty() {
4405 0 : guard.track_new_image_layers(new_images, &self.metrics);
4406 30 : }
4407 :
4408 : // deletion will happen later, the layer file manager calls garbage_collect_on_drop
4409 30 : guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
4410 :
4411 30 : if let Some(remote_client) = self.remote_client.as_ref() {
4412 30 : remote_client.schedule_compaction_update(&remove_layers, new_deltas)?;
4413 0 : }
4414 :
4415 30 : drop_wlock(guard);
4416 30 :
4417 30 : Ok(())
4418 30 : }
4419 :
4420 : /// Update information about which layer files need to be retained on
4421 : /// garbage collection. This is separate from actually performing the GC,
4422 : /// and is updated more frequently, so that compaction can remove obsolete
4423 : /// page versions more aggressively.
4424 : ///
4425 : /// TODO: that's wishful thinking, compaction doesn't actually do that
4426 : /// currently.
4427 : ///
4428 : /// The caller specifies how much history is needed with the 3 arguments:
4429 : ///
4430 : /// retain_lsns: keep a version of each page at these LSNs
4431 : /// cutoff_horizon: also keep everything newer than this LSN
4432 : /// pitr: the time duration required to keep data for PITR
4433 : ///
4434 : /// The 'retain_lsns' list is currently used to prevent removing files that
4435 : /// are needed by child timelines. In the future, the user might be able to
4436 : /// name additional points in time to retain. The caller is responsible for
4437 : /// collecting that information.
4438 : ///
4439 : /// The 'cutoff_horizon' point is used to retain recent versions that might still be
4440 : /// needed by read-only nodes. (As of this writing, the caller just passes
4441 : /// the latest LSN subtracted by a constant, and doesn't do anything smart
4442 : /// to figure out what read-only nodes might actually need.)
4443 : ///
4444 : /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
4445 : /// whether a record is needed for PITR.
4446 : ///
4447 : /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
4448 : /// field, so that the three values passed as argument are stored
4449 : /// atomically. But the caller is responsible for ensuring that no new
4450 : /// branches are created that would need to be included in 'retain_lsns',
4451 : /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
4452 : /// that.
4453 : ///
4454 816 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
4455 : pub(super) async fn update_gc_info(
4456 : &self,
4457 : retain_lsns: Vec<Lsn>,
4458 : cutoff_horizon: Lsn,
4459 : pitr: Duration,
4460 : cancel: &CancellationToken,
4461 : ctx: &RequestContext,
4462 : ) -> anyhow::Result<()> {
4463 : // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
4464 : //
4465 : // Some unit tests depend on garbage-collection working even when
4466 : // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
4467 : // work, so avoid calling it altogether if time-based retention is not
4468 : // configured. It would be pointless anyway.
4469 : let pitr_cutoff = if pitr != Duration::ZERO {
4470 : let now = SystemTime::now();
4471 : if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
4472 : let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
4473 :
4474 : match self
4475 : .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
4476 : .await?
4477 : {
4478 : LsnForTimestamp::Present(lsn) => lsn,
4479 : LsnForTimestamp::Future(lsn) => {
4480 : // The timestamp is in the future. That sounds impossible,
4481 : // but what it really means is that there hasn't been
4482 : // any commits since the cutoff timestamp.
4483 0 : debug!("future({})", lsn);
4484 : cutoff_horizon
4485 : }
4486 : LsnForTimestamp::Past(lsn) => {
4487 0 : debug!("past({})", lsn);
4488 : // conservative, safe default is to remove nothing, when we
4489 : // have no commit timestamp data available
4490 : *self.get_latest_gc_cutoff_lsn()
4491 : }
4492 : LsnForTimestamp::NoData(lsn) => {
4493 0 : debug!("nodata({})", lsn);
4494 : // conservative, safe default is to remove nothing, when we
4495 : // have no commit timestamp data available
4496 : *self.get_latest_gc_cutoff_lsn()
4497 : }
4498 : }
4499 : } else {
4500 : // If we don't have enough data to convert to LSN,
4501 : // play safe and don't remove any layers.
4502 : *self.get_latest_gc_cutoff_lsn()
4503 : }
4504 : } else {
4505 : // No time-based retention was configured. Set time-based cutoff to
4506 : // same as LSN based.
4507 : cutoff_horizon
4508 : };
4509 :
4510 : // Grab the lock and update the values
4511 : *self.gc_info.write().unwrap() = GcInfo {
4512 : retain_lsns,
4513 : horizon_cutoff: cutoff_horizon,
4514 : pitr_cutoff,
4515 : };
4516 :
4517 : Ok(())
4518 : }
4519 :
4520 : /// Garbage collect layer files on a timeline that are no longer needed.
4521 : ///
4522 : /// Currently, we don't make any attempt at removing unneeded page versions
4523 : /// within a layer file. We can only remove the whole file if it's fully
4524 : /// obsolete.
4525 408 : pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
4526 408 : // this is most likely the background tasks, but it might be the spawned task from
4527 408 : // immediate_gc
4528 408 : let cancel = crate::task_mgr::shutdown_token();
4529 408 : let _g = tokio::select! {
4530 408 : guard = self.gc_lock.lock() => guard,
4531 : _ = self.cancel.cancelled() => return Ok(GcResult::default()),
4532 : _ = cancel.cancelled() => return Ok(GcResult::default()),
4533 : };
4534 408 : let timer = self.metrics.garbage_collect_histo.start_timer();
4535 :
4536 0 : fail_point!("before-timeline-gc");
4537 :
4538 : // Is the timeline being deleted?
4539 408 : if self.is_stopping() {
4540 0 : anyhow::bail!("timeline is Stopping");
4541 408 : }
4542 408 :
4543 408 : let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
4544 408 : let gc_info = self.gc_info.read().unwrap();
4545 408 :
4546 408 : let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
4547 408 : let pitr_cutoff = gc_info.pitr_cutoff;
4548 408 : let retain_lsns = gc_info.retain_lsns.clone();
4549 408 : (horizon_cutoff, pitr_cutoff, retain_lsns)
4550 408 : };
4551 408 :
4552 408 : let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
4553 :
4554 408 : let res = self
4555 408 : .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
4556 408 : .instrument(
4557 408 : info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
4558 : )
4559 1 : .await?;
4560 :
4561 : // only record successes
4562 408 : timer.stop_and_record();
4563 408 :
4564 408 : Ok(res)
4565 408 : }
4566 :
4567 408 : async fn gc_timeline(
4568 408 : &self,
4569 408 : horizon_cutoff: Lsn,
4570 408 : pitr_cutoff: Lsn,
4571 408 : retain_lsns: Vec<Lsn>,
4572 408 : new_gc_cutoff: Lsn,
4573 408 : ) -> anyhow::Result<GcResult> {
4574 408 : let now = SystemTime::now();
4575 408 : let mut result: GcResult = GcResult::default();
4576 408 :
4577 408 : // Nothing to GC. Return early.
4578 408 : let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
4579 408 : if latest_gc_cutoff >= new_gc_cutoff {
4580 0 : info!(
4581 0 : "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
4582 0 : );
4583 0 : return Ok(result);
4584 408 : }
4585 :
4586 : // We need to ensure that no one tries to read page versions or create
4587 : // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
4588 : // for details. This will block until the old value is no longer in use.
4589 : //
4590 : // The GC cutoff should only ever move forwards.
4591 408 : let waitlist = {
4592 408 : let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
4593 408 : ensure!(
4594 408 : *write_guard <= new_gc_cutoff,
4595 0 : "Cannot move GC cutoff LSN backwards (was {}, new {})",
4596 0 : *write_guard,
4597 : new_gc_cutoff
4598 : );
4599 408 : write_guard.store_and_unlock(new_gc_cutoff)
4600 408 : };
4601 408 : waitlist.wait().await;
4602 :
4603 408 : info!("GC starting");
4604 :
4605 0 : debug!("retain_lsns: {:?}", retain_lsns);
4606 :
4607 408 : let mut layers_to_remove = Vec::new();
4608 :
4609 : // Scan all layers in the timeline (remote or on-disk).
4610 : //
4611 : // Garbage collect the layer if all conditions are satisfied:
4612 : // 1. it is older than cutoff LSN;
4613 : // 2. it is older than PITR interval;
4614 : // 3. it doesn't need to be retained for 'retain_lsns';
4615 : // 4. newer on-disk image layers cover the layer's whole key range
4616 : //
4617 : // TODO holding a write lock is too agressive and avoidable
4618 408 : let mut guard = self.layers.write().await;
4619 408 : let layers = guard.layer_map();
4620 2404 : 'outer: for l in layers.iter_historic_layers() {
4621 2404 : result.layers_total += 1;
4622 2404 :
4623 2404 : // 1. Is it newer than GC horizon cutoff point?
4624 2404 : if l.get_lsn_range().end > horizon_cutoff {
4625 0 : debug!(
4626 0 : "keeping {} because it's newer than horizon_cutoff {}",
4627 0 : l.filename(),
4628 0 : horizon_cutoff,
4629 0 : );
4630 408 : result.layers_needed_by_cutoff += 1;
4631 408 : continue 'outer;
4632 1996 : }
4633 1996 :
4634 1996 : // 2. It is newer than PiTR cutoff point?
4635 1996 : if l.get_lsn_range().end > pitr_cutoff {
4636 0 : debug!(
4637 0 : "keeping {} because it's newer than pitr_cutoff {}",
4638 0 : l.filename(),
4639 0 : pitr_cutoff,
4640 0 : );
4641 0 : result.layers_needed_by_pitr += 1;
4642 0 : continue 'outer;
4643 1996 : }
4644 :
4645 : // 3. Is it needed by a child branch?
4646 : // NOTE With that we would keep data that
4647 : // might be referenced by child branches forever.
4648 : // We can track this in child timeline GC and delete parent layers when
4649 : // they are no longer needed. This might be complicated with long inheritance chains.
4650 : //
4651 : // TODO Vec is not a great choice for `retain_lsns`
4652 1996 : for retain_lsn in &retain_lsns {
4653 : // start_lsn is inclusive
4654 12 : if &l.get_lsn_range().start <= retain_lsn {
4655 0 : debug!(
4656 0 : "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
4657 0 : l.filename(),
4658 0 : retain_lsn,
4659 0 : l.is_incremental(),
4660 0 : );
4661 12 : result.layers_needed_by_branches += 1;
4662 12 : continue 'outer;
4663 0 : }
4664 : }
4665 :
4666 : // 4. Is there a later on-disk layer for this relation?
4667 : //
4668 : // The end-LSN is exclusive, while disk_consistent_lsn is
4669 : // inclusive. For example, if disk_consistent_lsn is 100, it is
4670 : // OK for a delta layer to have end LSN 101, but if the end LSN
4671 : // is 102, then it might not have been fully flushed to disk
4672 : // before crash.
4673 : //
4674 : // For example, imagine that the following layers exist:
4675 : //
4676 : // 1000 - image (A)
4677 : // 1000-2000 - delta (B)
4678 : // 2000 - image (C)
4679 : // 2000-3000 - delta (D)
4680 : // 3000 - image (E)
4681 : //
4682 : // If GC horizon is at 2500, we can remove layers A and B, but
4683 : // we cannot remove C, even though it's older than 2500, because
4684 : // the delta layer 2000-3000 depends on it.
4685 1984 : if !layers
4686 1984 : .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
4687 : {
4688 0 : debug!("keeping {} because it is the latest layer", l.filename());
4689 1984 : result.layers_not_updated += 1;
4690 1984 : continue 'outer;
4691 0 : }
4692 :
4693 : // We didn't find any reason to keep this file, so remove it.
4694 0 : debug!(
4695 0 : "garbage collecting {} is_dropped: xx is_incremental: {}",
4696 0 : l.filename(),
4697 0 : l.is_incremental(),
4698 0 : );
4699 0 : layers_to_remove.push(l);
4700 : }
4701 :
4702 408 : if !layers_to_remove.is_empty() {
4703 : // Persist the new GC cutoff value before we actually remove anything.
4704 : // This unconditionally schedules also an index_part.json update, even though, we will
4705 : // be doing one a bit later with the unlinked gc'd layers.
4706 0 : let disk_consistent_lsn = self.disk_consistent_lsn.load();
4707 0 : self.schedule_uploads(disk_consistent_lsn, None)?;
4708 :
4709 0 : let gc_layers = layers_to_remove
4710 0 : .iter()
4711 0 : .map(|x| guard.get_from_desc(x))
4712 0 : .collect::<Vec<Layer>>();
4713 0 :
4714 0 : result.layers_removed = gc_layers.len() as u64;
4715 :
4716 0 : if let Some(remote_client) = self.remote_client.as_ref() {
4717 0 : remote_client.schedule_gc_update(&gc_layers)?;
4718 0 : }
4719 :
4720 0 : guard.finish_gc_timeline(&gc_layers);
4721 0 :
4722 0 : #[cfg(feature = "testing")]
4723 0 : {
4724 0 : result.doomed_layers = gc_layers;
4725 0 : }
4726 408 : }
4727 :
4728 408 : info!(
4729 408 : "GC completed removing {} layers, cutoff {}",
4730 408 : result.layers_removed, new_gc_cutoff
4731 408 : );
4732 :
4733 408 : result.elapsed = now.elapsed()?;
4734 408 : Ok(result)
4735 408 : }
4736 :
4737 : /// Reconstruct a value, using the given base image and WAL records in 'data'.
4738 502736 : async fn reconstruct_value(
4739 502736 : &self,
4740 502736 : key: Key,
4741 502736 : request_lsn: Lsn,
4742 502736 : mut data: ValueReconstructState,
4743 502736 : ) -> Result<Bytes, PageReconstructError> {
4744 502736 : // Perform WAL redo if needed
4745 502736 : data.records.reverse();
4746 502736 :
4747 502736 : // If we have a page image, and no WAL, we're all set
4748 502736 : if data.records.is_empty() {
4749 502730 : if let Some((img_lsn, img)) = &data.img {
4750 0 : trace!(
4751 0 : "found page image for key {} at {}, no WAL redo required, req LSN {}",
4752 0 : key,
4753 0 : img_lsn,
4754 0 : request_lsn,
4755 0 : );
4756 502730 : Ok(img.clone())
4757 : } else {
4758 0 : Err(PageReconstructError::from(anyhow!(
4759 0 : "base image for {key} at {request_lsn} not found"
4760 0 : )))
4761 : }
4762 : } else {
4763 : // We need to do WAL redo.
4764 : //
4765 : // If we don't have a base image, then the oldest WAL record better initialize
4766 : // the page
4767 6 : if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
4768 0 : Err(PageReconstructError::from(anyhow!(
4769 0 : "Base image for {} at {} not found, but got {} WAL records",
4770 0 : key,
4771 0 : request_lsn,
4772 0 : data.records.len()
4773 0 : )))
4774 : } else {
4775 6 : if data.img.is_some() {
4776 0 : trace!(
4777 0 : "found {} WAL records and a base image for {} at {}, performing WAL redo",
4778 0 : data.records.len(),
4779 0 : key,
4780 0 : request_lsn
4781 0 : );
4782 : } else {
4783 0 : trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
4784 : };
4785 :
4786 6 : let last_rec_lsn = data.records.last().unwrap().0;
4787 :
4788 6 : let img = match self
4789 6 : .walredo_mgr
4790 6 : .as_ref()
4791 6 : .context("timeline has no walredo manager")
4792 6 : .map_err(PageReconstructError::WalRedo)?
4793 6 : .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
4794 0 : .await
4795 6 : .context("reconstruct a page image")
4796 : {
4797 6 : Ok(img) => img,
4798 0 : Err(e) => return Err(PageReconstructError::WalRedo(e)),
4799 : };
4800 :
4801 6 : if img.len() == page_cache::PAGE_SZ {
4802 0 : let cache = page_cache::get();
4803 0 : if let Err(e) = cache
4804 0 : .memorize_materialized_page(
4805 0 : self.tenant_shard_id,
4806 0 : self.timeline_id,
4807 0 : key,
4808 0 : last_rec_lsn,
4809 0 : &img,
4810 0 : )
4811 0 : .await
4812 0 : .context("Materialized page memoization failed")
4813 : {
4814 0 : return Err(PageReconstructError::from(e));
4815 0 : }
4816 6 : }
4817 :
4818 6 : Ok(img)
4819 : }
4820 : }
4821 502736 : }
4822 :
4823 0 : pub(crate) async fn spawn_download_all_remote_layers(
4824 0 : self: Arc<Self>,
4825 0 : request: DownloadRemoteLayersTaskSpawnRequest,
4826 0 : ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
4827 0 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4828 0 :
4829 0 : // this is not really needed anymore; it has tests which really check the return value from
4830 0 : // http api. it would be better not to maintain this anymore.
4831 0 :
4832 0 : let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
4833 0 : if let Some(st) = &*status_guard {
4834 0 : match &st.state {
4835 : DownloadRemoteLayersTaskState::Running => {
4836 0 : return Err(st.clone());
4837 : }
4838 : DownloadRemoteLayersTaskState::ShutDown
4839 0 : | DownloadRemoteLayersTaskState::Completed => {
4840 0 : *status_guard = None;
4841 0 : }
4842 : }
4843 0 : }
4844 :
4845 0 : let self_clone = Arc::clone(&self);
4846 0 : let task_id = task_mgr::spawn(
4847 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
4848 0 : task_mgr::TaskKind::DownloadAllRemoteLayers,
4849 0 : Some(self.tenant_shard_id),
4850 0 : Some(self.timeline_id),
4851 0 : "download all remote layers task",
4852 : false,
4853 0 : async move {
4854 0 : self_clone.download_all_remote_layers(request).await;
4855 0 : let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
4856 0 : match &mut *status_guard {
4857 : None => {
4858 0 : warn!("tasks status is supposed to be Some(), since we are running");
4859 : }
4860 0 : Some(st) => {
4861 0 : let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
4862 0 : if st.task_id != exp_task_id {
4863 0 : warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
4864 0 : } else {
4865 0 : st.state = DownloadRemoteLayersTaskState::Completed;
4866 0 : }
4867 : }
4868 : };
4869 0 : Ok(())
4870 0 : }
4871 0 : .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
4872 : );
4873 :
4874 0 : let initial_info = DownloadRemoteLayersTaskInfo {
4875 0 : task_id: format!("{task_id}"),
4876 0 : state: DownloadRemoteLayersTaskState::Running,
4877 0 : total_layer_count: 0,
4878 0 : successful_download_count: 0,
4879 0 : failed_download_count: 0,
4880 0 : };
4881 0 : *status_guard = Some(initial_info.clone());
4882 0 :
4883 0 : Ok(initial_info)
4884 0 : }
4885 :
4886 0 : async fn download_all_remote_layers(
4887 0 : self: &Arc<Self>,
4888 0 : request: DownloadRemoteLayersTaskSpawnRequest,
4889 0 : ) {
4890 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4891 :
4892 0 : let remaining = {
4893 0 : let guard = self.layers.read().await;
4894 0 : guard
4895 0 : .layer_map()
4896 0 : .iter_historic_layers()
4897 0 : .map(|desc| guard.get_from_desc(&desc))
4898 0 : .collect::<Vec<_>>()
4899 0 : };
4900 0 : let total_layer_count = remaining.len();
4901 0 :
4902 0 : macro_rules! lock_status {
4903 0 : ($st:ident) => {
4904 0 : let mut st = self.download_all_remote_layers_task_info.write().unwrap();
4905 0 : let st = st
4906 0 : .as_mut()
4907 0 : .expect("this function is only called after the task has been spawned");
4908 0 : assert_eq!(
4909 0 : st.task_id,
4910 0 : format!(
4911 0 : "{}",
4912 0 : task_mgr::current_task_id().expect("we run inside a task_mgr task")
4913 0 : )
4914 0 : );
4915 0 : let $st = st;
4916 0 : };
4917 0 : }
4918 0 :
4919 0 : {
4920 0 : lock_status!(st);
4921 0 : st.total_layer_count = total_layer_count as u64;
4922 0 : }
4923 0 :
4924 0 : let mut remaining = remaining.into_iter();
4925 0 : let mut have_remaining = true;
4926 0 : let mut js = tokio::task::JoinSet::new();
4927 0 :
4928 0 : let cancel = task_mgr::shutdown_token();
4929 0 :
4930 0 : let limit = request.max_concurrent_downloads;
4931 :
4932 : loop {
4933 0 : while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
4934 0 : let Some(next) = remaining.next() else {
4935 0 : have_remaining = false;
4936 0 : break;
4937 : };
4938 :
4939 0 : let span = tracing::info_span!("download", layer = %next);
4940 :
4941 0 : js.spawn(
4942 0 : async move {
4943 0 : let res = next.download().await;
4944 0 : (next, res)
4945 0 : }
4946 0 : .instrument(span),
4947 0 : );
4948 : }
4949 :
4950 0 : while let Some(res) = js.join_next().await {
4951 0 : match res {
4952 : Ok((_, Ok(_))) => {
4953 0 : lock_status!(st);
4954 0 : st.successful_download_count += 1;
4955 : }
4956 0 : Ok((layer, Err(e))) => {
4957 0 : tracing::error!(%layer, "download failed: {e:#}");
4958 0 : lock_status!(st);
4959 0 : st.failed_download_count += 1;
4960 : }
4961 0 : Err(je) if je.is_cancelled() => unreachable!("not used here"),
4962 0 : Err(je) if je.is_panic() => {
4963 0 : lock_status!(st);
4964 0 : st.failed_download_count += 1;
4965 : }
4966 0 : Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
4967 : }
4968 : }
4969 :
4970 0 : if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
4971 0 : break;
4972 0 : }
4973 : }
4974 :
4975 : {
4976 0 : lock_status!(st);
4977 0 : st.state = DownloadRemoteLayersTaskState::Completed;
4978 0 : }
4979 0 : }
4980 :
4981 0 : pub(crate) fn get_download_all_remote_layers_task_info(
4982 0 : &self,
4983 0 : ) -> Option<DownloadRemoteLayersTaskInfo> {
4984 0 : self.download_all_remote_layers_task_info
4985 0 : .read()
4986 0 : .unwrap()
4987 0 : .clone()
4988 0 : }
4989 : }
4990 :
4991 : impl Timeline {
4992 : /// Returns non-remote layers for eviction.
4993 0 : pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
4994 0 : let guard = self.layers.read().await;
4995 0 : let mut max_layer_size: Option<u64> = None;
4996 :
4997 0 : let resident_layers = guard
4998 0 : .resident_layers()
4999 0 : .map(|layer| {
5000 0 : let file_size = layer.layer_desc().file_size;
5001 0 : max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
5002 0 :
5003 0 : let last_activity_ts = layer.access_stats().latest_activity_or_now();
5004 0 :
5005 0 : EvictionCandidate {
5006 0 : layer: layer.into(),
5007 0 : last_activity_ts,
5008 0 : relative_last_activity: finite_f32::FiniteF32::ZERO,
5009 0 : }
5010 0 : })
5011 0 : .collect()
5012 0 : .await;
5013 :
5014 0 : DiskUsageEvictionInfo {
5015 0 : max_layer_size,
5016 0 : resident_layers,
5017 0 : }
5018 0 : }
5019 :
5020 560 : pub(crate) fn get_shard_index(&self) -> ShardIndex {
5021 560 : ShardIndex {
5022 560 : shard_number: self.tenant_shard_id.shard_number,
5023 560 : shard_count: self.tenant_shard_id.shard_count,
5024 560 : }
5025 560 : }
5026 : }
5027 :
5028 : type TraversalPathItem = (
5029 : ValueReconstructResult,
5030 : Lsn,
5031 : Box<dyn Send + FnOnce() -> TraversalId>,
5032 : );
5033 :
5034 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
5035 : /// to an error, as anyhow context information.
5036 106 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
5037 106 : // We want the original 'msg' to be the outermost context. The outermost context
5038 106 : // is the most high-level information, which also gets propagated to the client.
5039 106 : let mut msg_iter = path
5040 106 : .into_iter()
5041 108 : .map(|(r, c, l)| {
5042 108 : format!(
5043 108 : "layer traversal: result {:?}, cont_lsn {}, layer: {}",
5044 108 : r,
5045 108 : c,
5046 108 : l(),
5047 108 : )
5048 108 : })
5049 106 : .chain(std::iter::once(msg));
5050 106 : // Construct initial message from the first traversed layer
5051 106 : let err = anyhow!(msg_iter.next().unwrap());
5052 106 :
5053 106 : // Append all subsequent traversals, and the error message 'msg', as contexts.
5054 108 : let msg = msg_iter.fold(err, |err, msg| err.context(msg));
5055 106 : PageReconstructError::from(msg)
5056 106 : }
5057 :
5058 : /// Various functions to mutate the timeline.
5059 : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
5060 : // This is probably considered a bad practice in Rust and should be fixed eventually,
5061 : // but will cause large code changes.
5062 : pub(crate) struct TimelineWriter<'a> {
5063 : tl: &'a Timeline,
5064 : _write_guard: tokio::sync::MutexGuard<'a, ()>,
5065 : }
5066 :
5067 : impl Deref for TimelineWriter<'_> {
5068 : type Target = Timeline;
5069 :
5070 2424 : fn deref(&self) -> &Self::Target {
5071 2424 : self.tl
5072 2424 : }
5073 : }
5074 :
5075 : impl<'a> TimelineWriter<'a> {
5076 : /// Put a new page version that can be constructed from a WAL record
5077 : ///
5078 : /// This will implicitly extend the relation, if the page is beyond the
5079 : /// current end-of-file.
5080 2214102 : pub(crate) async fn put(
5081 2214102 : &self,
5082 2214102 : key: Key,
5083 2214102 : lsn: Lsn,
5084 2214102 : value: &Value,
5085 2214102 : ctx: &RequestContext,
5086 2214102 : ) -> anyhow::Result<()> {
5087 2214102 : self.tl.put_value(key, lsn, value, ctx).await
5088 2214102 : }
5089 :
5090 413940 : pub(crate) async fn put_batch(
5091 413940 : &self,
5092 413940 : batch: &HashMap<Key, Vec<(Lsn, Value)>>,
5093 413940 : ctx: &RequestContext,
5094 413940 : ) -> anyhow::Result<()> {
5095 413940 : self.tl.put_values(batch, ctx).await
5096 413940 : }
5097 :
5098 2 : pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
5099 2 : self.tl.put_tombstones(batch).await
5100 2 : }
5101 :
5102 : /// Track the end of the latest digested WAL record.
5103 : /// Remember the (end of) last valid WAL record remembered in the timeline.
5104 : ///
5105 : /// Call this after you have finished writing all the WAL up to 'lsn'.
5106 : ///
5107 : /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
5108 : /// the 'lsn' or anything older. The previous last record LSN is stored alongside
5109 : /// the latest and can be read.
5110 3102912 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
5111 3102912 : self.tl.finish_write(new_lsn);
5112 3102912 : }
5113 :
5114 270570 : pub(crate) fn update_current_logical_size(&self, delta: i64) {
5115 270570 : self.tl.update_current_logical_size(delta)
5116 270570 : }
5117 : }
5118 :
5119 : // We need TimelineWriter to be send in upcoming conversion of
5120 : // Timeline::layers to tokio::sync::RwLock.
5121 2 : #[test]
5122 2 : fn is_send() {
5123 2 : fn _assert_send<T: Send>() {}
5124 2 : _assert_send::<TimelineWriter<'_>>();
5125 2 : }
5126 :
5127 : /// Add a suffix to a layer file's name: .{num}.old
5128 : /// Uses the first available num (starts at 0)
5129 0 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
5130 0 : let filename = path
5131 0 : .file_name()
5132 0 : .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
5133 0 : let mut new_path = path.to_owned();
5134 :
5135 0 : for i in 0u32.. {
5136 0 : new_path.set_file_name(format!("{filename}.{i}.old"));
5137 0 : if !new_path.exists() {
5138 0 : std::fs::rename(path, &new_path)
5139 0 : .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
5140 0 : return Ok(());
5141 0 : }
5142 : }
5143 :
5144 0 : bail!("couldn't find an unused backup number for {:?}", path)
5145 0 : }
5146 :
5147 : #[cfg(test)]
5148 : mod tests {
5149 : use utils::{id::TimelineId, lsn::Lsn};
5150 :
5151 : use crate::tenant::{
5152 : harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
5153 : };
5154 :
5155 2 : #[tokio::test]
5156 2 : async fn two_layer_eviction_attempts_at_the_same_time() {
5157 2 : let harness =
5158 2 : TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
5159 2 :
5160 2 : let ctx = any_context();
5161 2 : let tenant = harness.do_try_load(&ctx).await.unwrap();
5162 2 : let timeline = tenant
5163 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
5164 6 : .await
5165 2 : .unwrap();
5166 2 :
5167 2 : let layer = find_some_layer(&timeline).await;
5168 2 : let layer = layer
5169 2 : .keep_resident()
5170 2 : .await
5171 2 : .expect("no download => no downloading errors")
5172 2 : .expect("should had been resident")
5173 2 : .drop_eviction_guard();
5174 2 :
5175 2 : let first = async { layer.evict_and_wait().await };
5176 2 : let second = async { layer.evict_and_wait().await };
5177 2 :
5178 3 : let (first, second) = tokio::join!(first, second);
5179 2 :
5180 2 : let res = layer.keep_resident().await;
5181 2 : assert!(matches!(res, Ok(None)), "{res:?}");
5182 2 :
5183 2 : match (first, second) {
5184 2 : (Ok(()), Ok(())) => {
5185 0 : // because there are no more timeline locks being taken on eviction path, we can
5186 0 : // witness all three outcomes here.
5187 0 : }
5188 2 : (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
5189 2 : // if one completes before the other, this is fine just as well.
5190 2 : }
5191 2 : other => unreachable!("unexpected {:?}", other),
5192 2 : }
5193 2 : }
5194 :
5195 2 : fn any_context() -> crate::context::RequestContext {
5196 2 : use crate::context::*;
5197 2 : use crate::task_mgr::*;
5198 2 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
5199 2 : }
5200 :
5201 2 : async fn find_some_layer(timeline: &Timeline) -> Layer {
5202 2 : let layers = timeline.layers.read().await;
5203 2 : let desc = layers
5204 2 : .layer_map()
5205 2 : .iter_historic_layers()
5206 2 : .next()
5207 2 : .expect("must find one layer to evict");
5208 2 :
5209 2 : layers.get_from_desc(&desc)
5210 2 : }
5211 : }
|