Line data Source code
1 : pub mod delete;
2 : mod eviction_task;
3 : mod init;
4 : pub mod layer_manager;
5 : pub(crate) mod logical_size;
6 : pub mod span;
7 : pub mod uninit;
8 : mod walreceiver;
9 :
10 : use anyhow::{anyhow, bail, ensure, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use enumset::EnumSet;
14 : use fail::fail_point;
15 : use itertools::Itertools;
16 : use pageserver_api::{
17 : keyspace::{key_range_size, KeySpaceAccum},
18 : models::{
19 : DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
20 : LayerMapInfo, TimelineState,
21 : },
22 : reltag::BlockNumber,
23 : shard::{ShardIdentity, TenantShardId},
24 : };
25 : use rand::Rng;
26 : use serde_with::serde_as;
27 : use storage_broker::BrokerClientChannel;
28 : use tokio::{
29 : runtime::Handle,
30 : sync::{oneshot, watch},
31 : };
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::*;
34 : use utils::sync::gate::Gate;
35 :
36 : use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
37 : use std::ops::{Deref, Range};
38 : use std::pin::pin;
39 : use std::sync::atomic::Ordering as AtomicOrdering;
40 : use std::sync::{Arc, Mutex, RwLock, Weak};
41 : use std::time::{Duration, Instant, SystemTime};
42 : use std::{
43 : cmp::{max, min, Ordering},
44 : ops::ControlFlow,
45 : };
46 :
47 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
48 : use crate::tenant::{
49 : layer_map::{LayerMap, SearchResult},
50 : metadata::{save_metadata, TimelineMetadata},
51 : par_fsync,
52 : };
53 : use crate::{
54 : context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
55 : disk_usage_eviction_task::DiskUsageEvictionInfo,
56 : };
57 : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
58 : use crate::{
59 : disk_usage_eviction_task::finite_f32,
60 : tenant::storage_layer::{
61 : AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
62 : LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
63 : ValueReconstructState,
64 : },
65 : };
66 : use crate::{
67 : disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
68 : };
69 : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
70 :
71 : use crate::config::PageServerConf;
72 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
73 : use crate::metrics::{
74 : TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
75 : };
76 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
77 : use crate::tenant::config::TenantConfOpt;
78 : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
79 : use pageserver_api::reltag::RelTag;
80 : use pageserver_api::shard::ShardIndex;
81 :
82 : use postgres_connection::PgConnectionConfig;
83 : use postgres_ffi::to_pg_timestamp;
84 : use utils::{
85 : completion,
86 : generation::Generation,
87 : id::TimelineId,
88 : lsn::{AtomicLsn, Lsn, RecordLsn},
89 : seqwait::SeqWait,
90 : simple_rcu::{Rcu, RcuReadGuard},
91 : };
92 :
93 : use crate::page_cache;
94 : use crate::repository::GcResult;
95 : use crate::repository::{Key, Value};
96 : use crate::task_mgr;
97 : use crate::task_mgr::TaskKind;
98 : use crate::ZERO_PAGE;
99 :
100 : use self::delete::DeleteTimelineFlow;
101 : pub(super) use self::eviction_task::EvictionTaskTenantState;
102 : use self::eviction_task::EvictionTaskTimelineState;
103 : use self::layer_manager::LayerManager;
104 : use self::logical_size::LogicalSize;
105 : use self::walreceiver::{WalReceiver, WalReceiverConf};
106 :
107 : use super::config::TenantConf;
108 : use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
109 : use super::remote_timeline_client::RemoteTimelineClient;
110 : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
111 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
112 :
113 44 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
114 : pub(super) enum FlushLoopState {
115 : NotStarted,
116 : Running {
117 : #[cfg(test)]
118 : expect_initdb_optimization: bool,
119 : #[cfg(test)]
120 : initdb_optimization_count: usize,
121 : },
122 : Exited,
123 : }
124 :
125 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
126 0 : #[derive(Debug, Clone, PartialEq, Eq)]
127 : pub(crate) struct Hole {
128 : key_range: Range<Key>,
129 : coverage_size: usize,
130 : }
131 :
132 : impl Ord for Hole {
133 266 : fn cmp(&self, other: &Self) -> Ordering {
134 266 : other.coverage_size.cmp(&self.coverage_size) // inverse order
135 266 : }
136 : }
137 :
138 : impl PartialOrd for Hole {
139 266 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
140 266 : Some(self.cmp(other))
141 266 : }
142 : }
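// A minimal sketch (hypothetical `candidate_holes`/`max_holes` names, not code from
// this file) of why the ordering is inverted: with a size-capped max-heap, the hole
// with the smallest coverage_size always sits at the top, so popping on overflow keeps
// only the N largest holes.
//
//     let mut heap: BinaryHeap<Hole> = BinaryHeap::new();
//     for hole in candidate_holes {
//         heap.push(hole);
//         if heap.len() > max_holes {
//             heap.pop(); // discards the hole with the smallest coverage_size
//         }
//     }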
143 :
144 : /// Temporary function for the immutable storage state refactor; ensures we drop the lock guard and not something else by mistake.
145 : /// Can be removed after all refactors are done.
146 293 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
147 293 : drop(rlock)
148 293 : }
149 :
150 : /// Temporary function for the immutable storage state refactor; ensures we drop the lock guard and not something else by mistake.
151 : /// Can be removed after all refactors are done.
152 1892 : fn drop_wlock<T>(wlock: tokio::sync::RwLockWriteGuard<'_, T>) {
153 1892 : drop(wlock)
154 1892 : }
155 :
156 : /// The outward-facing resources required to build a Timeline
157 : pub struct TimelineResources {
158 : pub remote_client: Option<RemoteTimelineClient>,
159 : pub deletion_queue_client: DeletionQueueClient,
160 : }
161 :
162 : pub struct Timeline {
163 : conf: &'static PageServerConf,
164 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
165 :
166 : myself: Weak<Self>,
167 :
168 : pub(crate) tenant_shard_id: TenantShardId,
169 : pub timeline_id: TimelineId,
170 :
171 : /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
172 : /// Never changes for the lifetime of this [`Timeline`] object.
173 : ///
174 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
175 : /// this copy enforces the invariant that the generation doesn't change during a Tenant's lifetime.
176 : pub(crate) generation: Generation,
177 :
178 : /// The detailed sharding information from our parent Tenant. This enables us to map keys
179 : /// to shards, and is constant through the lifetime of this Timeline.
180 : shard_identity: ShardIdentity,
181 :
182 : pub pg_version: u32,
183 :
184 : /// The tuple has two elements.
185 : /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
186 : /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
187 : ///
188 : /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
189 : /// We describe these rectangles through the `PersistentLayerDesc` struct.
190 : ///
191 : /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
192 : /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
193 : /// `PersistentLayerDesc`'s.
194 : ///
195 : /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
196 : /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
197 : /// runtime, e.g., during page reconstruction.
198 : ///
199 : /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
200 : /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
201 : pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
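    // Rough sketch of the two-step lookup described above (illustrative only; the
    // exact method names and signatures of the `LayerMap`/`LayerManager` API may differ):
    //
    //     let guard = timeline.layers.read().await;
    //     // 1. LayerMap: find which rectangle in (KEY, LSN) space holds the data
    //     if let Some(SearchResult { layer: desc, lsn_floor: _ }) = guard.layer_map().search(key, lsn) {
    //         // 2. LayerFileManager: map the descriptor to a physical layer object;
    //         //    this panics if the two structures have diverged
    //         let layer = guard.get_from_desc(&desc);
    //     }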
202 :
203 : /// Set of key ranges which should be covered by image layers to
204 : /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
205 : /// It is used by the compaction task when it checks whether a new image layer should be created.
206 : /// A newly created image layer doesn't help to remove delta layers until the
207 : /// newly created image layer falls off the PITR horizon. So on the next GC cycle,
208 : /// gc_timeline may still want the new image layer to be created. To avoid redundant
209 : /// image layer creation, we should check whether an image layer already exists, even if it is beyond the PITR horizon.
210 : /// This is why we need to remember the GC cutoff LSN.
211 : ///
212 : wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
213 :
214 : last_freeze_at: AtomicLsn,
215 : // Atomic would be more appropriate here.
216 : last_freeze_ts: RwLock<Instant>,
217 :
218 : pub(crate) standby_horizon: AtomicLsn,
219 :
220 : // WAL redo manager. `None` only for broken tenants.
221 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
222 :
223 : /// Remote storage client.
224 : /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
225 : pub remote_client: Option<Arc<RemoteTimelineClient>>,
226 :
227 : // What page versions do we hold in the repository? If we get a
228 : // request > last_record_lsn, we need to wait until we receive all
229 : // the WAL up to the request. The SeqWait provides functions for
230 : // that. TODO: If we get a request for an old LSN, such that the
231 : // versions have already been garbage collected away, we should
232 : // throw an error, but we don't track that currently.
233 : //
234 : // last_record_lsn.load().last points to the end of last processed WAL record.
235 : //
236 : // We also remember the starting point of the previous record in
237 : // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
238 : // first WAL record when the node is started up. But here, we just
239 : // keep track of it.
240 : last_record_lsn: SeqWait<RecordLsn, Lsn>,
241 :
242 : // All WAL records have been processed and stored durably on files on
243 : // local disk, up to this LSN. On crash and restart, we need to re-process
244 : // the WAL starting from this point.
245 : //
246 : // Some later WAL records might have been processed and also flushed to disk
247 : // already, so don't be surprised to see some, but there's no guarantee on
248 : // them yet.
249 : disk_consistent_lsn: AtomicLsn,
250 :
251 : // Parent timeline that this timeline was branched from, and the LSN
252 : // of the branch point.
253 : ancestor_timeline: Option<Arc<Timeline>>,
254 : ancestor_lsn: Lsn,
255 :
256 : pub(super) metrics: TimelineMetrics,
257 :
258 : // `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
259 : // in `crate::page_service` writes these metrics.
260 : pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
261 :
262 : /// Ensures layers aren't frozen by checkpointer between
263 : /// [`Timeline::get_layer_for_write`] and layer reads.
264 : /// Locked automatically by [`TimelineWriter`] and checkpointer.
265 : /// Must always be acquired before the layer map/individual layer lock
266 : /// to avoid deadlock.
267 : write_lock: tokio::sync::Mutex<()>,
268 :
269 : /// Used to avoid multiple `flush_loop` tasks running
270 : pub(super) flush_loop_state: Mutex<FlushLoopState>,
271 :
272 : /// layer_flush_start_tx can be used to wake up the layer-flushing task.
273 : /// The value is a counter, incremented every time a new flush cycle is requested.
274 : /// The flush cycle counter is sent back on the layer_flush_done channel when
275 : /// the flush finishes. You can use that to wait for the flush to finish.
276 : layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
277 : /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
278 : layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
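    // A rough sketch (not the actual flush-loop code) of the handshake these two
    // channels enable, assuming the counter scheme described above:
    //
    //     // request a flush cycle and remember which one we asked for
    //     let mut my_cycle = 0;
    //     layer_flush_start_tx.send_modify(|c| { *c += 1; my_cycle = *c; });
    //     // wait until the flusher reports that cycle (or a later one) as finished
    //     let mut done_rx = layer_flush_done_tx.subscribe();
    //     while done_rx.borrow_and_update().0 < my_cycle {
    //         done_rx.changed().await.expect("sender lives as long as the Timeline");
    //     }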
279 :
280 : // Needed to ensure that we can't create a branch at a point that was already garbage collected
281 : pub latest_gc_cutoff_lsn: Rcu<Lsn>,
282 :
283 : // List of child timelines and their branch points. This is needed to avoid
284 : // garbage collecting data that is still needed by the child timelines.
285 : pub gc_info: std::sync::RwLock<GcInfo>,
286 :
287 : // It may change across major versions, so for simplicity
288 : // we keep it recorded right after running initdb for a timeline.
289 : // It is needed in checks when we want to error on some operations
290 : // when they are requested for pre-initdb lsn.
291 : // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
292 : // though let's keep them both for better error visibility.
293 : pub initdb_lsn: Lsn,
294 :
295 : /// The last computed key partitioning, and the LSN at which it was calculated.
296 : partitioning: Mutex<(KeyPartitioning, Lsn)>,
297 :
298 : /// Configuration: how often should the partitioning be recalculated.
299 : repartition_threshold: u64,
300 :
301 : /// Current logical size of the "datadir", at the last LSN.
302 : current_logical_size: LogicalSize,
303 :
304 : /// Information about the last processed message by the WAL receiver,
305 : /// or None if WAL receiver has not received anything for this timeline
306 : /// yet.
307 : pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
308 : pub walreceiver: Mutex<Option<WalReceiver>>,
309 :
310 : /// Relation size cache
311 : pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
312 :
313 : download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
314 :
315 : state: watch::Sender<TimelineState>,
316 :
317 : /// Prevent two tasks from deleting the timeline at the same time. If held, the
318 : /// timeline is being deleted. If the deletion flow has already run to completion, the timeline has already been deleted.
319 : pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
320 :
321 : eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
322 :
323 : /// Load or creation time information about the disk_consistent_lsn and when the loading
324 : /// happened. Used for consumption metrics.
325 : pub(crate) loaded_at: (Lsn, SystemTime),
326 :
327 : /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
328 : pub(crate) gate: Gate,
329 :
330 : /// Cancellation token scoped to this timeline: anything doing long-running work relating
331 : /// to the timeline should drop out when this token fires.
332 : pub(crate) cancel: CancellationToken,
333 :
334 : /// Make sure we only have one running compaction at a time in tests.
335 : ///
336 : /// Must only be taken in two places:
337 : /// - [`Timeline::compact`] (this file)
338 : /// - [`delete::delete_local_layer_files`]
339 : ///
340 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
341 : compaction_lock: tokio::sync::Mutex<()>,
342 :
343 : /// Make sure we only have one running gc at a time.
344 : ///
345 : /// Must only be taken in two places:
346 : /// - [`Timeline::gc`] (this file)
347 : /// - [`delete::delete_local_layer_files`]
348 : ///
349 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
350 : gc_lock: tokio::sync::Mutex<()>,
351 : }
352 :
353 : pub struct WalReceiverInfo {
354 : pub wal_source_connconf: PgConnectionConfig,
355 : pub last_received_msg_lsn: Lsn,
356 : pub last_received_msg_ts: u128,
357 : }
358 :
359 : ///
360 : /// Information about how much history needs to be retained, needed by
361 : /// Garbage Collection.
362 : ///
363 : pub struct GcInfo {
364 : /// Specific LSNs that are needed.
365 : ///
366 : /// Currently, this includes all points where child branches have
367 : /// been forked off from. In the future, could also include
368 : /// explicit user-defined snapshot points.
369 : pub retain_lsns: Vec<Lsn>,
370 :
371 : /// In addition to 'retain_lsns', keep everything newer than this
372 : /// point.
373 : ///
374 : /// This is calculated by subtracting 'gc_horizon' setting from
375 : /// last-record LSN
376 : ///
377 : /// FIXME: is this inclusive or exclusive?
378 : pub horizon_cutoff: Lsn,
379 :
380 : /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
381 : /// point.
382 : ///
383 : /// This is calculated by finding a number such that a record is needed for PITR
384 : /// if and only if its LSN is larger than 'pitr_cutoff'.
385 : pub pitr_cutoff: Lsn,
386 : }
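// A minimal sketch (illustrative only, not the actual GC code) of how these fields
// combine into a retention decision for a record at `lsn`, assuming the semantics
// described above:
//
//     let gc_info = timeline.gc_info.read().unwrap();
//     // keeping everything newer than either cutoff means the effective cutoff is the minimum
//     let cutoff = std::cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff);
//     // a record is also potentially needed to reconstruct any branch point at or above its LSN
//     let needed_by_branch = gc_info.retain_lsns.iter().any(|&branch_lsn| lsn <= branch_lsn);
//     let keep = lsn >= cutoff || needed_by_branch;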
387 :
388 : /// An error happened in a get() operation.
389 70 : #[derive(thiserror::Error, Debug)]
390 : pub(crate) enum PageReconstructError {
391 : #[error(transparent)]
392 : Other(#[from] anyhow::Error),
393 :
394 : #[error("Ancestor LSN wait error: {0}")]
395 : AncestorLsnTimeout(#[from] WaitLsnError),
396 :
397 : #[error("timeline shutting down")]
398 : Cancelled,
399 :
400 : /// The ancestor of this is being stopped
401 : #[error("ancestor timeline {0} is being stopped")]
402 : AncestorStopping(TimelineId),
403 :
404 : /// An error happened replaying WAL records
405 : #[error(transparent)]
406 : WalRedo(anyhow::Error),
407 : }
408 :
409 : impl PageReconstructError {
410 : /// Returns true if this error indicates a tenant/timeline shutdown alike situation
411 0 : pub(crate) fn is_stopping(&self) -> bool {
412 0 : use PageReconstructError::*;
413 0 : match self {
414 0 : Other(_) => false,
415 0 : AncestorLsnTimeout(_) => false,
416 0 : Cancelled | AncestorStopping(_) => true,
417 0 : WalRedo(_) => false,
418 : }
419 0 : }
420 : }
421 :
422 0 : #[derive(thiserror::Error, Debug)]
423 : enum CreateImageLayersError {
424 : #[error("timeline shutting down")]
425 : Cancelled,
426 :
427 : #[error(transparent)]
428 : GetVectoredError(GetVectoredError),
429 :
430 : #[error(transparent)]
431 : PageReconstructError(PageReconstructError),
432 :
433 : #[error(transparent)]
434 : Other(#[from] anyhow::Error),
435 : }
436 :
437 0 : #[derive(thiserror::Error, Debug)]
438 : enum FlushLayerError {
439 : /// Timeline cancellation token was cancelled
440 : #[error("timeline shutting down")]
441 : Cancelled,
442 :
443 : #[error(transparent)]
444 : CreateImageLayersError(CreateImageLayersError),
445 :
446 : #[error(transparent)]
447 : Other(#[from] anyhow::Error),
448 : }
449 :
450 0 : #[derive(thiserror::Error, Debug)]
451 : pub(crate) enum GetVectoredError {
452 : #[error("timeline shutting down")]
453 : Cancelled,
454 :
455 : #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
456 : Oversized(u64),
457 :
458 : #[error("Requested at invalid LSN: {0}")]
459 : InvalidLsn(Lsn),
460 : }
461 :
462 0 : #[derive(thiserror::Error, Debug)]
463 : pub(crate) enum GetReadyAncestorError {
464 : #[error("ancestor timeline {0} is being stopped")]
465 : AncestorStopping(TimelineId),
466 :
467 : #[error("Ancestor LSN wait error: {0}")]
468 : AncestorLsnTimeout(#[from] WaitLsnError),
469 :
470 : #[error("Cancelled")]
471 : Cancelled,
472 :
473 : #[error(transparent)]
474 : Other(#[from] anyhow::Error),
475 : }
476 :
477 0 : #[derive(Clone, Copy)]
478 : pub enum LogicalSizeCalculationCause {
479 : Initial,
480 : ConsumptionMetricsSyntheticSize,
481 : EvictionTaskImitation,
482 : TenantSizeHandler,
483 : }
484 :
485 : pub enum GetLogicalSizePriority {
486 : User,
487 : Background,
488 : }
489 :
490 727 : #[derive(enumset::EnumSetType)]
491 : pub(crate) enum CompactFlags {
492 : ForceRepartition,
493 : }
494 :
495 : impl std::fmt::Debug for Timeline {
496 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
497 0 : write!(f, "Timeline<{}>", self.timeline_id)
498 0 : }
499 : }
500 :
501 48 : #[derive(thiserror::Error, Debug)]
502 : pub(crate) enum WaitLsnError {
503 : // Called on a timeline which is shutting down
504 : #[error("Shutdown")]
505 : Shutdown,
506 :
507 : // Called on a timeline that is not in an active state or is shutting down
508 : #[error("Bad state (not active)")]
509 : BadState,
510 :
511 : // Timeout expired while waiting for LSN to catch up with goal.
512 : #[error("{0}")]
513 : Timeout(String),
514 : }
515 :
516 : // The impls below achieve cancellation mapping for errors.
517 : // Perhaps there's a way of achieving this with less cruft.
518 :
519 : impl From<CreateImageLayersError> for CompactionError {
520 0 : fn from(e: CreateImageLayersError) -> Self {
521 0 : match e {
522 0 : CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
523 0 : _ => CompactionError::Other(e.into()),
524 : }
525 0 : }
526 : }
527 :
528 : impl From<CreateImageLayersError> for FlushLayerError {
529 0 : fn from(e: CreateImageLayersError) -> Self {
530 0 : match e {
531 0 : CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
532 0 : any => FlushLayerError::CreateImageLayersError(any),
533 : }
534 0 : }
535 : }
536 :
537 : impl From<PageReconstructError> for CreateImageLayersError {
538 0 : fn from(e: PageReconstructError) -> Self {
539 0 : match e {
540 0 : PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
541 0 : _ => CreateImageLayersError::PageReconstructError(e),
542 : }
543 0 : }
544 : }
545 :
546 : impl From<GetVectoredError> for CreateImageLayersError {
547 0 : fn from(e: GetVectoredError) -> Self {
548 0 : match e {
549 0 : GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
550 0 : _ => CreateImageLayersError::GetVectoredError(e),
551 : }
552 0 : }
553 : }
554 :
555 : impl From<GetReadyAncestorError> for PageReconstructError {
556 4 : fn from(e: GetReadyAncestorError) -> Self {
557 4 : use GetReadyAncestorError::*;
558 4 : match e {
559 2 : AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
560 0 : AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
561 0 : Cancelled => PageReconstructError::Cancelled,
562 2 : Other(other) => PageReconstructError::Other(other),
563 : }
564 4 : }
565 : }
566 :
567 : /// Public interface functions
568 : impl Timeline {
569 : /// Get the LSN where this branch was created
570 3659 : pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
571 3659 : self.ancestor_lsn
572 3659 : }
573 :
574 : /// Get the ancestor's timeline id
575 4999 : pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
576 4999 : self.ancestor_timeline
577 4999 : .as_ref()
578 4999 : .map(|ancestor| ancestor.timeline_id)
579 4999 : }
580 :
581 : /// Lock and get timeline's GC cutoff
582 5623582 : pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
583 5623582 : self.latest_gc_cutoff_lsn.read()
584 5623582 : }
585 :
586 : /// Look up given page version.
587 : ///
588 : /// If a remote layer file is needed, it is downloaded as part of this
589 : /// call.
590 : ///
591 : /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
592 : /// abstraction above this needs to store suitable metadata to track what
593 : /// data exists with what keys, in separate metadata entries. If a
594 : /// non-existent key is requested, we may incorrectly return a value from
595 : /// an ancestor branch, for example, or waste a lot of cycles chasing the
596 : /// non-existing key.
597 : ///
598 : /// # Cancel-Safety
599 : ///
600 : /// This method is cancellation-safe.
601 8625195 : pub(crate) async fn get(
602 8625195 : &self,
603 8625195 : key: Key,
604 8625195 : lsn: Lsn,
605 8625195 : ctx: &RequestContext,
606 8625197 : ) -> Result<Bytes, PageReconstructError> {
607 8625197 : if !lsn.is_valid() {
608 1 : return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
609 8625196 : }
610 :
611 : // This check is debug-only because of the cost of hashing, and because it's a double-check: we
612 : // already checked the key against the shard_identity when looking up the Timeline from
613 : // page_service.
614 8625196 : debug_assert!(!self.shard_identity.is_key_disposable(&key));
615 :
616 : // XXX: structured stats collection for layer eviction here.
617 0 : trace!(
618 0 : "get page request for {}@{} from task kind {:?}",
619 0 : key,
620 0 : lsn,
621 0 : ctx.task_kind()
622 0 : );
623 :
624 : // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
625 : // The cached image can be returned directly if there is no WAL between the cached image
626 : // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
627 : // for redo.
628 8625196 : let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
629 1776743 : Some((cached_lsn, cached_img)) => {
630 1776743 : match cached_lsn.cmp(&lsn) {
631 1776726 : Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
632 : Ordering::Equal => {
633 17 : MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
634 17 : return Ok(cached_img); // exact LSN match, return the image
635 : }
636 : Ordering::Greater => {
637 0 : unreachable!("the returned lsn should never be after the requested lsn")
638 : }
639 : }
640 1776726 : Some((cached_lsn, cached_img))
641 : }
642 6848452 : None => None,
643 : };
644 :
645 8625178 : let mut reconstruct_state = ValueReconstructState {
646 8625178 : records: Vec::new(),
647 8625178 : img: cached_page_img,
648 8625178 : };
649 8625178 :
650 8625178 : let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
651 8625178 : let path = self
652 8625178 : .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
653 2485493 : .await?;
654 8623839 : timer.stop_and_record();
655 8623839 :
656 8623839 : let start = Instant::now();
657 8623839 : let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
658 8623839 : let elapsed = start.elapsed();
659 8623839 : crate::metrics::RECONSTRUCT_TIME
660 8623839 : .for_result(&res)
661 8623839 : .observe(elapsed.as_secs_f64());
662 8623839 :
663 8623839 : if cfg!(feature = "testing") && res.is_err() {
664 : // it can only be walredo issue
665 : use std::fmt::Write;
666 :
667 0 : let mut msg = String::new();
668 0 :
669 0 : path.into_iter().for_each(|(res, cont_lsn, layer)| {
670 0 : writeln!(
671 0 : msg,
672 0 : "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
673 0 : layer(),
674 0 : )
675 0 : .expect("string grows")
676 0 : });
677 :
678 : // this is to rule out or provide evidence that we could in some cases read a duplicate
679 : // walrecord
680 0 : tracing::info!("walredo failed, path:\n{msg}");
681 8623839 : }
682 :
683 8623839 : res
684 8625185 : }
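    // Since `get` is cancellation-safe, a caller can race it against a cancellation
    // token; a hedged sketch with a hypothetical caller-side `cancel` token:
    //
    //     tokio::select! {
    //         res = timeline.get(key, lsn, &ctx) => { let _page = res?; }
    //         _ = cancel.cancelled() => { /* the get future is dropped; safe to retry later */ }
    //     }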
685 :
686 : pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
687 :
688 : /// Look up multiple page versions at a given LSN
689 : ///
690 : /// This naive implementation will be replaced with a more efficient one
691 : /// which actually vectorizes the read path.
692 72383 : pub(crate) async fn get_vectored(
693 72383 : &self,
694 72383 : key_ranges: &[Range<Key>],
695 72383 : lsn: Lsn,
696 72383 : ctx: &RequestContext,
697 72383 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
698 72383 : if !lsn.is_valid() {
699 0 : return Err(GetVectoredError::InvalidLsn(lsn));
700 72383 : }
701 72383 :
702 72383 : let key_count = key_ranges
703 72383 : .iter()
704 73586 : .map(|range| key_range_size(range) as u64)
705 72383 : .sum();
706 72383 : if key_count > Timeline::MAX_GET_VECTORED_KEYS {
707 0 : return Err(GetVectoredError::Oversized(key_count));
708 72383 : }
709 72383 :
710 72383 : let _timer = crate::metrics::GET_VECTORED_LATENCY
711 72383 : .for_task_kind(ctx.task_kind())
712 72383 : .map(|t| t.start_timer());
713 72383 :
714 72383 : let mut values = BTreeMap::new();
715 145969 : for range in key_ranges {
716 73586 : let mut key = range.start;
717 305982 : while key != range.end {
718 232396 : assert!(!self.shard_identity.is_key_disposable(&key));
719 :
720 232396 : let block = self.get(key, lsn, ctx).await;
721 :
722 : if matches!(
723 0 : block,
724 : Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
725 : ) {
726 0 : return Err(GetVectoredError::Cancelled);
727 232396 : }
728 232396 :
729 232396 : values.insert(key, block);
730 232396 : key = key.next();
731 : }
732 : }
733 :
734 72383 : Ok(values)
735 72383 : }
736 :
737 : /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
738 10862121 : pub(crate) fn get_last_record_lsn(&self) -> Lsn {
739 10862121 : self.last_record_lsn.load().last
740 10862121 : }
741 :
742 3042 : pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
743 3042 : self.last_record_lsn.load().prev
744 3042 : }
745 :
746 : /// Atomically get both last and prev.
747 1084 : pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
748 1084 : self.last_record_lsn.load()
749 1084 : }
750 :
751 800801 : pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
752 800801 : self.disk_consistent_lsn.load()
753 800801 : }
754 :
755 : /// remote_consistent_lsn from the perspective of the tenant's current generation,
756 : /// not validated with control plane yet.
757 : /// See [`Self::get_remote_consistent_lsn_visible`].
758 3042 : pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
759 3042 : if let Some(remote_client) = &self.remote_client {
760 3042 : remote_client.remote_consistent_lsn_projected()
761 : } else {
762 0 : None
763 : }
764 3042 : }
765 :
766 : /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
767 : /// i.e. a value of remote_consistent_lsn_projected which has undergone
768 : /// generation validation in the deletion queue.
769 798512 : pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
770 798512 : if let Some(remote_client) = &self.remote_client {
771 798512 : remote_client.remote_consistent_lsn_visible()
772 : } else {
773 0 : None
774 : }
775 798512 : }
776 :
777 : /// The sum of the file size of all historic layers in the layer map.
778 : /// This method makes no distinction between local and remote layers.
779 : /// Hence, the result **does not represent local filesystem usage**.
780 3513 : pub(crate) async fn layer_size_sum(&self) -> u64 {
781 3513 : let guard = self.layers.read().await;
782 3513 : let layer_map = guard.layer_map();
783 3513 : let mut size = 0;
784 336931 : for l in layer_map.iter_historic_layers() {
785 336931 : size += l.file_size();
786 336931 : }
787 3513 : size
788 3513 : }
789 :
790 15 : pub(crate) fn resident_physical_size(&self) -> u64 {
791 15 : self.metrics.resident_physical_size_get()
792 15 : }
793 :
794 : ///
795 : /// Wait until WAL has been received and processed up to this LSN.
796 : ///
797 : /// You should call this before any of the other get_* or list_* functions. Calling
798 : /// those functions with an LSN that has not been processed yet is an error. (An illustrative usage sketch follows this function.)
799 : ///
800 1358246 : pub(crate) async fn wait_lsn(
801 1358246 : &self,
802 1358246 : lsn: Lsn,
803 1358246 : _ctx: &RequestContext, /* Prepare for use by cancellation */
804 1358246 : ) -> Result<(), WaitLsnError> {
805 1358246 : if self.cancel.is_cancelled() {
806 0 : return Err(WaitLsnError::Shutdown);
807 1358246 : } else if !self.is_active() {
808 0 : return Err(WaitLsnError::BadState);
809 1358246 : }
810 :
811 : // This should never be called from the WAL receiver, because that could lead
812 : // to a deadlock.
813 : debug_assert!(
814 1358246 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
815 0 : "wait_lsn cannot be called in WAL receiver"
816 : );
817 : debug_assert!(
818 1358246 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
819 0 : "wait_lsn cannot be called in WAL receiver"
820 : );
821 : debug_assert!(
822 1358246 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
823 0 : "wait_lsn cannot be called in WAL receiver"
824 : );
825 :
826 1358246 : let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
827 1358246 :
828 1358246 : match self
829 1358246 : .last_record_lsn
830 1358246 : .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
831 112555 : .await
832 : {
833 1358221 : Ok(()) => Ok(()),
834 24 : Err(e) => {
835 24 : use utils::seqwait::SeqWaitError::*;
836 24 : match e {
837 0 : Shutdown => Err(WaitLsnError::Shutdown),
838 : Timeout => {
839 : // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
840 24 : drop(_timer);
841 24 : let walreceiver_status = self.walreceiver_status();
842 24 : Err(WaitLsnError::Timeout(format!(
843 24 : "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
844 24 : lsn,
845 24 : self.get_last_record_lsn(),
846 24 : self.get_disk_consistent_lsn(),
847 24 : walreceiver_status,
848 24 : )))
849 : }
850 : }
851 : }
852 : }
853 1358245 : }
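    // Illustrative usage sketch referenced in the doc comment above (hypothetical
    // caller code, not from this file): make sure the WAL has arrived, then read.
    //
    //     timeline.wait_lsn(request_lsn, &ctx).await?;
    //     let page: Bytes = timeline.get(key, request_lsn, &ctx).await?;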
854 :
855 3066 : pub(crate) fn walreceiver_status(&self) -> String {
856 3066 : match &*self.walreceiver.lock().unwrap() {
857 113 : None => "stopping or stopped".to_string(),
858 2953 : Some(walreceiver) => match walreceiver.status() {
859 1794 : Some(status) => status.to_human_readable_string(),
860 1159 : None => "Not active".to_string(),
861 : },
862 : }
863 3066 : }
864 :
865 : /// Check that it is valid to request operations with that lsn.
866 672 : pub(crate) fn check_lsn_is_in_scope(
867 672 : &self,
868 672 : lsn: Lsn,
869 672 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
870 672 : ) -> anyhow::Result<()> {
871 672 : ensure!(
872 672 : lsn >= **latest_gc_cutoff_lsn,
873 16 : "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
874 16 : lsn,
875 16 : **latest_gc_cutoff_lsn,
876 : );
877 656 : Ok(())
878 672 : }
879 :
880 : /// Flush to disk all data that was written with the put_* functions
881 1801 : #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
882 : pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
883 : self.freeze_inmem_layer(false).await;
884 : self.flush_frozen_layers_and_wait().await
885 : }
886 :
887 : /// Outermost timeline compaction operation; downloads needed layers.
888 1523 : pub(crate) async fn compact(
889 1523 : self: &Arc<Self>,
890 1523 : cancel: &CancellationToken,
891 1523 : flags: EnumSet<CompactFlags>,
892 1523 : ctx: &RequestContext,
893 1523 : ) -> Result<(), CompactionError> {
894 1523 : // most likely the cancellation token is from a background task, but in tests it could be the
895 1523 : // request task as well.
896 1523 :
897 1523 : let prepare = async move {
898 1523 : let guard = self.compaction_lock.lock().await;
899 :
900 1523 : let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
901 1523 : BackgroundLoopKind::Compaction,
902 1523 : ctx,
903 1523 : )
904 0 : .await;
905 :
906 1523 : (guard, permit)
907 1523 : };
908 :
909 : // this wait probably never needs any "long time spent" logging, because we already nag if
910 : // compaction task goes over it's period (20s) which is quite often in production.
911 1523 : let (_guard, _permit) = tokio::select! {
912 1523 : tuple = prepare => { tuple },
913 : _ = self.cancel.cancelled() => return Ok(()),
914 : _ = cancel.cancelled() => return Ok(()),
915 : };
916 :
917 1523 : let last_record_lsn = self.get_last_record_lsn();
918 1523 :
919 1523 : // Last record Lsn could be zero in case the timeline was just created
920 1523 : if !last_record_lsn.is_valid() {
921 0 : warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
922 0 : return Ok(());
923 1523 : }
924 1523 :
925 1523 : // High level strategy for compaction / image creation:
926 1523 : //
927 1523 : // 1. First, calculate the desired "partitioning" of the
928 1523 : // currently in-use key space. The goal is to partition the
929 1523 : // key space into roughly fixed-size chunks, but also take into
930 1523 : // account any existing image layers, and try to align the
931 1523 : // chunk boundaries with the existing image layers to avoid
932 1523 : // too much churn. Also try to align chunk boundaries with
933 1523 : // relation boundaries. In principle, we don't know about
934 1523 : // relation boundaries here, we just deal with key-value
935 1523 : // pairs, and the code in pgdatadir_mapping.rs knows how to
936 1523 : // map relations into key-value pairs. But in practice we know
937 1523 : // that 'field6' is the block number, and the fields 1-5
938 1523 : // identify a relation. This is just an optimization,
939 1523 : // though.
940 1523 : //
941 1523 : // 2. Once we know the partitioning, for each partition,
942 1523 : // decide if it's time to create a new image layer. The
943 1523 : // criteria is: there has been too much "churn" since the last
944 1523 : // image layer? The "churn" is fuzzy concept, it's a
945 1523 : // combination of too many delta files, or too much WAL in
946 1523 : // total in the delta file. Or perhaps: if creating an image
947 1523 : // file would allow to delete some older files.
948 1523 : //
949 1523 : // 3. After that, we compact all level0 delta files if there
950 1523 : // are too many of them. While compacting, we also garbage
951 1523 : // collect any page versions that are no longer needed because
952 1523 : // of the new image layers we created in step 2.
953 1523 : //
954 1523 : // TODO: This high level strategy hasn't been implemented yet.
955 1523 : // Below are functions compact_level0() and create_image_layers()
956 1523 : // but they are a bit ad hoc and don't quite work like it's explained
957 1523 : // above. Rewrite it.
958 1523 :
959 1523 : // Is the timeline being deleted?
960 1523 : if self.is_stopping() {
961 0 : trace!("Dropping out of compaction on timeline shutdown");
962 0 : return Err(CompactionError::ShuttingDown);
963 1523 : }
964 1523 :
965 1523 : let target_file_size = self.get_checkpoint_distance();
966 1523 :
967 1523 : // Define partitioning schema if needed
968 1523 :
969 1523 : // FIXME: the match should only cover repartitioning, not the next steps
970 1523 : match self
971 1523 : .repartition(
972 1523 : self.get_last_record_lsn(),
973 1523 : self.get_compaction_target_size(),
974 1523 : flags,
975 1523 : ctx,
976 1523 : )
977 213034 : .await
978 : {
979 1519 : Ok((partitioning, lsn)) => {
980 1519 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
981 1519 : let image_ctx = RequestContextBuilder::extend(ctx)
982 1519 : .access_stats_behavior(AccessStatsBehavior::Skip)
983 1519 : .build();
984 1519 :
985 1519 : // 2. Compact
986 1519 : let timer = self.metrics.compact_time_histo.start_timer();
987 331578 : self.compact_level0(target_file_size, ctx).await?;
988 1518 : timer.stop_and_record();
989 :
990 : // 3. Create new image layers for partitions that have been modified
991 : // "enough".
992 1518 : let layers = self
993 1518 : .create_image_layers(&partitioning, lsn, false, &image_ctx)
994 60779 : .await
995 1517 : .map_err(anyhow::Error::from)?;
996 1517 : if let Some(remote_client) = &self.remote_client {
997 7466 : for layer in layers {
998 5949 : remote_client.schedule_layer_file_upload(layer)?;
999 : }
1000 0 : }
1001 :
1002 1517 : if let Some(remote_client) = &self.remote_client {
1003 : // should any new image layer have been created, not uploading index_part will
1004 : // result in a mismatch between remote_physical_size and layermap calculated
1005 : // size, which will fail some tests, but should not be an issue otherwise.
1006 1517 : remote_client.schedule_index_upload_for_file_changes()?;
1007 0 : }
1008 : }
1009 3 : Err(err) => {
1010 3 : // no partitioning? This is normal, if the timeline was just created
1011 3 : // as an empty timeline. Also in unit tests, when we use the timeline
1012 3 : // as a simple key-value store, ignoring the datadir layout. Log the
1013 3 : // error but continue.
1014 3 : //
1015 3 : // Suppress error when it's due to cancellation
1016 3 : if !self.cancel.is_cancelled() {
1017 1 : error!("could not compact, repartitioning keyspace failed: {err:?}");
1018 2 : }
1019 : }
1020 : };
1021 :
1022 1520 : Ok(())
1023 1520 : }
1024 :
1025 : /// Mutate the timeline with a [`TimelineWriter`].
1026 3355115 : pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
1027 3355115 : TimelineWriter {
1028 3355115 : tl: self,
1029 3355115 : _write_guard: self.write_lock.lock().await,
1030 : }
1031 3355115 : }
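    // A hedged usage sketch: `TimelineWriter` is defined later in this file, and the
    // exact `put`/`finish_write` calls below are assumptions for illustration only.
    //
    //     let writer = timeline.writer().await;
    //     writer.put(key, lsn, &Value::Image(page_bytes), &ctx).await?;
    //     writer.finish_write(lsn);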
1032 :
1033 : /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
1034 : /// the in-memory layer, and initiate flushing it if so.
1035 : ///
1036 : /// Also flush after a period of time without new data -- it helps
1037 : /// safekeepers to regard pageserver as caught up and suspend activity.
1038 1401451 : pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
1039 1401451 : let last_lsn = self.get_last_record_lsn();
1040 1400622 : let open_layer_size = {
1041 1401451 : let guard = self.layers.read().await;
1042 1401451 : let layers = guard.layer_map();
1043 1401451 : let Some(open_layer) = layers.open_layer.as_ref() else {
1044 829 : return Ok(());
1045 : };
1046 1400622 : open_layer.size().await?
1047 : };
1048 1400622 : let last_freeze_at = self.last_freeze_at.load();
1049 1400622 : let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
1050 1400622 : let distance = last_lsn.widening_sub(last_freeze_at);
1051 1400622 : // Checkpointing the open layer can be triggered by layer size or LSN range.
1052 1400622 : // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
1053 1400622 : // we want to stay below that with a big margin. The LSN distance determines how
1054 1400622 : // much WAL the safekeepers need to store.
1055 1400622 : if distance >= self.get_checkpoint_distance().into()
1056 1399838 : || open_layer_size > self.get_checkpoint_distance()
1057 1396934 : || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
1058 : {
1059 3688 : info!(
1060 3688 : "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
1061 3688 : distance,
1062 3688 : open_layer_size,
1063 3688 : last_freeze_ts.elapsed()
1064 3688 : );
1065 :
1066 3688 : self.freeze_inmem_layer(true).await;
1067 3688 : self.last_freeze_at.store(last_lsn);
1068 3688 : *(self.last_freeze_ts.write().unwrap()) = Instant::now();
1069 3688 :
1070 3688 : // Wake up the layer flusher
1071 3688 : self.flush_frozen_layers();
1072 1396934 : }
1073 1400622 : Ok(())
1074 1401451 : }
1075 :
1076 1235 : pub(crate) fn activate(
1077 1235 : self: &Arc<Self>,
1078 1235 : broker_client: BrokerClientChannel,
1079 1235 : background_jobs_can_start: Option<&completion::Barrier>,
1080 1235 : ctx: &RequestContext,
1081 1235 : ) {
1082 1235 : if self.tenant_shard_id.is_zero() {
1083 1205 : // Logical size is only maintained accurately on shard zero.
1084 1205 : self.spawn_initial_logical_size_computation_task(ctx);
1085 1205 : }
1086 1235 : self.launch_wal_receiver(ctx, broker_client);
1087 1235 : self.set_state(TimelineState::Active);
1088 1235 : self.launch_eviction_task(background_jobs_can_start);
1089 1235 : }
1090 :
1091 : /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
1092 : /// also to remote storage. This method can easily take multiple seconds for a busy timeline.
1093 : ///
1094 : /// While we are flushing, we continue to accept read I/O.
1095 249 : pub(crate) async fn flush_and_shutdown(&self) {
1096 249 : debug_assert_current_span_has_tenant_and_timeline_id();
1097 :
1098 : // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
1099 : // trying to flush
1100 0 : tracing::debug!("Waiting for WalReceiverManager...");
1101 249 : task_mgr::shutdown_tasks(
1102 249 : Some(TaskKind::WalReceiverManager),
1103 249 : Some(self.tenant_shard_id),
1104 249 : Some(self.timeline_id),
1105 249 : )
1106 133 : .await;
1107 :
1108 : // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
1109 249 : self.last_record_lsn.shutdown();
1110 249 :
1111 249 : // now all writers to InMemory layer are gone, do the final flush if requested
1112 249 : match self.freeze_and_flush().await {
1113 : Ok(_) => {
1114 : // drain the upload queue
1115 205 : if let Some(client) = self.remote_client.as_ref() {
1116 : // if we did not wait for completion here, our shutdown process might not
1117 : // wait for remote uploads to complete at all, as new tasks can forever
1118 : // be spawned.
1119 : //
1120 : // what is problematic is the shutting down of RemoteTimelineClient, because
1121 : // obviously it does not make sense to stop while we wait for it, but what
1122 : // about corner cases like s3 suddenly hanging up?
1123 205 : if let Err(e) = client.shutdown().await {
1124 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1125 : // we have some extra WAL replay to do next time the timeline starts.
1126 0 : warn!("failed to flush to remote storage: {e:#}");
1127 205 : }
1128 0 : }
1129 : }
1130 44 : Err(e) => {
1131 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1132 : // we have some extra WAL replay to do next time the timeline starts.
1133 44 : warn!("failed to freeze and flush: {e:#}");
1134 : }
1135 : }
1136 :
1137 422 : self.shutdown().await;
1138 249 : }
1139 :
1140 : /// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
1141 : /// the graceful [`Timeline::flush_and_shutdown`] function.
1142 577 : pub(crate) async fn shutdown(&self) {
1143 577 : debug_assert_current_span_has_tenant_and_timeline_id();
1144 :
1145 : // Signal any subscribers to our cancellation token to drop out
1146 0 : tracing::debug!("Cancelling CancellationToken");
1147 577 : self.cancel.cancel();
1148 577 :
1149 577 : // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
1150 577 : // while doing so.
1151 577 : self.last_record_lsn.shutdown();
1152 577 :
1153 577 : // Shut down the layer flush task before the remote client, as one depends on the other
1154 577 : task_mgr::shutdown_tasks(
1155 577 : Some(TaskKind::LayerFlushTask),
1156 577 : Some(self.tenant_shard_id),
1157 577 : Some(self.timeline_id),
1158 577 : )
1159 491 : .await;
1160 :
1161 : // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
1162 : // case our caller wants to use that for a deletion
1163 577 : if let Some(remote_client) = self.remote_client.as_ref() {
1164 577 : match remote_client.stop() {
1165 577 : Ok(()) => {}
1166 0 : Err(StopError::QueueUninitialized) => {
1167 0 : // Shutting down during initialization is legal
1168 0 : }
1169 : }
1170 0 : }
1171 :
1172 0 : tracing::debug!("Waiting for tasks...");
1173 :
1174 701 : task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
1175 :
1176 : // Finally wait until any gate-holders are complete
1177 577 : self.gate.close().await;
1178 577 : }
1179 :
1180 2243 : pub(crate) fn set_state(&self, new_state: TimelineState) {
1181 2243 : match (self.current_state(), new_state) {
1182 2243 : (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
1183 126 : info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
1184 : }
1185 0 : (st, TimelineState::Loading) => {
1186 0 : error!("ignoring transition from {st:?} into Loading state");
1187 : }
1188 7 : (TimelineState::Broken { .. }, new_state) => {
1189 7 : error!("Ignoring state update {new_state:?} for broken timeline");
1190 : }
1191 : (TimelineState::Stopping, TimelineState::Active) => {
1192 0 : error!("Not activating a Stopping timeline");
1193 : }
1194 2110 : (_, new_state) => {
1195 2110 : self.state.send_replace(new_state);
1196 2110 : }
1197 : }
1198 2243 : }
1199 :
1200 19 : pub(crate) fn set_broken(&self, reason: String) {
1201 19 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
1202 19 : let broken_state = TimelineState::Broken {
1203 19 : reason,
1204 19 : backtrace: backtrace_str,
1205 19 : };
1206 19 : self.set_state(broken_state);
1207 19 :
1208 19 : // Although the Broken state is not equivalent to shutdown() (shutdown will be called
1209 19 : // later when this tenant is detached or the process shuts down), firing the cancellation token
1210 19 : // here avoids the need for other tasks to watch for the Broken state explicitly.
1211 19 : self.cancel.cancel();
1212 19 : }
1213 :
1214 2108734 : pub(crate) fn current_state(&self) -> TimelineState {
1215 2108734 : self.state.borrow().clone()
1216 2108734 : }
1217 :
1218 908 : pub(crate) fn is_broken(&self) -> bool {
1219 908 : matches!(&*self.state.borrow(), TimelineState::Broken { .. })
1220 908 : }
1221 :
1222 1373205 : pub(crate) fn is_active(&self) -> bool {
1223 1373205 : self.current_state() == TimelineState::Active
1224 1373205 : }
1225 :
1226 2638 : pub(crate) fn is_stopping(&self) -> bool {
1227 2638 : self.current_state() == TimelineState::Stopping
1228 2638 : }
1229 :
1230 1233 : pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
1231 1233 : self.state.subscribe()
1232 1233 : }
1233 :
1234 1299949 : pub(crate) async fn wait_to_become_active(
1235 1299949 : &self,
1236 1299949 : _ctx: &RequestContext, // Prepare for use by cancellation
1237 1299949 : ) -> Result<(), TimelineState> {
1238 1299949 : let mut receiver = self.state.subscribe();
1239 1299974 : loop {
1240 1299974 : let current_state = receiver.borrow().clone();
1241 1299974 : match current_state {
1242 : TimelineState::Loading => {
1243 25 : receiver
1244 25 : .changed()
1245 25 : .await
1246 25 : .expect("holding a reference to self");
1247 : }
1248 : TimelineState::Active { .. } => {
1249 1299943 : return Ok(());
1250 : }
1251 : TimelineState::Broken { .. } | TimelineState::Stopping => {
1252 : // There's no chance the timeline can transition back into ::Active
1253 6 : return Err(current_state);
1254 : }
1255 : }
1256 : }
1257 1299949 : }
1258 :
1259 96 : pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
1260 96 : let guard = self.layers.read().await;
1261 96 : let layer_map = guard.layer_map();
1262 96 : let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
1263 96 : if let Some(open_layer) = &layer_map.open_layer {
1264 4 : in_memory_layers.push(open_layer.info());
1265 92 : }
1266 96 : for frozen_layer in &layer_map.frozen_layers {
1267 0 : in_memory_layers.push(frozen_layer.info());
1268 0 : }
1269 :
1270 96 : let mut historic_layers = Vec::new();
1271 3014 : for historic_layer in layer_map.iter_historic_layers() {
1272 3014 : let historic_layer = guard.get_from_desc(&historic_layer);
1273 3014 : historic_layers.push(historic_layer.info(reset).await);
1274 : }
1275 :
1276 96 : LayerMapInfo {
1277 96 : in_memory_layers,
1278 96 : historic_layers,
1279 96 : }
1280 96 : }
1281 :
1282 2 : #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
1283 : pub(crate) async fn download_layer(
1284 : &self,
1285 : layer_file_name: &str,
1286 : ) -> anyhow::Result<Option<bool>> {
1287 : let Some(layer) = self.find_layer(layer_file_name).await else {
1288 : return Ok(None);
1289 : };
1290 :
1291 : if self.remote_client.is_none() {
1292 : return Ok(Some(false));
1293 : }
1294 :
1295 : layer.download().await?;
1296 :
1297 : Ok(Some(true))
1298 : }
1299 :
1300 : /// Evict just one layer.
1301 : ///
1302 : /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
1303 2252 : pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1304 2252 : let _gate = self
1305 2252 : .gate
1306 2252 : .enter()
1307 2252 : .map_err(|_| anyhow::anyhow!("Shutting down"))?;
1308 :
1309 2252 : let Some(local_layer) = self.find_layer(layer_file_name).await else {
1310 0 : return Ok(None);
1311 : };
1312 :
1313 2252 : match local_layer.evict_and_wait().await {
1314 2252 : Ok(()) => Ok(Some(true)),
1315 0 : Err(EvictionError::NotFound) => Ok(Some(false)),
1316 0 : Err(EvictionError::Downloaded) => Ok(Some(false)),
1317 : }
1318 2252 : }
1319 : }
1320 :
1321 : /// Number of times we will recompute the partitioning within one checkpoint distance.
1322 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
1323 :
1324 : // Private functions
1325 : impl Timeline {
1326 609 : pub(crate) fn get_lazy_slru_download(&self) -> bool {
1327 609 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1328 609 : tenant_conf
1329 609 : .lazy_slru_download
1330 609 : .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
1331 609 : }
1332 :
1333 2803551 : fn get_checkpoint_distance(&self) -> u64 {
1334 2803551 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1335 2803551 : tenant_conf
1336 2803551 : .checkpoint_distance
1337 2803551 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
1338 2803551 : }
1339 :
1340 1396934 : fn get_checkpoint_timeout(&self) -> Duration {
1341 1396934 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1342 1396934 : tenant_conf
1343 1396934 : .checkpoint_timeout
1344 1396934 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
1345 1396934 : }
1346 :
1347 1603 : fn get_compaction_target_size(&self) -> u64 {
1348 1603 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1349 1603 : tenant_conf
1350 1603 : .compaction_target_size
1351 1603 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
1352 1603 : }
1353 :
1354 1519 : fn get_compaction_threshold(&self) -> usize {
1355 1519 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1356 1519 : tenant_conf
1357 1519 : .compaction_threshold
1358 1519 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
1359 1519 : }
1360 :
1361 37180 : fn get_image_creation_threshold(&self) -> usize {
1362 37180 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1363 37180 : tenant_conf
1364 37180 : .image_creation_threshold
1365 37180 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
1366 37180 : }
1367 :
1368 2588 : fn get_eviction_policy(&self) -> EvictionPolicy {
1369 2588 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1370 2588 : tenant_conf
1371 2588 : .eviction_policy
1372 2588 : .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
1373 2588 : }
1374 :
1375 1621 : fn get_evictions_low_residence_duration_metric_threshold(
1376 1621 : tenant_conf: &TenantConfOpt,
1377 1621 : default_tenant_conf: &TenantConf,
1378 1621 : ) -> Duration {
1379 1621 : tenant_conf
1380 1621 : .evictions_low_residence_duration_metric_threshold
1381 1621 : .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
1382 1621 : }
1383 :
1384 13566 : fn get_gc_feedback(&self) -> bool {
1385 13566 : let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
1386 13566 : tenant_conf
1387 13566 : .gc_feedback
1388 13566 : .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
1389 13566 : }
1390 :
1391 53 : pub(super) fn tenant_conf_updated(&self) {
1392 53 : // NB: Most tenant conf options are read by background loops, so,
1393 53 : // changes will automatically be picked up.
1394 53 :
1395 53 : // The threshold is embedded in the metric. So, we need to update it.
1396 53 : {
1397 53 : let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
1398 53 : &self.tenant_conf.read().unwrap().tenant_conf,
1399 53 : &self.conf.default_tenant_conf,
1400 53 : );
1401 53 :
1402 53 : let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
1403 53 : let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
1404 53 :
1405 53 : let timeline_id_str = self.timeline_id.to_string();
1406 53 : self.metrics
1407 53 : .evictions_with_low_residence_duration
1408 53 : .write()
1409 53 : .unwrap()
1410 53 : .change_threshold(
1411 53 : &tenant_id_str,
1412 53 : &shard_id_str,
1413 53 : &timeline_id_str,
1414 53 : new_threshold,
1415 53 : );
1416 53 : }
1417 53 : }
1418 :
1419 : /// Open a Timeline handle.
1420 : ///
1421 : /// Loads the metadata for the timeline into memory, but not the layer map.
1422 : #[allow(clippy::too_many_arguments)]
1423 1568 : pub(super) fn new(
1424 1568 : conf: &'static PageServerConf,
1425 1568 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
1426 1568 : metadata: &TimelineMetadata,
1427 1568 : ancestor: Option<Arc<Timeline>>,
1428 1568 : timeline_id: TimelineId,
1429 1568 : tenant_shard_id: TenantShardId,
1430 1568 : generation: Generation,
1431 1568 : shard_identity: ShardIdentity,
1432 1568 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
1433 1568 : resources: TimelineResources,
1434 1568 : pg_version: u32,
1435 1568 : state: TimelineState,
1436 1568 : cancel: CancellationToken,
1437 1568 : ) -> Arc<Self> {
1438 1568 : let disk_consistent_lsn = metadata.disk_consistent_lsn();
1439 1568 : let (state, _) = watch::channel(state);
1440 1568 :
1441 1568 : let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
1442 1568 : let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
1443 1568 :
1444 1568 : let tenant_conf_guard = tenant_conf.read().unwrap();
1445 1568 :
1446 1568 : let evictions_low_residence_duration_metric_threshold =
1447 1568 : Self::get_evictions_low_residence_duration_metric_threshold(
1448 1568 : &tenant_conf_guard.tenant_conf,
1449 1568 : &conf.default_tenant_conf,
1450 1568 : );
1451 1568 : drop(tenant_conf_guard);
1452 1568 :
1453 1568 : Arc::new_cyclic(|myself| {
1454 1568 : let mut result = Timeline {
1455 1568 : conf,
1456 1568 : tenant_conf,
1457 1568 : myself: myself.clone(),
1458 1568 : timeline_id,
1459 1568 : tenant_shard_id,
1460 1568 : generation,
1461 1568 : shard_identity,
1462 1568 : pg_version,
1463 1568 : layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
1464 1568 : wanted_image_layers: Mutex::new(None),
1465 1568 :
1466 1568 : walredo_mgr,
1467 1568 : walreceiver: Mutex::new(None),
1468 1568 :
1469 1568 : remote_client: resources.remote_client.map(Arc::new),
1470 1568 :
1471 1568 : // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
1472 1568 : last_record_lsn: SeqWait::new(RecordLsn {
1473 1568 : last: disk_consistent_lsn,
1474 1568 : prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
1475 1568 : }),
1476 1568 : disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
1477 1568 :
1478 1568 : last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
1479 1568 : last_freeze_ts: RwLock::new(Instant::now()),
1480 1568 :
1481 1568 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
1482 1568 :
1483 1568 : ancestor_timeline: ancestor,
1484 1568 : ancestor_lsn: metadata.ancestor_lsn(),
1485 1568 :
1486 1568 : metrics: TimelineMetrics::new(
1487 1568 : &tenant_shard_id,
1488 1568 : &timeline_id,
1489 1568 : crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
1490 1568 : "mtime",
1491 1568 : evictions_low_residence_duration_metric_threshold,
1492 1568 : ),
1493 1568 : ),
1494 1568 :
1495 1568 : query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
1496 1568 : &tenant_shard_id,
1497 1568 : &timeline_id,
1498 1568 : ),
1499 1568 :
1500 1568 : flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
1501 1568 :
1502 1568 : layer_flush_start_tx,
1503 1568 : layer_flush_done_tx,
1504 1568 :
1505 1568 : write_lock: tokio::sync::Mutex::new(()),
1506 1568 :
1507 1568 : gc_info: std::sync::RwLock::new(GcInfo {
1508 1568 : retain_lsns: Vec::new(),
1509 1568 : horizon_cutoff: Lsn(0),
1510 1568 : pitr_cutoff: Lsn(0),
1511 1568 : }),
1512 1568 :
1513 1568 : latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
1514 1568 : initdb_lsn: metadata.initdb_lsn(),
1515 1568 :
1516 1568 : current_logical_size: if disk_consistent_lsn.is_valid() {
1517 : // we're creating timeline data with some layer files existing locally,
1518 : // need to recalculate timeline's logical size based on data in the layers.
1519 890 : LogicalSize::deferred_initial(disk_consistent_lsn)
1520 : } else {
1521 : // we're creating timeline data without any layers existing locally,
1522 : // initial logical size is 0.
1523 678 : LogicalSize::empty_initial()
1524 : },
1525 1568 : partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
1526 1568 : repartition_threshold: 0,
1527 1568 :
1528 1568 : last_received_wal: Mutex::new(None),
1529 1568 : rel_size_cache: RwLock::new(HashMap::new()),
1530 1568 :
1531 1568 : download_all_remote_layers_task_info: RwLock::new(None),
1532 1568 :
1533 1568 : state,
1534 1568 :
1535 1568 : eviction_task_timeline_state: tokio::sync::Mutex::new(
1536 1568 : EvictionTaskTimelineState::default(),
1537 1568 : ),
1538 1568 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
1539 1568 :
1540 1568 : cancel,
1541 1568 : gate: Gate::default(),
1542 1568 :
1543 1568 : compaction_lock: tokio::sync::Mutex::default(),
1544 1568 : gc_lock: tokio::sync::Mutex::default(),
1545 1568 : standby_horizon: AtomicLsn::new(0),
1546 1568 : };
1547 1568 : result.repartition_threshold =
1548 1568 : result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
1549 1568 : result
1550 1568 : .metrics
1551 1568 : .last_record_gauge
1552 1568 : .set(disk_consistent_lsn.0 as i64);
1553 1568 : result
1554 1568 : })
1555 1568 : }
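// `new` builds the `Timeline` through `Arc::new_cyclic` so the struct can hold a `Weak`
// reference to itself (`myself`), which methods later upgrade back into an `Arc` (see
// `load_layer_map` below). A minimal, self-contained sketch of that standard pattern,
// with illustrative type names:
//
//     use std::sync::{Arc, Weak};
//
//     struct Node {
//         myself: Weak<Node>,
//     }
//
//     fn new_node() -> Arc<Node> {
//         Arc::new_cyclic(|weak| Node { myself: weak.clone() })
//     }
//
//     // later, inside a &self method:
//     // let strong = node.myself.upgrade().expect("&self method holds the arc");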
1556 :
1557 2218 : pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
1558 2218 : let Ok(guard) = self.gate.enter() else {
1559 0 : info!("cannot start flush loop when the timeline gate has already been closed");
1560 0 : return;
1561 : };
1562 2218 : let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
1563 2218 : match *flush_loop_state {
1564 1551 : FlushLoopState::NotStarted => (),
1565 : FlushLoopState::Running { .. } => {
1566 667 : info!(
1567 667 : "skipping attempt to start flush_loop twice {}/{}",
1568 667 : self.tenant_shard_id, self.timeline_id
1569 667 : );
1570 667 : return;
1571 : }
1572 : FlushLoopState::Exited => {
1573 0 : warn!(
1574 0 : "ignoring attempt to restart exited flush_loop {}/{}",
1575 0 : self.tenant_shard_id, self.timeline_id
1576 0 : );
1577 0 : return;
1578 : }
1579 : }
1580 :
1581 1551 : let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
1582 1551 : let self_clone = Arc::clone(self);
1583 1551 :
1584 1551 : debug!("spawning flush loop");
1585 1551 : *flush_loop_state = FlushLoopState::Running {
1586 1551 : #[cfg(test)]
1587 1551 : expect_initdb_optimization: false,
1588 1551 : #[cfg(test)]
1589 1551 : initdb_optimization_count: 0,
1590 1551 : };
1591 1551 : task_mgr::spawn(
1592 1551 : task_mgr::BACKGROUND_RUNTIME.handle(),
1593 1551 : task_mgr::TaskKind::LayerFlushTask,
1594 1551 : Some(self.tenant_shard_id),
1595 1551 : Some(self.timeline_id),
1596 1551 : "layer flush task",
1597 : false,
1598 1551 : async move {
1599 1551 : let _guard = guard;
1600 1551 : let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
1601 27756 : self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
1602 576 : let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
1603 576 : assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
1604 576 : *flush_loop_state = FlushLoopState::Exited;
1605 576 : Ok(())
1606 576 : }
1607 1551 : .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
1608 : );
1609 2218 : }
1610 :
1611 : /// Creates and starts the wal receiver.
1612 : ///
1613 : /// This function is expected to be called at most once per Timeline's lifecycle
1614 : /// when the timeline is activated.
1615 1235 : fn launch_wal_receiver(
1616 1235 : self: &Arc<Self>,
1617 1235 : ctx: &RequestContext,
1618 1235 : broker_client: BrokerClientChannel,
1619 1235 : ) {
1620 1235 : info!(
1621 1235 : "launching WAL receiver for timeline {} of tenant {}",
1622 1235 : self.timeline_id, self.tenant_shard_id
1623 1235 : );
1624 :
1625 1235 : let tenant_conf_guard = self.tenant_conf.read().unwrap();
1626 1235 : let wal_connect_timeout = tenant_conf_guard
1627 1235 : .tenant_conf
1628 1235 : .walreceiver_connect_timeout
1629 1235 : .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
1630 1235 : let lagging_wal_timeout = tenant_conf_guard
1631 1235 : .tenant_conf
1632 1235 : .lagging_wal_timeout
1633 1235 : .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
1634 1235 : let max_lsn_wal_lag = tenant_conf_guard
1635 1235 : .tenant_conf
1636 1235 : .max_lsn_wal_lag
1637 1235 : .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
1638 1235 : drop(tenant_conf_guard);
1639 1235 :
1640 1235 : let mut guard = self.walreceiver.lock().unwrap();
1641 1235 : assert!(
1642 1235 : guard.is_none(),
1643 0 : "multiple launches / re-launches of WAL receiver are not supported"
1644 : );
1645 1235 : *guard = Some(WalReceiver::start(
1646 1235 : Arc::clone(self),
1647 1235 : WalReceiverConf {
1648 1235 : wal_connect_timeout,
1649 1235 : lagging_wal_timeout,
1650 1235 : max_lsn_wal_lag,
1651 1235 : auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
1652 1235 : availability_zone: self.conf.availability_zone.clone(),
1653 1235 : ingest_batch_size: self.conf.ingest_batch_size,
1654 1235 : },
1655 1235 : broker_client,
1656 1235 : ctx,
1657 1235 : ));
1658 1235 : }
1659 :
1660 : /// Initialize with an empty layer map. Used when creating a new timeline.
1661 1144 : pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
1662 1144 : let mut layers = self.layers.try_write().expect(
1663 1144 : "in the context where we call this function, no other task has access to the object",
1664 1144 : );
1665 1144 : layers.initialize_empty(Lsn(start_lsn.0));
1666 1144 : }
1667 :
1668 : /// Scan the timeline directory, clean it up, populate the layer map, and schedule uploads for local-only
1669 : /// files.
1670 412 : pub(super) async fn load_layer_map(
1671 412 : &self,
1672 412 : disk_consistent_lsn: Lsn,
1673 412 : index_part: Option<IndexPart>,
1674 412 : ) -> anyhow::Result<()> {
1675 : use init::{Decision::*, Discovered, DismissedLayer};
1676 : use LayerFileName::*;
1677 :
1678 412 : let mut guard = self.layers.write().await;
1679 :
1680 412 : let timer = self.metrics.load_layer_map_histo.start_timer();
1681 412 :
1682 412 : // Scan timeline directory and create ImageFileName and DeltaFilename
1683 412 : // structs representing all files on disk
1684 412 : let timeline_path = self
1685 412 : .conf
1686 412 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
1687 412 : let conf = self.conf;
1688 412 : let span = tracing::Span::current();
1689 412 :
1690 412 : // Copy to move into the task we're about to spawn
1691 412 : let generation = self.generation;
1692 412 : let shard = self.get_shard_index();
1693 412 : let this = self.myself.upgrade().expect("&self method holds the arc");
1694 :
1695 412 : let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
1696 412 : move || {
1697 412 : let _g = span.entered();
1698 412 : let discovered = init::scan_timeline_dir(&timeline_path)?;
1699 412 : let mut discovered_layers = Vec::with_capacity(discovered.len());
1700 412 : let mut unrecognized_files = Vec::new();
1701 412 :
1702 412 : let mut path = timeline_path;
1703 :
1704 14223 : for discovered in discovered {
1705 13811 : let (name, kind) = match discovered {
1706 13348 : Discovered::Layer(file_name, file_size) => {
1707 13348 : discovered_layers.push((file_name, file_size));
1708 13348 : continue;
1709 : }
1710 : Discovered::Metadata | Discovered::IgnoredBackup => {
1711 412 : continue;
1712 : }
1713 0 : Discovered::Unknown(file_name) => {
1714 0 : // we will later error if there are any
1715 0 : unrecognized_files.push(file_name);
1716 0 : continue;
1717 : }
1718 47 : Discovered::Ephemeral(name) => (name, "old ephemeral file"),
1719 1 : Discovered::Temporary(name) => (name, "temporary timeline file"),
1720 3 : Discovered::TemporaryDownload(name) => (name, "temporary download"),
1721 : };
1722 51 : path.push(Utf8Path::new(&name));
1723 51 : init::cleanup(&path, kind)?;
1724 51 : path.pop();
1725 : }
1726 :
1727 412 : if !unrecognized_files.is_empty() {
1728 : // assume that if there are any there are many many.
1729 0 : let n = unrecognized_files.len();
1730 0 : let first = &unrecognized_files[..n.min(10)];
1731 0 : anyhow::bail!(
1732 0 : "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
1733 0 : );
1734 412 : }
1735 412 :
1736 412 : let decided = init::reconcile(
1737 412 : discovered_layers,
1738 412 : index_part.as_ref(),
1739 412 : disk_consistent_lsn,
1740 412 : generation,
1741 412 : shard,
1742 412 : );
1743 412 :
1744 412 : let mut loaded_layers = Vec::new();
1745 412 : let mut needs_cleanup = Vec::new();
1746 412 : let mut total_physical_size = 0;
1747 :
1748 57877 : for (name, decision) in decided {
1749 57465 : let decision = match decision {
1750 12212 : Ok(UseRemote { local, remote }) => {
1751 12212 : // Remote is authoritative, but we may still choose to retain
1752 12212 : // the local file if the contents appear to match
1753 12212 : if local.file_size() == remote.file_size() {
1754 : // Use the local file, but take the remote metadata so that we pick up
1755 : // the correct generation.
1756 12211 : UseLocal(remote)
1757 : } else {
1758 1 : path.push(name.file_name());
1759 1 : init::cleanup_local_file_for_remote(&path, &local, &remote)?;
1760 1 : path.pop();
1761 1 : UseRemote { local, remote }
1762 : }
1763 : }
1764 44616 : Ok(decision) => decision,
1765 2 : Err(DismissedLayer::Future { local }) => {
1766 2 : if local.is_some() {
1767 1 : path.push(name.file_name());
1768 1 : init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
1769 1 : path.pop();
1770 1 : }
1771 2 : needs_cleanup.push(name);
1772 2 : continue;
1773 : }
1774 635 : Err(DismissedLayer::LocalOnly(local)) => {
1775 635 : path.push(name.file_name());
1776 635 : init::cleanup_local_only_file(&path, &name, &local)?;
1777 635 : path.pop();
1778 635 : // this file never existed remotely, we will have to do rework
1779 635 : continue;
1780 : }
1781 : };
1782 :
1783 56828 : match &name {
1784 23503 : Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
1785 33325 : Image(i) => assert!(i.lsn <= disk_consistent_lsn),
1786 : }
1787 :
1788 56828 : tracing::debug!(layer=%name, ?decision, "applied");
1789 :
1790 56828 : let layer = match decision {
1791 12711 : UseLocal(m) => {
1792 12711 : total_physical_size += m.file_size();
1793 12711 : Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
1794 : }
1795 44116 : Evicted(remote) | UseRemote { remote, .. } => {
1796 44117 : Layer::for_evicted(conf, &this, name, remote)
1797 : }
1798 : };
1799 :
1800 56828 : loaded_layers.push(layer);
1801 : }
1802 412 : Ok((loaded_layers, needs_cleanup, total_physical_size))
1803 412 : }
1804 412 : })
1805 408 : .await
1806 412 : .map_err(anyhow::Error::new)
1807 412 : .and_then(|x| x)?;
1808 :
1809 412 : let num_layers = loaded_layers.len();
1810 412 :
1811 412 : guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
1812 :
1813 412 : if let Some(rtc) = self.remote_client.as_ref() {
1814 412 : rtc.schedule_layer_file_deletion(&needs_cleanup)?;
1815 412 : rtc.schedule_index_upload_for_file_changes()?;
1816 : // This barrier orders above DELETEs before any later operations.
1817 : // This is critical because code executing after the barrier might
1818 : // create again objects with the same key that we just scheduled for deletion.
1819 : // For example, if we just scheduled deletion of an image layer "from the future",
1820 : // later compaction might run again and re-create the same image layer.
1821 : // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
1822 : // "same" here means same key range and LSN.
1823 : //
1824 : // Without a barrier between above DELETEs and the re-creation's PUTs,
1825 : // the upload queue may execute the PUT first, then the DELETE.
1826 : // In our example, we will end up with an IndexPart referencing a non-existent object.
1827 : //
1828 : // 1. a future image layer is created and uploaded
1829 : // 2. ps restart
1830 : // 3. the future layer from (1) is deleted during load layer map
1831 : // 4. image layer is re-created and uploaded
1832 : // 5. deletion queue would like to delete (1) but actually deletes (4)
1833 : // 6. delete by name works as expected, but it now deletes the wrong (later) version
1834 : //
1835 : // See https://github.com/neondatabase/neon/issues/5878
1836 : //
1837 : // NB: generation numbers naturally protect against this because they disambiguate
1838 : // (1) and (4)
1839 412 : rtc.schedule_barrier()?;
1840 : // Tenant::create_timeline will wait for these uploads to happen before returning, or
1841 : // on retry.
1842 0 : }
1843 :
1844 412 : info!(
1845 412 : "loaded layer map with {} layers at {}, total physical size: {}",
1846 412 : num_layers, disk_consistent_lsn, total_physical_size
1847 412 : );
1848 :
1849 412 : timer.stop_and_record();
1850 412 : Ok(())
1851 412 : }
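// The barrier scheduled near the end of `load_layer_map` is purely about upload-queue
// ordering: deletions of "future" layers must be executed before any later re-creation
// of a layer with the same name is uploaded. A hedged sketch of the intended ordering
// (the first two calls are the ones made above; the upload step is illustrative):
//
//     rtc.schedule_layer_file_deletion(&needs_cleanup)?; // DELETEs queued first
//     rtc.schedule_barrier()?;                           // later operations may not overtake
//     // ... any uploads scheduled after this point (e.g. by a subsequent flush or
//     // compaction) are guaranteed to execute after the deletions, so the IndexPart
//     // can never end up referencing an object that the deletion removed.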
1852 :
1853 : /// Retrieve current logical size of the timeline.
1854 : ///
1855 : /// The size could be lagging behind the actual number, in case
1856 : /// the initial size calculation has not been run (gets triggered on the first size access).
1857 : ///
1858 : /// Returns the size; whether it is exact or approximate is encoded in the returned `CurrentLogicalSize`.
1859 710214 : pub(crate) fn get_current_logical_size(
1860 710214 : self: &Arc<Self>,
1861 710214 : priority: GetLogicalSizePriority,
1862 710214 : ctx: &RequestContext,
1863 710214 : ) -> logical_size::CurrentLogicalSize {
1864 710214 : if !self.tenant_shard_id.is_zero() {
1865 : // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
1866 : // when the HTTP API is serving a GET for a timeline on a non-zero shard, return zero
1867 70 : return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
1868 710144 : }
1869 710144 :
1870 710144 : let current_size = self.current_logical_size.current_size();
1871 710144 : debug!("Current size: {current_size:?}");
1872 :
1873 710144 : match (current_size.accuracy(), priority) {
1874 709383 : (logical_size::Accuracy::Exact, _) => (), // nothing to do
1875 156 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
1876 156 : // background task will eventually deliver an exact value, we're in no rush
1877 156 : }
1878 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
1879 : // background task is not ready, but user is asking for it now;
1880 : // => make the background task skip the line
1881 : // (The alternative would be to calculate the size here, but,
1882 : // it can actually take a long time if the user has a lot of rels.
1883 : // And we'll inevitable need it again; So, let the background task do the work.)
1884 605 : match self
1885 605 : .current_logical_size
1886 605 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
1887 605 : .get()
1888 : {
1889 605 : Some(cancel) => cancel.cancel(),
1890 : None => {
1891 0 : let state = self.current_state();
1892 0 : if matches!(
1893 0 : state,
1894 : TimelineState::Broken { .. } | TimelineState::Stopping
1895 0 : ) {
1896 0 :
1897 0 : // Can happen when the timeline detail endpoint is used while deletion is ongoing (or the timeline is broken).
1898 0 : // Don't make noise.
1899 0 : } else {
1900 0 : warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
1901 : }
1902 : }
1903 : };
1904 : }
1905 : }
1906 :
1907 710144 : if let CurrentLogicalSize::Approximate(_) = &current_size {
1908 761 : if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
1909 327 : let first = self
1910 327 : .current_logical_size
1911 327 : .did_return_approximate_to_walreceiver
1912 327 : .compare_exchange(
1913 327 : false,
1914 327 : true,
1915 327 : AtomicOrdering::Relaxed,
1916 327 : AtomicOrdering::Relaxed,
1917 327 : )
1918 327 : .is_ok();
1919 327 : if first {
1920 60 : crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
1921 267 : }
1922 434 : }
1923 709383 : }
1924 :
1925 710144 : current_size
1926 710214 : }
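// A hedged sketch of how a caller might handle the two accuracy levels returned above
// (`timeline` and `ctx` are assumed to be in scope; the match arms mirror the
// `CurrentLogicalSize` variants used in this module):
//
//     match timeline.get_current_logical_size(GetLogicalSizePriority::User, &ctx) {
//         CurrentLogicalSize::Exact(size) => {
//             // safe to report as-is
//         }
//         CurrentLogicalSize::Approximate(size) => {
//             // the initial calculation is still pending; the `User` priority above has
//             // already asked it to skip the background-task concurrency limit
//         }
//     }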
1927 :
1928 1205 : fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
1929 1205 : let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
1930 : // nothing to do for freshly created timelines;
1931 570 : assert_eq!(
1932 570 : self.current_logical_size.current_size().accuracy(),
1933 570 : logical_size::Accuracy::Exact,
1934 570 : );
1935 570 : self.current_logical_size.initialized.add_permits(1);
1936 570 : return;
1937 : };
1938 :
1939 635 : let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
1940 635 : let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
1941 635 : self.current_logical_size
1942 635 : .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
1943 635 : .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
1944 635 :
1945 635 : let self_clone = Arc::clone(self);
1946 635 : let background_ctx = ctx.detached_child(
1947 635 : TaskKind::InitialLogicalSizeCalculation,
1948 635 : DownloadBehavior::Download,
1949 635 : );
1950 635 : task_mgr::spawn(
1951 635 : task_mgr::BACKGROUND_RUNTIME.handle(),
1952 635 : task_mgr::TaskKind::InitialLogicalSizeCalculation,
1953 635 : Some(self.tenant_shard_id),
1954 635 : Some(self.timeline_id),
1955 635 : "initial size calculation",
1956 : false,
1957 : // NB: don't log errors here, task_mgr will do that.
1958 635 : async move {
1959 635 : let cancel = task_mgr::shutdown_token();
1960 635 : self_clone
1961 635 : .initial_logical_size_calculation_task(
1962 635 : initial_part_end,
1963 635 : cancel_wait_for_background_loop_concurrency_limit_semaphore,
1964 635 : cancel,
1965 635 : background_ctx,
1966 635 : )
1967 103140 : .await;
1968 631 : Ok(())
1969 631 : }
1970 635 : .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
1971 : );
1972 1205 : }
1973 :
1974 635 : async fn initial_logical_size_calculation_task(
1975 635 : self: Arc<Self>,
1976 635 : initial_part_end: Lsn,
1977 635 : skip_concurrency_limiter: CancellationToken,
1978 635 : cancel: CancellationToken,
1979 635 : background_ctx: RequestContext,
1980 635 : ) {
1981 631 : scopeguard::defer! {
1982 631 : // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
1983 631 : self.current_logical_size.initialized.add_permits(1);
1984 631 : }
1985 :
1986 : enum BackgroundCalculationError {
1987 : Cancelled,
1988 : Other(anyhow::Error),
1989 : }
1990 :
1991 635 : let try_once = |attempt: usize| {
1992 635 : let background_ctx = &background_ctx;
1993 635 : let self_ref = &self;
1994 635 : let skip_concurrency_limiter = &skip_concurrency_limiter;
1995 635 : async move {
1996 635 : let cancel = task_mgr::shutdown_token();
1997 635 : let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
1998 635 : BackgroundLoopKind::InitialLogicalSizeCalculation,
1999 635 : background_ctx,
2000 635 : );
2001 :
2002 : use crate::metrics::initial_logical_size::StartCircumstances;
2003 635 : let (_maybe_permit, circumstances) = tokio::select! {
2004 215 : permit = wait_for_permit => {
2005 : (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
2006 : }
2007 : _ = self_ref.cancel.cancelled() => {
2008 : return Err(BackgroundCalculationError::Cancelled);
2009 : }
2010 : _ = cancel.cancelled() => {
2011 : return Err(BackgroundCalculationError::Cancelled);
2012 : },
2013 : () = skip_concurrency_limiter.cancelled() => {
2014 : // Some action that is part of an end-user interaction requested the logical size
2015 : // => break out of the rate limit
2016 : // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
2017 : // but then again what happens if they cancel; also, we should just be using
2018 : // one runtime across the entire process, so, let's leave this for now.
2019 : (None, StartCircumstances::SkippedConcurrencyLimiter)
2020 : }
2021 : };
2022 :
2023 623 : let metrics_guard = if attempt == 1 {
2024 623 : crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
2025 : } else {
2026 0 : crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
2027 : };
2028 :
2029 623 : match self_ref
2030 623 : .logical_size_calculation_task(
2031 623 : initial_part_end,
2032 623 : LogicalSizeCalculationCause::Initial,
2033 623 : background_ctx,
2034 623 : )
2035 102635 : .await
2036 : {
2037 580 : Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
2038 : Err(CalculateLogicalSizeError::Cancelled) => {
2039 31 : Err(BackgroundCalculationError::Cancelled)
2040 : }
2041 2 : Err(CalculateLogicalSizeError::Other(err)) => {
2042 1 : if let Some(PageReconstructError::AncestorStopping(_)) =
2043 2 : err.root_cause().downcast_ref()
2044 : {
2045 0 : Err(BackgroundCalculationError::Cancelled)
2046 : } else {
2047 2 : Err(BackgroundCalculationError::Other(err))
2048 : }
2049 : }
2050 : }
2051 625 : }
2052 635 : };
2053 :
2054 635 : let retrying = async {
2055 635 : let mut attempt = 0;
2056 635 : loop {
2057 635 : attempt += 1;
2058 635 :
2059 103137 : match try_once(attempt).await {
2060 580 : Ok(res) => return ControlFlow::Continue(res),
2061 43 : Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
2062 2 : Err(BackgroundCalculationError::Other(e)) => {
2063 2 : warn!(attempt, "initial size calculation failed: {e:?}");
2064 : // exponential back-off doesn't make sense at these long intervals;
2065 : // use fixed retry interval with generous jitter instead
2066 2 : let sleep_duration = Duration::from_secs(
2067 2 : u64::try_from(
2068 2 : // 1hour base
2069 2 : (60_i64 * 60_i64)
2070 2 : // 10min jitter
2071 2 : + rand::thread_rng().gen_range(-10 * 60..10 * 60),
2072 2 : )
2073 2 : .expect("10min < 1hour"),
2074 2 : );
2075 2 : tokio::time::sleep(sleep_duration).await;
2076 : }
2077 : }
2078 : }
2079 623 : };
2080 :
2081 635 : let (calculated_size, metrics_guard) = tokio::select! {
2082 623 : res = retrying => {
2083 : match res {
2084 : ControlFlow::Continue(calculated_size) => calculated_size,
2085 : ControlFlow::Break(()) => return,
2086 : }
2087 : }
2088 : _ = cancel.cancelled() => {
2089 : return;
2090 : }
2091 : };
2092 :
2093 : // we cannot query current_logical_size.current_size() to know the current
2094 : // *negative* value, only truncated to u64.
2095 580 : let added = self
2096 580 : .current_logical_size
2097 580 : .size_added_after_initial
2098 580 : .load(AtomicOrdering::Relaxed);
2099 580 :
2100 580 : let sum = calculated_size.saturating_add_signed(added);
2101 580 :
2102 580 : // set the gauge value before it can be set in `update_current_logical_size`.
2103 580 : self.metrics.current_logical_size_gauge.set(sum);
2104 580 :
2105 580 : self.current_logical_size
2106 580 : .initial_logical_size
2107 580 : .set((calculated_size, metrics_guard.calculation_result_saved()))
2108 580 : .ok()
2109 580 : .expect("only this task sets it");
2110 631 : }
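// The retry above deliberately uses a fixed interval with generous jitter instead of
// exponential back-off: each failed attempt sleeps for roughly one hour plus or minus
// ten minutes (i.e. between 50 and 70 minutes). A minimal sketch of that computation in
// isolation:
//
//     use rand::Rng;
//     use std::time::Duration;
//
//     fn retry_interval() -> Duration {
//         let secs = (60_i64 * 60) + rand::thread_rng().gen_range(-10 * 60..10 * 60);
//         Duration::from_secs(u64::try_from(secs).expect("10min < 1hour"))
//     }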
2111 :
2112 35 : pub(crate) fn spawn_ondemand_logical_size_calculation(
2113 35 : self: &Arc<Self>,
2114 35 : lsn: Lsn,
2115 35 : cause: LogicalSizeCalculationCause,
2116 35 : ctx: RequestContext,
2117 35 : ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
2118 35 : let (sender, receiver) = oneshot::channel();
2119 35 : let self_clone = Arc::clone(self);
2120 35 : // XXX if our caller loses interest, i.e., ctx is cancelled,
2121 35 : // we should stop the size calculation work and return an error.
2122 35 : // That would require restructuring this function's API to
2123 35 : // return the result directly, instead of a Receiver for the result.
2124 35 : let ctx = ctx.detached_child(
2125 35 : TaskKind::OndemandLogicalSizeCalculation,
2126 35 : DownloadBehavior::Download,
2127 35 : );
2128 35 : task_mgr::spawn(
2129 35 : task_mgr::BACKGROUND_RUNTIME.handle(),
2130 35 : task_mgr::TaskKind::OndemandLogicalSizeCalculation,
2131 35 : Some(self.tenant_shard_id),
2132 35 : Some(self.timeline_id),
2133 35 : "ondemand logical size calculation",
2134 35 : false,
2135 35 : async move {
2136 35 : let res = self_clone
2137 35 : .logical_size_calculation_task(lsn, cause, &ctx)
2138 2789 : .await;
2139 35 : let _ = sender.send(res).ok();
2140 35 : Ok(()) // Receiver is responsible for handling errors
2141 35 : }
2142 35 : .in_current_span(),
2143 35 : );
2144 35 : receiver
2145 35 : }
2146 :
2147 : /// # Cancel-Safety
2148 : ///
2149 : /// This method is cancellation-safe.
2150 0 : #[instrument(skip_all)]
2151 : async fn logical_size_calculation_task(
2152 : self: &Arc<Self>,
2153 : lsn: Lsn,
2154 : cause: LogicalSizeCalculationCause,
2155 : ctx: &RequestContext,
2156 : ) -> Result<u64, CalculateLogicalSizeError> {
2157 : crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
2158 : // We should never be calculating logical sizes on shard !=0, because these shards do not have
2159 : // accurate relation sizes, and they do not emit consumption metrics.
2160 : debug_assert!(self.tenant_shard_id.is_zero());
2161 :
2162 : let _guard = self.gate.enter();
2163 :
2164 : let self_calculation = Arc::clone(self);
2165 :
2166 658 : let mut calculation = pin!(async {
2167 658 : let ctx = ctx.attached_child();
2168 658 : self_calculation
2169 658 : .calculate_logical_size(lsn, cause, &ctx)
2170 105431 : .await
2171 648 : });
2172 :
2173 106073 : tokio::select! {
2174 646 : res = &mut calculation => { res }
2175 : _ = self.cancel.cancelled() => {
2176 0 : debug!("cancelling logical size calculation for timeline shutdown");
2177 : calculation.await
2178 : }
2179 : _ = task_mgr::shutdown_watcher() => {
2180 0 : debug!("cancelling logical size calculation for task shutdown");
2181 : calculation.await
2182 : }
2183 : }
2184 : }
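// The cancellation safety claimed above comes from pinning the calculation future once
// and polling it through `&mut`: if a shutdown signal wins the `select!`, the same
// future is resumed rather than restarted. A minimal sketch of the pattern with a
// hypothetical future and cancellation token:
//
//     use std::pin::pin;
//
//     async fn run<F: std::future::Future<Output = u64>>(
//         work: F,
//         cancel: tokio_util::sync::CancellationToken,
//     ) -> u64 {
//         let mut work = pin!(work);
//         tokio::select! {
//             res = &mut work => res,
//             _ = cancel.cancelled() => {
//                 // shutdown observed; keep driving the same future to completion
//                 work.await
//             }
//         }
//     }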
2185 :
2186 : /// Calculate the logical size of the database at the latest LSN.
2187 : ///
2188 : /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
2189 : /// especially if we need to download remote layers.
2190 : ///
2191 : /// # Cancel-Safety
2192 : ///
2193 : /// This method is cancellation-safe.
2194 665 : async fn calculate_logical_size(
2195 665 : &self,
2196 665 : up_to_lsn: Lsn,
2197 665 : cause: LogicalSizeCalculationCause,
2198 665 : ctx: &RequestContext,
2199 665 : ) -> Result<u64, CalculateLogicalSizeError> {
2200 665 : info!(
2201 665 : "Calculating logical size for timeline {} at {}",
2202 665 : self.timeline_id, up_to_lsn
2203 665 : );
2204 : // These failpoints are used by python tests to ensure that we don't delete
2205 : // the timeline while the logical size computation is ongoing.
2206 : // The first failpoint is used to make this function pause.
2207 : // Then the python test initiates timeline delete operation in a thread.
2208 : // It waits for a few seconds, then arms the second failpoint and disables
2209 : // the first failpoint. The second failpoint prints an error if the timeline
2210 : // delete code has deleted the on-disk state while we're still running here.
2211 : // It shouldn't do that. If it does it anyway, the error will be caught
2212 : // by the test suite, highlighting the problem.
2213 0 : fail::fail_point!("timeline-calculate-logical-size-pause");
2214 665 : fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
2215 2 : if !self
2216 2 : .conf
2217 2 : .metadata_path(&self.tenant_shard_id, &self.timeline_id)
2218 2 : .exists()
2219 : {
2220 0 : error!("timeline-calculate-logical-size-pre metadata file does not exist")
2221 2 : }
2222 : // need to return something
2223 2 : Ok(0)
2224 665 : });
2225 : // See if we've already done the work for initial size calculation.
2226 : // This is a short-cut for timelines that are mostly unused.
2227 663 : if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
2228 5 : return Ok(size);
2229 658 : }
2230 658 : let storage_time_metrics = match cause {
2231 : LogicalSizeCalculationCause::Initial
2232 : | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
2233 645 : | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
2234 : LogicalSizeCalculationCause::EvictionTaskImitation => {
2235 13 : &self.metrics.imitate_logical_size_histo
2236 : }
2237 : };
2238 658 : let timer = storage_time_metrics.start_timer();
2239 658 : let logical_size = self
2240 658 : .get_current_logical_size_non_incremental(up_to_lsn, ctx)
2241 106262 : .await?;
2242 0 : debug!("calculated logical size: {logical_size}");
2243 615 : timer.stop_and_record();
2244 615 : Ok(logical_size)
2245 655 : }
2246 :
2247 : /// Update current logical size, adding `delta' to the old value.
2248 642421 : fn update_current_logical_size(&self, delta: i64) {
2249 642421 : let logical_size = &self.current_logical_size;
2250 642421 : logical_size.increment_size(delta);
2251 642421 :
2252 642421 : // Also set the value in the prometheus gauge. Note that
2253 642421 : // there is a race condition here: if this is called by two
2254 642421 : // threads concurrently, the prometheus gauge might be set to
2255 642421 : // one value while current_logical_size is set to the
2256 642421 : // other.
2257 642421 : match logical_size.current_size() {
2258 642359 : CurrentLogicalSize::Exact(ref new_current_size) => self
2259 642359 : .metrics
2260 642359 : .current_logical_size_gauge
2261 642359 : .set(new_current_size.into()),
2262 62 : CurrentLogicalSize::Approximate(_) => {
2263 62 : // don't update the gauge yet, this allows us not to update the gauge back and
2264 62 : // forth between the initial size calculation task.
2265 62 : }
2266 : }
2267 642421 : }
2268 :
2269 2254 : async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
2270 2254 : let guard = self.layers.read().await;
2271 800061 : for historic_layer in guard.layer_map().iter_historic_layers() {
2272 800061 : let historic_layer_name = historic_layer.filename().file_name();
2273 800061 : if layer_file_name == historic_layer_name {
2274 2254 : return Some(guard.get_from_desc(&historic_layer));
2275 797807 : }
2276 : }
2277 :
2278 0 : None
2279 2254 : }
2280 :
2281 : /// The timeline heatmap is a hint to secondary locations from the primary location,
2282 : /// indicating which layers are currently on-disk on the primary.
2283 : ///
2284 : /// None is returned if the Timeline is in a state where uploading a heatmap
2285 : /// doesn't make sense, such as shutting down or initializing. The caller
2286 : /// should treat this as a cue to simply skip doing any heatmap uploading
2287 : /// for this timeline.
2288 8 : pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
2289 17 : let eviction_info = self.get_local_layers_for_disk_usage_eviction().await;
2290 :
2291 8 : let remote_client = match &self.remote_client {
2292 8 : Some(c) => c,
2293 0 : None => return None,
2294 : };
2295 :
2296 8 : let layer_file_names = eviction_info
2297 8 : .resident_layers
2298 8 : .iter()
2299 2493 : .map(|l| l.layer.get_name())
2300 8 : .collect::<Vec<_>>();
2301 :
2302 8 : let decorated = match remote_client.get_layers_metadata(layer_file_names) {
2303 8 : Ok(d) => d,
2304 : Err(_) => {
2305 : // Getting metadata only fails on Timeline in bad state.
2306 0 : return None;
2307 : }
2308 : };
2309 :
2310 8 : let heatmap_layers = std::iter::zip(
2311 8 : eviction_info.resident_layers.into_iter(),
2312 8 : decorated.into_iter(),
2313 8 : )
2314 2493 : .filter_map(|(layer, remote_info)| {
2315 2493 : remote_info.map(|remote_info| {
2316 2493 : HeatMapLayer::new(
2317 2493 : layer.layer.get_name(),
2318 2493 : IndexLayerMetadata::from(remote_info),
2319 2493 : layer.last_activity_ts,
2320 2493 : )
2321 2493 : })
2322 2493 : });
2323 8 :
2324 8 : Some(HeatMapTimeline::new(
2325 8 : self.timeline_id,
2326 8 : heatmap_layers.collect(),
2327 8 : ))
2328 8 : }
2329 : }
2330 :
2331 : type TraversalId = String;
2332 :
2333 : trait TraversalLayerExt {
2334 : fn traversal_id(&self) -> TraversalId;
2335 : }
2336 :
2337 : impl TraversalLayerExt for Layer {
2338 1051 : fn traversal_id(&self) -> TraversalId {
2339 1051 : self.local_path().to_string()
2340 1051 : }
2341 : }
2342 :
2343 : impl TraversalLayerExt for Arc<InMemoryLayer> {
2344 337 : fn traversal_id(&self) -> TraversalId {
2345 337 : format!("timeline {} in-memory {self}", self.get_timeline_id())
2346 337 : }
2347 : }
2348 :
2349 : impl Timeline {
2350 : ///
2351 : /// Gather the data needed to reconstruct the value of `key` at `request_lsn`
2352 : /// into `reconstruct_state`.
2353 : ///
2354 : /// The data might come from an ancestor timeline, if the key hasn't been
2355 : /// updated on this timeline yet.
2356 : ///
2357 : /// Returns the traversal path of layers visited, which is included in the
2358 : /// error message if the key cannot be found.
2358 : ///
2359 : /// # Cancel-Safety
2360 : ///
2361 : /// This method is cancellation-safe.
2362 8625177 : async fn get_reconstruct_data(
2363 8625177 : &self,
2364 8625177 : key: Key,
2365 8625177 : request_lsn: Lsn,
2366 8625177 : reconstruct_state: &mut ValueReconstructState,
2367 8625177 : ctx: &RequestContext,
2368 8625179 : ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
2369 8625179 : // Start from the current timeline.
2370 8625179 : let mut timeline_owned;
2371 8625179 : let mut timeline = self;
2372 8625179 :
2373 8625179 : let mut read_count = scopeguard::guard(0, |cnt| {
2374 8625175 : crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
2375 8625179 : });
2376 8625179 :
2377 8625179 : // For debugging purposes, collect the path of layers that we traversed
2378 8625179 : // through. It's included in the error message if we fail to find the key.
2379 8625179 : let mut traversal_path = Vec::<TraversalPathItem>::new();
2380 :
2381 8625179 : let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
2382 1776726 : *cached_lsn
2383 : } else {
2384 6848453 : Lsn(0)
2385 : };
2386 :
2387 : // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
2388 : // to check that each iteration make some progress, to break infinite
2389 : // looping if something goes wrong.
2390 8625179 : let mut prev_lsn = Lsn(u64::MAX);
2391 8625179 :
2392 8625179 : let mut result = ValueReconstructResult::Continue;
2393 8625179 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2394 :
2395 40959817 : 'outer: loop {
2396 40959817 : if self.cancel.is_cancelled() {
2397 38 : return Err(PageReconstructError::Cancelled);
2398 40959779 : }
2399 40959779 :
2400 40959779 : // The function should have updated 'state'
2401 40959779 : //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
2402 40959779 : match result {
2403 6879484 : ValueReconstructResult::Complete => return Ok(traversal_path),
2404 : ValueReconstructResult::Continue => {
2405 : // If we reached an earlier cached page image, we're done.
2406 34080290 : if cont_lsn == cached_lsn + 1 {
2407 1744355 : MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
2408 1744355 : return Ok(traversal_path);
2409 32335935 : }
2410 32335935 : if prev_lsn <= cont_lsn {
2411 : // Didn't make any progress in last iteration. Error out to avoid
2412 : // getting stuck in the loop.
2413 1259 : return Err(layer_traversal_error(format!(
2414 1259 : "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
2415 1259 : key,
2416 1259 : Lsn(cont_lsn.0 - 1),
2417 1259 : request_lsn,
2418 1259 : timeline.ancestor_lsn
2419 1259 : ), traversal_path));
2420 32334676 : }
2421 32334676 : prev_lsn = cont_lsn;
2422 : }
2423 : ValueReconstructResult::Missing => {
2424 : return Err(layer_traversal_error(
2425 5 : if cfg!(test) {
2426 4 : format!(
2427 4 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
2428 4 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
2429 4 : )
2430 : } else {
2431 1 : format!(
2432 1 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
2433 1 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
2434 1 : )
2435 : },
2436 5 : traversal_path,
2437 : ));
2438 : }
2439 : }
2440 :
2441 : // Recurse into ancestor if needed
2442 32334676 : if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
2443 0 : trace!(
2444 0 : "going into ancestor {}, cont_lsn is {}",
2445 0 : timeline.ancestor_lsn,
2446 0 : cont_lsn
2447 0 : );
2448 :
2449 1298714 : timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
2450 1298710 : timeline = &*timeline_owned;
2451 1298710 : prev_lsn = Lsn(u64::MAX);
2452 1298710 : continue 'outer;
2453 31035962 : }
2454 :
2455 31035962 : let guard = timeline.layers.read().await;
2456 31035962 : let layers = guard.layer_map();
2457 :
2458 : // Check the open and frozen in-memory layers first, in order from newest
2459 : // to oldest.
2460 31035962 : if let Some(open_layer) = &layers.open_layer {
2461 26023624 : let start_lsn = open_layer.get_lsn_range().start;
2462 26023624 : if cont_lsn > start_lsn {
2463 : //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
2464 : // Get all the data needed to reconstruct the page version from this layer.
2465 : // But if we have an older cached page image, no need to go past that.
2466 6700533 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2467 6700533 : result = match open_layer
2468 6700533 : .get_value_reconstruct_data(
2469 6700533 : key,
2470 6700533 : lsn_floor..cont_lsn,
2471 6700533 : reconstruct_state,
2472 6700533 : ctx,
2473 6700533 : )
2474 669114 : .await
2475 : {
2476 6700533 : Ok(result) => result,
2477 0 : Err(e) => return Err(PageReconstructError::from(e)),
2478 : };
2479 6700533 : cont_lsn = lsn_floor;
2480 6700533 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2481 6700533 : traversal_path.push((
2482 6700533 : result,
2483 6700533 : cont_lsn,
2484 6700533 : Box::new({
2485 6700533 : let open_layer = Arc::clone(open_layer);
2486 6700533 : move || open_layer.traversal_id()
2487 6700533 : }),
2488 6700533 : ));
2489 6700533 : continue 'outer;
2490 19323091 : }
2491 5012338 : }
2492 24335429 : for frozen_layer in layers.frozen_layers.iter().rev() {
2493 941050 : let start_lsn = frozen_layer.get_lsn_range().start;
2494 941050 : if cont_lsn > start_lsn {
2495 : //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
2496 261809 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2497 261809 : result = match frozen_layer
2498 261809 : .get_value_reconstruct_data(
2499 261809 : key,
2500 261809 : lsn_floor..cont_lsn,
2501 261809 : reconstruct_state,
2502 261809 : ctx,
2503 261809 : )
2504 18969 : .await
2505 : {
2506 261809 : Ok(result) => result,
2507 0 : Err(e) => return Err(PageReconstructError::from(e)),
2508 : };
2509 261809 : cont_lsn = lsn_floor;
2510 261809 : // metrics: frozen in-memory layers do not count as fs access either, so we are not updating `read_count`
2511 261809 : traversal_path.push((
2512 261809 : result,
2513 261809 : cont_lsn,
2514 261809 : Box::new({
2515 261809 : let frozen_layer = Arc::clone(frozen_layer);
2516 261809 : move || frozen_layer.traversal_id()
2517 261809 : }),
2518 261809 : ));
2519 261809 : continue 'outer;
2520 679241 : }
2521 : }
2522 :
2523 24073620 : if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
2524 23934553 : let layer = guard.get_from_desc(&layer);
2525 23934553 : // Get all the data needed to reconstruct the page version from this layer.
2526 23934553 : // But if we have an older cached page image, no need to go past that.
2527 23934553 : let lsn_floor = max(cached_lsn + 1, lsn_floor);
2528 23934553 : result = match layer
2529 23934553 : .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
2530 1207064 : .await
2531 : {
2532 23934519 : Ok(result) => result,
2533 22 : Err(e) => return Err(PageReconstructError::from(e)),
2534 : };
2535 23934519 : cont_lsn = lsn_floor;
2536 23934519 : *read_count += 1;
2537 23934519 : traversal_path.push((
2538 23934519 : result,
2539 23934519 : cont_lsn,
2540 23934519 : Box::new({
2541 23934519 : let layer = layer.to_owned();
2542 23934519 : move || layer.traversal_id()
2543 23934519 : }),
2544 23934519 : ));
2545 23934519 : continue 'outer;
2546 139067 : } else if timeline.ancestor_timeline.is_some() {
2547 : // Nothing on this timeline. Traverse to parent
2548 139066 : result = ValueReconstructResult::Continue;
2549 139066 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2550 139066 : continue 'outer;
2551 : } else {
2552 : // Nothing found
2553 1 : result = ValueReconstructResult::Missing;
2554 1 : continue 'outer;
2555 : }
2556 : }
2557 8625167 : }
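// To summarize the traversal above: each iteration either reads reconstruct data from
// one more layer on the current timeline (open, then frozen, then historic) or recurses
// into the ancestor timeline at `ancestor_lsn`, and the loop ends in exactly one of
// these ways:
//   * `Complete`, or an older cached page image is reached: return the traversal path.
//   * `Continue` without progress (`prev_lsn <= cont_lsn`): report a traversal error.
//   * `Missing`: no layer on this timeline or any ancestor holds the key.
//   * The timeline is cancelled: return `PageReconstructError::Cancelled`.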
2558 :
2559 : /// # Cancel-safety
2560 : ///
2561 : /// This method is cancellation-safe.
2562 8625194 : async fn lookup_cached_page(
2563 8625194 : &self,
2564 8625194 : key: &Key,
2565 8625194 : lsn: Lsn,
2566 8625194 : ctx: &RequestContext,
2567 8625196 : ) -> Option<(Lsn, Bytes)> {
2568 8625196 : let cache = page_cache::get();
2569 :
2570 : // FIXME: It's pointless to check the cache for things that are not 8kB pages.
2571 : // We should look at the key to determine if it's a cacheable object
2572 8625196 : let (lsn, read_guard) = cache
2573 8625196 : .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
2574 6848453 : .await?;
2575 1776743 : let img = Bytes::from(read_guard.to_vec());
2576 1776743 : Some((lsn, img))
2577 8625196 : }
2578 :
2579 1298714 : async fn get_ready_ancestor_timeline(
2580 1298714 : &self,
2581 1298714 : ctx: &RequestContext,
2582 1298714 : ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
2583 1298714 : let ancestor = match self.get_ancestor_timeline() {
2584 1298714 : Ok(timeline) => timeline,
2585 0 : Err(e) => return Err(GetReadyAncestorError::from(e)),
2586 : };
2587 :
2588 : // It's possible that the ancestor timeline isn't active yet, or
2589 : // is active but hasn't yet caught up to the branch point. Wait
2590 : // for it.
2591 : //
2592 : // This cannot happen while the pageserver is running normally,
2593 : // because you cannot create a branch from a point that isn't
2594 : // present in the pageserver yet. However, we don't wait for the
2595 : // branch point to be uploaded to cloud storage before creating
2596 : // a branch. I.e., the branch LSN need not be remote consistent
2597 : // for the branching operation to succeed.
2598 : //
2599 : // Hence, if we try to load a tenant in such a state where
2600 : // 1. the existence of the branch was persisted (in IndexPart and/or locally)
2601 : // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
2602 : // then we will need to wait for the ancestor timeline to
2603 : // re-stream WAL up to branch_lsn before we access it.
2604 : //
2605 : // How can a tenant get in such a state?
2606 : // - ungraceful pageserver process exit
2607 : // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
2608 : //
2609 : // NB: this could be avoided by requiring
2610 : // branch_lsn >= remote_consistent_lsn
2611 : // during branch creation.
2612 1298714 : match ancestor.wait_to_become_active(ctx).await {
2613 1298710 : Ok(()) => {}
2614 : Err(TimelineState::Stopping) => {
2615 2 : return Err(GetReadyAncestorError::AncestorStopping(
2616 2 : ancestor.timeline_id,
2617 2 : ));
2618 : }
2619 2 : Err(state) => {
2620 2 : return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
2621 2 : "Timeline {} will not become active. Current state: {:?}",
2622 2 : ancestor.timeline_id,
2623 2 : &state,
2624 2 : )));
2625 : }
2626 : }
2627 1298710 : ancestor
2628 1298710 : .wait_lsn(self.ancestor_lsn, ctx)
2629 6 : .await
2630 1298710 : .map_err(|e| match e {
2631 0 : e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
2632 0 : WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
2633 0 : e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
2634 1298710 : })?;
2635 :
2636 1298710 : Ok(ancestor)
2637 1298714 : }
2638 :
2639 1298714 : fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
2640 1298714 : let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
2641 0 : format!(
2642 0 : "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
2643 0 : self.timeline_id,
2644 0 : self.get_ancestor_timeline_id(),
2645 0 : )
2646 1298714 : })?;
2647 1298714 : Ok(Arc::clone(ancestor))
2648 1298714 : }
2649 :
2650 1073801 : pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
2651 1073801 : &self.shard_identity
2652 1073801 : }
2653 :
2654 : ///
2655 : /// Get a handle to the latest layer for appending.
2656 : ///
2657 2951969 : async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
2658 2951969 : let mut guard = self.layers.write().await;
2659 2951969 : let layer = guard
2660 2951969 : .get_layer_for_write(
2661 2951969 : lsn,
2662 2951969 : self.get_last_record_lsn(),
2663 2951969 : self.conf,
2664 2951969 : self.timeline_id,
2665 2951969 : self.tenant_shard_id,
2666 2951969 : )
2667 269 : .await?;
2668 2951969 : Ok(layer)
2669 2951969 : }
2670 :
2671 1214102 : async fn put_value(
2672 1214102 : &self,
2673 1214102 : key: Key,
2674 1214102 : lsn: Lsn,
2675 1214102 : val: &Value,
2676 1214102 : ctx: &RequestContext,
2677 1214102 : ) -> anyhow::Result<()> {
2678 : //info!("PUT: key {} at {}", key, lsn);
2679 1214102 : let layer = self.get_layer_for_write(lsn).await?;
2680 1214102 : layer.put_value(key, lsn, val, ctx).await?;
2681 1214102 : Ok(())
2682 1214102 : }
2683 :
2684 1718651 : async fn put_values(
2685 1718651 : &self,
2686 1718651 : values: &HashMap<Key, Vec<(Lsn, Value)>>,
2687 1718651 : ctx: &RequestContext,
2688 1718651 : ) -> anyhow::Result<()> {
2689 : // Pick the first LSN in the batch to get the layer to write to.
2690 1718651 : for lsns in values.values() {
2691 1718651 : if let Some((lsn, _)) = lsns.first() {
2692 1718651 : let layer = self.get_layer_for_write(*lsn).await?;
2693 1718651 : layer.put_values(values, ctx).await?;
2694 1718650 : break;
2695 0 : }
2696 : }
2697 1718650 : Ok(())
2698 1718650 : }
2699 :
2700 19216 : async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
2701 19216 : if let Some((_, lsn)) = tombstones.first() {
2702 19216 : let layer = self.get_layer_for_write(*lsn).await?;
2703 19216 : layer.put_tombstones(tombstones).await?;
2704 0 : }
2705 19216 : Ok(())
2706 19216 : }
2707 :
2708 76570323 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
2709 76570323 : assert!(new_lsn.is_aligned());
2710 :
2711 76570323 : self.metrics.last_record_gauge.set(new_lsn.0 as i64);
2712 76570323 : self.last_record_lsn.advance(new_lsn);
2713 76570323 : }
2714 :
2715 5489 : async fn freeze_inmem_layer(&self, write_lock_held: bool) {
2716 : // Freeze the current open in-memory layer. It will be written to disk on next
2717 : // iteration.
2718 5489 : let _write_guard = if write_lock_held {
2719 3688 : None
2720 : } else {
2721 1801 : Some(self.write_lock.lock().await)
2722 : };
2723 5489 : let mut guard = self.layers.write().await;
2724 5489 : guard
2725 5489 : .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
2726 5 : .await;
2727 5489 : }
2728 :
2729 : /// Layer flusher task's main loop.
2730 1551 : async fn flush_loop(
2731 1551 : self: &Arc<Self>,
2732 1551 : mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
2733 1551 : ctx: &RequestContext,
2734 1551 : ) {
2735 1551 : info!("started flush loop");
2736 : loop {
2737 6673 : tokio::select! {
2738 12020 : _ = self.cancel.cancelled() => {
2739 12020 : info!("shutting down layer flush task");
2740 12020 : break;
2741 12020 : },
2742 12020 : _ = task_mgr::shutdown_watcher() => {
2743 12020 : info!("shutting down layer flush task");
2744 12020 : break;
2745 12020 : },
2746 12020 : _ = layer_flush_start_rx.changed() => {}
2747 12020 : }
2748 :
2749 0 : trace!("waking up");
2750 5129 : let timer = self.metrics.flush_time_histo.start_timer();
2751 5129 : let flush_counter = *layer_flush_start_rx.borrow();
2752 10388 : let result = loop {
2753 10388 : if self.cancel.is_cancelled() {
2754 0 : info!("dropping out of flush loop for timeline shutdown");
2755 : // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
2756 : // anyone waiting on that will respect self.cancel as well: they will stop
2757 : // waiting at the same time as we drop out of this loop.
2758 0 : return;
2759 10388 : }
2760 :
2761 10388 : let layer_to_flush = {
2762 10388 : let guard = self.layers.read().await;
2763 10388 : guard.layer_map().frozen_layers.front().cloned()
2764 : // drop 'layers' lock to allow concurrent reads and writes
2765 : };
2766 10388 : let Some(layer_to_flush) = layer_to_flush else {
2767 5122 : break Ok(());
2768 : };
2769 22252 : match self.flush_frozen_layer(layer_to_flush, ctx).await {
2770 5259 : Ok(()) => {}
2771 : Err(FlushLayerError::Cancelled) => {
2772 1 : info!("dropping out of flush loop for timeline shutdown");
2773 1 : return;
2774 : }
2775 0 : err @ Err(
2776 : FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
2777 : ) => {
2778 0 : error!("could not flush frozen layer: {err:?}");
2779 0 : break err;
2780 : }
2781 : }
2782 : };
2783 : // Notify any listeners that we're done
2784 5122 : let _ = self
2785 5122 : .layer_flush_done_tx
2786 5122 : .send_replace((flush_counter, result));
2787 5122 :
2788 5122 : timer.stop_and_record();
2789 : }
2790 576 : }
2791 :
2792 1801 : async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
2793 1801 : let mut rx = self.layer_flush_done_tx.subscribe();
2794 1801 :
2795 1801 : // Increment the flush cycle counter and wake up the flush task.
2796 1801 : // Remember the new value, so that when we listen for the flush
2797 1801 : // to finish, we know when the flush that we initiated has
2798 1801 : // finished, instead of some other flush that was started earlier.
2799 1801 : let mut my_flush_request = 0;
2800 1801 :
2801 1801 : let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
2802 1801 : if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
2803 44 : anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
2804 1757 : }
2805 1757 :
2806 1757 : self.layer_flush_start_tx.send_modify(|counter| {
2807 1757 : my_flush_request = *counter + 1;
2808 1757 : *counter = my_flush_request;
2809 1757 : });
2810 :
2811 3513 : loop {
2812 3513 : {
2813 3513 : let (last_result_counter, last_result) = &*rx.borrow();
2814 3513 : if *last_result_counter >= my_flush_request {
2815 1756 : if let Err(_err) = last_result {
2816 : // We already logged the original error in
2817 : // flush_loop. We cannot propagate it to the caller
2818 : // here, because it might not be Cloneable
2819 0 : anyhow::bail!(
2820 0 : "Could not flush frozen layer. Request id: {}",
2821 0 : my_flush_request
2822 0 : );
2823 : } else {
2824 1756 : return Ok(());
2825 : }
2826 1757 : }
2827 : }
2828 0 : trace!("waiting for flush to complete");
2829 1757 : tokio::select! {
2830 1756 : rx_e = rx.changed() => {
2831 : rx_e?;
2832 : },
2833 : // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
2834 : // the notification from [`flush_loop`] that it completed.
2835 : _ = self.cancel.cancelled() => {
2836 1 : tracing::info!("Cancelled layer flush due to timeline shutdown");
2837 : return Ok(())
2838 : }
2839 : };
2840 0 : trace!("done")
2841 : }
2842 1801 : }
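// The flush hand-off above is a small watch-channel counter protocol: the requester bumps
// `layer_flush_start_tx` and remembers its request number, and the flush loop publishes
// the highest request it has finished (plus the result) on `layer_flush_done_tx`. A
// hedged sketch of the requester half with plain tokio watch channels (names here are
// hypothetical):
//
//     use tokio::sync::watch;
//
//     async fn request_and_wait(
//         start_tx: &watch::Sender<u64>,
//         done_rx: &mut watch::Receiver<(u64, Result<(), String>)>,
//     ) -> Result<(), String> {
//         let mut my_request = 0;
//         start_tx.send_modify(|counter| {
//             my_request = *counter + 1;
//             *counter = my_request;
//         });
//         loop {
//             let (done_counter, result) = done_rx.borrow().clone();
//             if done_counter >= my_request {
//                 return result;
//             }
//             done_rx.changed().await.map_err(|e| e.to_string())?;
//         }
//     }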
2843 :
2844 3688 : fn flush_frozen_layers(&self) {
2845 3688 : self.layer_flush_start_tx.send_modify(|val| *val += 1);
2846 3688 : }
2847 :
2848 : /// Flush one frozen in-memory layer to disk, as a new delta layer.
2849 5184 : #[instrument(skip_all, fields(layer=%frozen_layer))]
2850 : async fn flush_frozen_layer(
2851 : self: &Arc<Self>,
2852 : frozen_layer: Arc<InMemoryLayer>,
2853 : ctx: &RequestContext,
2854 : ) -> Result<(), FlushLayerError> {
2855 : debug_assert_current_span_has_tenant_and_timeline_id();
2856 : // As a special case, when we have just imported an image into the repository,
2857 : // instead of writing out a L0 delta layer, we directly write out image layer
2858 : // files instead. This is possible as long as *all* the data imported into the
2859 : // repository have the same LSN.
2860 : let lsn_range = frozen_layer.get_lsn_range();
2861 : let (layers_to_upload, delta_layer_to_add) =
2862 : if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
2863 : #[cfg(test)]
2864 : match &mut *self.flush_loop_state.lock().unwrap() {
2865 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2866 : panic!("flush loop not running")
2867 : }
2868 : FlushLoopState::Running {
2869 : initdb_optimization_count,
2870 : ..
2871 : } => {
2872 : *initdb_optimization_count += 1;
2873 : }
2874 : }
2875 : // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
2876 : // require downloading anything during initial import.
2877 : let (partitioning, _lsn) = self
2878 : .repartition(
2879 : self.initdb_lsn,
2880 : self.get_compaction_target_size(),
2881 : EnumSet::empty(),
2882 : ctx,
2883 : )
2884 : .await?;
2885 :
2886 : if self.cancel.is_cancelled() {
2887 : return Err(FlushLayerError::Cancelled);
2888 : }
2889 :
2890 : // For image layers, we add them immediately into the layer map.
2891 : (
2892 : self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
2893 : .await?,
2894 : None,
2895 : )
2896 : } else {
2897 : #[cfg(test)]
2898 : match &mut *self.flush_loop_state.lock().unwrap() {
2899 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2900 : panic!("flush loop not running")
2901 : }
2902 : FlushLoopState::Running {
2903 : expect_initdb_optimization,
2904 : ..
2905 : } => {
2906 : assert!(!*expect_initdb_optimization, "expected initdb optimization");
2907 : }
2908 : }
2909 : // Normal case, write out a L0 delta layer file.
2910 : // `create_delta_layer` will not modify the layer map.
2911 : // We will remove frozen layer and add delta layer in one atomic operation later.
2912 : let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
2913 : (
2914              :                     // FIXME: even though we assume a single image and a single delta layer,
2915              :                     // we push them into a vec
2916 : vec![layer.clone()],
2917 : Some(layer),
2918 : )
2919 : };
2920 :
2921 5264 : pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
2922 :
2923 : if self.cancel.is_cancelled() {
2924 : return Err(FlushLayerError::Cancelled);
2925 : }
2926 :
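        // The layer's LSN range end is exclusive, so the last record that is fully flushed
        // is `lsn_range.end - 1`; that becomes the new disk_consistent_lsn candidate.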
2927 : let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
2928 : let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
2929 :
2930              :         // The new on-disk layers have been written out. Now swap them into the layer map:
2931              :         // remove the frozen in-memory layer and add the new delta layer in one atomic
2932              :         // operation (`finish_flush_l0_layer` below). Image layers were already added to the
2933              :         // layer map in `create_image_layers`.
2933 : let metadata = {
2934 : let mut guard = self.layers.write().await;
2935 :
2936 : if self.cancel.is_cancelled() {
2937 : return Err(FlushLayerError::Cancelled);
2938 : }
2939 :
2940 : guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
2941 :
2942 : if disk_consistent_lsn != old_disk_consistent_lsn {
2943 : assert!(disk_consistent_lsn > old_disk_consistent_lsn);
2944 : self.disk_consistent_lsn.store(disk_consistent_lsn);
2945 :
2946 : // Schedule remote uploads that will reflect our new disk_consistent_lsn
2947 : Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
2948 : } else {
2949 : None
2950 : }
2951 : // release lock on 'layers'
2952 : };
2953 :
2954 : // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
2955 : // a compaction can delete the file and then it won't be available for uploads any more.
2956 : // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
2957 : // race situation.
2958 : // See https://github.com/neondatabase/neon/issues/4526
2959 5263 : pausable_failpoint!("flush-frozen-pausable");
2960 :
2961 : // This failpoint is used by another test case `test_pageserver_recovery`.
2962 0 : fail_point!("flush-frozen-exit");
2963 :
2964 : // Update the metadata file, with new 'disk_consistent_lsn'
2965 : //
2966 : // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
2967 : // *all* the layers, to avoid fsyncing the file multiple times.
2968 :
2969 : // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
2970 : if let Some(metadata) = metadata {
2971 : save_metadata(
2972 : self.conf,
2973 : &self.tenant_shard_id,
2974 : &self.timeline_id,
2975 : &metadata,
2976 : )
2977 : .await
2978 : .context("save_metadata")?;
2979 : }
2980 : Ok(())
2981 : }
2982 :
2983 : /// Update metadata file
2984 5283 : fn schedule_uploads(
2985 5283 : &self,
2986 5283 : disk_consistent_lsn: Lsn,
2987 5283 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
2988 5283 : ) -> anyhow::Result<TimelineMetadata> {
2989 5283 : // We can only save a valid 'prev_record_lsn' value on disk if we
2990 5283 : // flushed *all* in-memory changes to disk. We only track
2991 5283 : // 'prev_record_lsn' in memory for the latest processed record, so we
2992         5283 :         // don't remember what the correct value for some older
2993         5283 :         // LSN would be. But if we flush everything, then the value corresponding
2994         5283 :         // to the current 'last_record_lsn' is correct and we can store it on disk.
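        // Example: if disk_consistent_lsn equals last_record_lsn, the in-memory prev_record_lsn
        // describes exactly that position, so we persist Some(prev_record_lsn); otherwise we
        // persist None below.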
2995 5283 : let RecordLsn {
2996 5283 : last: last_record_lsn,
2997 5283 : prev: prev_record_lsn,
2998 5283 : } = self.last_record_lsn.load();
2999 5283 : let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
3000 1528 : Some(prev_record_lsn)
3001 : } else {
3002 3755 : None
3003 : };
3004 :
3005 5283 : let ancestor_timeline_id = self
3006 5283 : .ancestor_timeline
3007 5283 : .as_ref()
3008 5283 : .map(|ancestor| ancestor.timeline_id);
3009 5283 :
3010 5283 : let metadata = TimelineMetadata::new(
3011 5283 : disk_consistent_lsn,
3012 5283 : ondisk_prev_record_lsn,
3013 5283 : ancestor_timeline_id,
3014 5283 : self.ancestor_lsn,
3015 5283 : *self.latest_gc_cutoff_lsn.read(),
3016 5283 : self.initdb_lsn,
3017 5283 : self.pg_version,
3018 5283 : );
3019 5283 :
3020 5283 : fail_point!("checkpoint-before-saving-metadata", |x| bail!(
3021 0 : "{}",
3022 0 : x.unwrap()
3023 5283 : ));
3024 :
3025 5283 : if let Some(remote_client) = &self.remote_client {
3026 10546 : for layer in layers_to_upload {
3027 5263 : remote_client.schedule_layer_file_upload(layer)?;
3028 : }
3029 5283 : remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
3030 0 : }
3031 :
3032 5283 : Ok(metadata)
3033 5283 : }
3034 :
3035 20 : async fn update_metadata_file(
3036 20 : &self,
3037 20 : disk_consistent_lsn: Lsn,
3038 20 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
3039 20 : ) -> anyhow::Result<()> {
3040 20 : let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
3041 :
3042 20 : save_metadata(
3043 20 : self.conf,
3044 20 : &self.tenant_shard_id,
3045 20 : &self.timeline_id,
3046 20 : &metadata,
3047 20 : )
3048 0 : .await
3049 20 : .context("save_metadata")?;
3050 :
3051 20 : Ok(())
3052 20 : }
3053 :
3054 2 : pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
3055 2 : if let Some(remote_client) = &self.remote_client {
3056 2 : remote_client
3057 2 : .preserve_initdb_archive(
3058 2 : &self.tenant_shard_id.tenant_id,
3059 2 : &self.timeline_id,
3060 2 : &self.cancel,
3061 2 : )
3062 2 : .await?;
3063 : } else {
3064 0 : bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
3065 : }
3066 2 : Ok(())
3067 2 : }
3068 :
3069 : // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
3070              :     // in the layer map immediately. The caller is responsible for putting it into the layer map.
3071 5186 : async fn create_delta_layer(
3072 5186 : self: &Arc<Self>,
3073 5186 : frozen_layer: &Arc<InMemoryLayer>,
3074 5186 : ctx: &RequestContext,
3075 5186 : ) -> anyhow::Result<ResidentLayer> {
3076 5186 : let span = tracing::info_span!("blocking");
3077 5186 : let new_delta: ResidentLayer = tokio::task::spawn_blocking({
3078 5186 : let self_clone = Arc::clone(self);
3079 5186 : let frozen_layer = Arc::clone(frozen_layer);
3080 5186 : let ctx = ctx.attached_child();
3081 5186 : move || {
3082 5186 : // Write it out
3083 5186 : // Keep this inside `spawn_blocking` and `Handle::current`
3084 5186 : // as long as the write path is still sync and the read impl
3085 5186 : // is still not fully async. Otherwise executor threads would
3086 5186 : // be blocked.
3087 5186 : let _g = span.entered();
3088 5186 : let new_delta =
3089 5186 : Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
3090 5186 : let new_delta_path = new_delta.local_path().to_owned();
3091 5186 :
3092 5186 : // Sync it to disk.
3093 5186 : //
3094 5186 : // We must also fsync the timeline dir to ensure the directory entries for
3095 5186 : // new layer files are durable.
3096 5186 : //
3097 5186 : // NB: timeline dir must be synced _after_ the file contents are durable.
3098 5186 : // So, two separate fsyncs are required, they mustn't be batched.
3099 5186 : //
3100 5186 : // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
3101         5186 :             // files to flush, the fsync overhead can be reduced as follows:
3102 5186 : // 1. write them all to temporary file names
3103 5186 : // 2. fsync them
3104 5186 : // 3. rename to the final name
3105 5186 : // 4. fsync the parent directory.
3106 5186 : // Note that (1),(2),(3) today happen inside write_to_disk().
3107 5186 : //
3108 5186 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3109 5186 : par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
3110 5186 : par_fsync::par_fsync(&[self_clone
3111 5186 : .conf
3112 5186 : .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
3113 5186 : .context("fsync of timeline dir")?;
3114 :
3115 5184 : anyhow::Ok(new_delta)
3116 5186 : }
3117 5186 : })
3118 5184 : .await
3119 5184 : .context("spawn_blocking")
3120 5184 : .and_then(|x| x)?;
3121 :
3122 5184 : Ok(new_delta)
3123 5184 : }
3124 :
3125 1603 : async fn repartition(
3126 1603 : &self,
3127 1603 : lsn: Lsn,
3128 1603 : partition_size: u64,
3129 1603 : flags: EnumSet<CompactFlags>,
3130 1603 : ctx: &RequestContext,
3131 1603 : ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
3132 1603 : {
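            // Reuse the previous partitioning if the LSN has advanced less than
            // `repartition_threshold` since it was computed, unless ForceRepartition is requested.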
3133 1603 : let partitioning_guard = self.partitioning.lock().unwrap();
3134 1603 : let distance = lsn.0 - partitioning_guard.1 .0;
3135 1603 : if partitioning_guard.1 != Lsn(0)
3136 900 : && distance <= self.repartition_threshold
3137 719 : && !flags.contains(CompactFlags::ForceRepartition)
3138 : {
3139 0 : debug!(
3140 0 : distance,
3141 0 : threshold = self.repartition_threshold,
3142 0 : "no repartitioning needed"
3143 0 : );
3144 717 : return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
3145 886 : }
3146 : }
3147 213408 : let keyspace = self.collect_keyspace(lsn, ctx).await?;
3148 882 : let partitioning = keyspace.partition(partition_size);
3149 882 :
3150 882 : let mut partitioning_guard = self.partitioning.lock().unwrap();
3151 882 : if lsn > partitioning_guard.1 {
3152 882 : *partitioning_guard = (partitioning, lsn);
3153 882 : } else {
3154            0 :             warn!("Concurrent repartitioning of keyspace. This is unexpected, but probably harmless");
3155 : }
3156 882 : Ok((partitioning_guard.0.clone(), partitioning_guard.1))
3157 1602 : }
3158 :
3159 : // Is it time to create a new image layer for the given partition?
3160 37180 : async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
3161 37180 : let threshold = self.get_image_creation_threshold();
3162 :
3163 37180 : let guard = self.layers.read().await;
3164 37180 : let layers = guard.layer_map();
3165 37180 :
3166 37180 : let mut max_deltas = 0;
3167 37180 : {
3168 37180 : let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
3169 37180 : if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
3170 6220 : let img_range =
3171 6220 : partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
3172 6220 : if wanted.overlaps(&img_range) {
3173 : //
3174 : // gc_timeline only pays attention to image layers that are older than the GC cutoff,
3175 : // but create_image_layers creates image layers at last-record-lsn.
3176 : // So it's possible that gc_timeline wants a new image layer to be created for a key range,
3177 : // but the range is already covered by image layers at more recent LSNs. Before we
3178 : // create a new image layer, check if the range is already covered at more recent LSNs.
3179 0 : if !layers
3180 0 : .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
3181 : {
3182 0 : debug!(
3183 0 : "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
3184            0 :                             "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={}",
3185 0 : );
3186 0 : return true;
3187 0 : }
3188 6220 : }
3189 30960 : }
3190 : }
3191 :
3192 2066684 : for part_range in &partition.ranges {
3193 2035454 : let image_coverage = layers.image_coverage(part_range, lsn);
3194 3884589 : for (img_range, last_img) in image_coverage {
3195 1855085 : let img_lsn = if let Some(last_img) = last_img {
3196 365293 : last_img.get_lsn_range().end
3197 : } else {
3198 1489792 : Lsn(0)
3199 : };
3200 : // Let's consider an example:
3201 : //
3202 : // delta layer with LSN range 71-81
3203 : // delta layer with LSN range 81-91
3204 : // delta layer with LSN range 91-101
3205 : // image layer at LSN 100
3206 : //
3207 : // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
3208 : // there's no need to create a new one. We check this case explicitly, to avoid passing
3209 : // a bogus range to count_deltas below, with start > end. It's even possible that there
3210 : // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
3211 : // after we read last_record_lsn, which is passed here in the 'lsn' argument.
3212 1855085 : if img_lsn < lsn {
3213 1820118 : let num_deltas =
3214 1820118 : layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
3215 1820118 :
3216 1820118 : max_deltas = max_deltas.max(num_deltas);
3217 1820118 : if num_deltas >= threshold {
3218 0 : debug!(
3219 0 : "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
3220 0 : img_range.start, img_range.end, num_deltas, img_lsn, lsn
3221 0 : );
3222 5949 : return true;
3223 1814168 : }
3224 34967 : }
3225 : }
3226 : }
3227 :
3228 0 : debug!(
3229 0 : max_deltas,
3230 0 : "none of the partitioned ranges had >= {threshold} deltas"
3231 0 : );
3232 31230 : false
3233 37179 : }
3234 :
3235 0 : #[tracing::instrument(skip_all, fields(%lsn, %force))]
3236 : async fn create_image_layers(
3237 : self: &Arc<Timeline>,
3238 : partitioning: &KeyPartitioning,
3239 : lsn: Lsn,
3240 : force: bool,
3241 : ctx: &RequestContext,
3242 : ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
3243 : let timer = self.metrics.create_images_time_histo.start_timer();
3244 : let mut image_layers = Vec::new();
3245 :
3246 : // We need to avoid holes between generated image layers.
3247              :         // Otherwise LayerMap::image_layer_exists will return false if the key range of some layer is covered
3248              :         // by more than one image layer with a hole between them. In this case such a layer cannot be utilized by GC.
3249              :         //
3250              :         // How can such a hole between partitions appear?
3251              :         // If we have a relation with relid=1 and size 100 and a relation with relid=2 and size 200, then the result of
3252              :         // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
3253              :         // If there is a delta layer <100000000..300000000>, then it will never be garbage collected, because the
3254              :         // image layers <100000000..100000099> and <200000000..200000199> do not completely cover it.
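        // To avoid such holes, each image layer's key range is chained to start where the previous
        // one ended (`start = img_range.end` below), so the generated image layers tile the key
        // space without gaps.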
3255 : let mut start = Key::MIN;
3256 :
3257 : for partition in partitioning.parts.iter() {
3258 : let img_range = start..partition.ranges.last().unwrap().end;
3259 : start = img_range.end;
3260 : if force || self.time_for_new_image_layer(partition, lsn).await {
3261 : let mut image_layer_writer = ImageLayerWriter::new(
3262 : self.conf,
3263 : self.timeline_id,
3264 : self.tenant_shard_id,
3265 : &img_range,
3266 : lsn,
3267 : )
3268 : .await?;
3269 :
3270 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3271 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
3272 0 : "failpoint image-layer-writer-fail-before-finish"
3273 0 : )))
3274 0 : });
3275 :
3276 : let mut key_request_accum = KeySpaceAccum::new();
3277 : for range in &partition.ranges {
3278 : let mut key = range.start;
3279 : while key < range.end {
3280 : if self.shard_identity.is_key_disposable(&key) {
3281 0 : debug!(
3282 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3283 0 : key,
3284 0 : self.shard_identity.get_shard_number(&key)
3285 0 : );
3286 : key = key.next();
3287 : continue;
3288 : }
3289 :
3290 : key_request_accum.add_key(key);
3291 : if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
3292 : || key.next() == range.end
3293 : {
3294 : let results = self
3295 : .get_vectored(
3296 : &key_request_accum.consume_keyspace().ranges,
3297 : lsn,
3298 : ctx,
3299 : )
3300 : .await?;
3301 :
3302 : for (img_key, img) in results {
3303 : let img = match img {
3304 : Ok(img) => img,
3305 : Err(err) => {
3306 : // If we fail to reconstruct a VM or FSM page, we can zero the
3307 : // page without losing any actual user data. That seems better
3308 : // than failing repeatedly and getting stuck.
3309 : //
3310 : // We had a bug at one point, where we truncated the FSM and VM
3311              :                                     // in the pageserver, but Postgres didn't know about that
3312 : // and continued to generate incremental WAL records for pages
3313 : // that didn't exist in the pageserver. Trying to replay those
3314 : // WAL records failed to find the previous image of the page.
3315 : // This special case allows us to recover from that situation.
3316 : // See https://github.com/neondatabase/neon/issues/2601.
3317 : //
3318 : // Unfortunately we cannot do this for the main fork, or for
3319              :                                     // any metadata keys, as that would lead to actual data
3320 : // loss.
3321 : if is_rel_fsm_block_key(img_key)
3322 : || is_rel_vm_block_key(img_key)
3323 : {
3324 0 : warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
3325 : ZERO_PAGE.clone()
3326 : } else {
3327 : return Err(
3328 : CreateImageLayersError::PageReconstructError(err),
3329 : );
3330 : }
3331 : }
3332 : };
3333 :
3334 : image_layer_writer.put_image(img_key, &img).await?;
3335 : }
3336 : }
3337 :
3338 : key = key.next();
3339 : }
3340 : }
3341 : let image_layer = image_layer_writer.finish(self).await?;
3342 : image_layers.push(image_layer);
3343 : }
3344 : }
3345 : // All layers that the GC wanted us to create have now been created.
3346 : //
3347 : // It's possible that another GC cycle happened while we were compacting, and added
3348 : // something new to wanted_image_layers, and we now clear that before processing it.
3349 : // That's OK, because the next GC iteration will put it back in.
3350 : *self.wanted_image_layers.lock().unwrap() = None;
3351 :
3352 : // Sync the new layer to disk before adding it to the layer map, to make sure
3353 : // we don't garbage collect something based on the new layer, before it has
3354 : // reached the disk.
3355 : //
3356 : // We must also fsync the timeline dir to ensure the directory entries for
3357 : // new layer files are durable
3358 : //
3359 : // Compaction creates multiple image layers. It would be better to create them all
3360 : // and fsync them all in parallel.
3361 : let all_paths = image_layers
3362 : .iter()
3363 6029 : .map(|layer| layer.local_path().to_owned())
3364 : .collect::<Vec<_>>();
3365 :
3366 : par_fsync::par_fsync_async(&all_paths)
3367 : .await
3368 : .context("fsync of newly created layer files")?;
3369 :
3370 : if !all_paths.is_empty() {
3371 : par_fsync::par_fsync_async(&[self
3372 : .conf
3373 : .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
3374 : .await
3375 : .context("fsync of timeline dir")?;
3376 : }
3377 :
3378 : let mut guard = self.layers.write().await;
3379 :
3380 : // FIXME: we could add the images to be uploaded *before* returning from here, but right
3381 : // now they are being scheduled outside of write lock
3382 : guard.track_new_image_layers(&image_layers, &self.metrics);
3383 : drop_wlock(guard);
3384 : timer.stop_and_record();
3385 :
3386 : Ok(image_layers)
3387 : }
3388 :
3389 : /// Wait until the background initial logical size calculation is complete, or
3390 : /// this Timeline is shut down. Calling this function will cause the initial
3391 : /// logical size calculation to skip waiting for the background jobs barrier.
3392 245 : pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
3393 245 : if let Some(await_bg_cancel) = self
3394 245 : .current_logical_size
3395 245 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
3396 245 : .get()
3397 239 : {
3398 239 : await_bg_cancel.cancel();
3399 239 : } else {
3400 : // We should not wait if we were not able to explicitly instruct
3401 : // the logical size cancellation to skip the concurrency limit semaphore.
3402 : // TODO: this is an unexpected case. We should restructure so that it
3403 : // can't happen.
3404 6 : tracing::info!(
3405 6 : "await_initial_logical_size: can't get semaphore cancel token, skipping"
3406 6 : );
3407 : }
3408 :
3409 245 : tokio::select!(
3410 476 : _ = self.current_logical_size.initialized.acquire() => {},
3411 476 : _ = self.cancel.cancelled() => {}
3412 476 : )
3413 237 : }
3414 : }
3415 :
3416 1223 : #[derive(Default)]
3417 : struct CompactLevel0Phase1Result {
3418 : new_layers: Vec<ResidentLayer>,
3419 : deltas_to_compact: Vec<Layer>,
3420 : }
3421 :
3422 : /// Top-level failure to compact.
3423 0 : #[derive(Debug, thiserror::Error)]
3424 : pub(crate) enum CompactionError {
3425 : #[error("The timeline or pageserver is shutting down")]
3426 : ShuttingDown,
3427 : /// Compaction cannot be done right now; page reconstruction and so on.
3428              :     /// Compaction could not be completed right now, e.g. due to page reconstruction errors and the like.
3429 : Other(#[from] anyhow::Error),
3430 : }
3431 :
3432 : #[serde_as]
3433 4088 : #[derive(serde::Serialize)]
3434 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
3435 :
3436 10633 : #[derive(Default)]
3437 : enum DurationRecorder {
3438 : #[default]
3439 : NotStarted,
3440 : Recorded(RecordedDuration, tokio::time::Instant),
3441 : }
3442 :
3443 : impl DurationRecorder {
3444 2983 : fn till_now(&self) -> DurationRecorder {
3445 2983 : match self {
3446 : DurationRecorder::NotStarted => {
3447 0 : panic!("must only call on recorded measurements")
3448 : }
3449 2983 : DurationRecorder::Recorded(_, ended) => {
3450 2983 : let now = tokio::time::Instant::now();
3451 2983 : DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
3452 2983 : }
3453 2983 : }
3454 2983 : }
3455 2044 : fn into_recorded(self) -> Option<RecordedDuration> {
3456 2044 : match self {
3457 0 : DurationRecorder::NotStarted => None,
3458 2044 : DurationRecorder::Recorded(recorded, _) => Some(recorded),
3459 : }
3460 2044 : }
3461 : }
3462 :
3463 1519 : #[derive(Default)]
3464 : struct CompactLevel0Phase1StatsBuilder {
3465 : version: Option<u64>,
3466 : tenant_id: Option<TenantShardId>,
3467 : timeline_id: Option<TimelineId>,
3468 : read_lock_acquisition_micros: DurationRecorder,
3469 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
3470 : read_lock_held_key_sort_micros: DurationRecorder,
3471 : read_lock_held_prerequisites_micros: DurationRecorder,
3472 : read_lock_held_compute_holes_micros: DurationRecorder,
3473 : read_lock_drop_micros: DurationRecorder,
3474 : write_layer_files_micros: DurationRecorder,
3475 : level0_deltas_count: Option<usize>,
3476 : new_deltas_count: Option<usize>,
3477 : new_deltas_size: Option<u64>,
3478 : }
3479 :
3480 292 : #[derive(serde::Serialize)]
3481 : struct CompactLevel0Phase1Stats {
3482 : version: u64,
3483 : tenant_id: TenantShardId,
3484 : timeline_id: TimelineId,
3485 : read_lock_acquisition_micros: RecordedDuration,
3486 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
3487 : read_lock_held_key_sort_micros: RecordedDuration,
3488 : read_lock_held_prerequisites_micros: RecordedDuration,
3489 : read_lock_held_compute_holes_micros: RecordedDuration,
3490 : read_lock_drop_micros: RecordedDuration,
3491 : write_layer_files_micros: RecordedDuration,
3492 : level0_deltas_count: usize,
3493 : new_deltas_count: usize,
3494 : new_deltas_size: u64,
3495 : }
3496 :
3497 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
3498 : type Error = anyhow::Error;
3499 :
3500 292 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
3501 292 : Ok(Self {
3502 292 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
3503 292 : tenant_id: value
3504 292 : .tenant_id
3505 292 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
3506 292 : timeline_id: value
3507 292 : .timeline_id
3508 292 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
3509 292 : read_lock_acquisition_micros: value
3510 292 : .read_lock_acquisition_micros
3511 292 : .into_recorded()
3512 292 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
3513 292 : read_lock_held_spawn_blocking_startup_micros: value
3514 292 : .read_lock_held_spawn_blocking_startup_micros
3515 292 : .into_recorded()
3516 292 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
3517 292 : read_lock_held_key_sort_micros: value
3518 292 : .read_lock_held_key_sort_micros
3519 292 : .into_recorded()
3520 292 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
3521 292 : read_lock_held_prerequisites_micros: value
3522 292 : .read_lock_held_prerequisites_micros
3523 292 : .into_recorded()
3524 292 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
3525 292 : read_lock_held_compute_holes_micros: value
3526 292 : .read_lock_held_compute_holes_micros
3527 292 : .into_recorded()
3528 292 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
3529 292 : read_lock_drop_micros: value
3530 292 : .read_lock_drop_micros
3531 292 : .into_recorded()
3532 292 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
3533 292 : write_layer_files_micros: value
3534 292 : .write_layer_files_micros
3535 292 : .into_recorded()
3536 292 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
3537 292 : level0_deltas_count: value
3538 292 : .level0_deltas_count
3539 292 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
3540 292 : new_deltas_count: value
3541 292 : .new_deltas_count
3542 292 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
3543 292 : new_deltas_size: value
3544 292 : .new_deltas_size
3545 292 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
3546 : })
3547 292 : }
3548 : }
3549 :
3550 : impl Timeline {
3551 : /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
3552 1519 : async fn compact_level0_phase1(
3553 1519 : self: &Arc<Self>,
3554 1519 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
3555 1519 : mut stats: CompactLevel0Phase1StatsBuilder,
3556 1519 : target_file_size: u64,
3557 1519 : ctx: &RequestContext,
3558 1519 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
3559 1519 : stats.read_lock_held_spawn_blocking_startup_micros =
3560 1519 : stats.read_lock_acquisition_micros.till_now(); // set by caller
3561 1519 : let layers = guard.layer_map();
3562 1519 : let level0_deltas = layers.get_level0_deltas()?;
3563 1519 : let mut level0_deltas = level0_deltas
3564 1519 : .into_iter()
3565 6971 : .map(|x| guard.get_from_desc(&x))
3566 1519 : .collect_vec();
3567 1519 : stats.level0_deltas_count = Some(level0_deltas.len());
3568 1519 : // Only compact if enough layers have accumulated.
3569 1519 : let threshold = self.get_compaction_threshold();
3570 1519 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
3571 0 : debug!(
3572 0 : level0_deltas = level0_deltas.len(),
3573 0 : threshold, "too few deltas to compact"
3574 0 : );
3575 1223 : return Ok(CompactLevel0Phase1Result::default());
3576 296 : }
3577 296 :
3578 296 : // This failpoint is used together with `test_duplicate_layers` integration test.
3579          296 :         // This failpoint is used together with the `test_duplicate_layers` integration test.
3580          296 :         // It makes compaction return exactly the same layers as it received as input.
3581 296 : // after the compaction is finished.
3582 296 : //
3583          296 :         // Currently, there are two rare edge cases that will cause duplicated layers to be
3584          296 :         // inserted.
3585          296 :         // 1. The compaction job is interrupted / did not finish successfully. Assume we have files 1, 2, 3, 4, which
3586          296 :         // are compacted to 5, but the page server is shut down. The next time we start the page server we will get a layer
3587          296 :         // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compaction at this
3588          296 :         // point again, it is likely that we will get a file 6 which has the same content and key range as 5,
3589 296 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
3590 296 : // layer replace instead of the normal remove / upload process.
3591          296 :         // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and are of target file
3592 296 : // size length. Compaction will likely create the same set of n files afterwards.
3593 296 : //
3594 296 : // This failpoint is a superset of both of the cases.
3595 296 : if cfg!(feature = "testing") {
3596 296 : let active = (|| {
3597 296 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
3598 296 : false
3599 296 : })();
3600 296 :
3601 296 : if active {
3602 3 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
3603 43 : for delta in &level0_deltas {
3604 : // we are just faking these layers as being produced again for this failpoint
3605 40 : new_layers.push(
3606 40 : delta
3607 40 : .download_and_keep_resident()
3608 0 : .await
3609 40 : .context("download layer for failpoint")?,
3610 : );
3611 : }
3612 3 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
3613 3 : return Ok(CompactLevel0Phase1Result {
3614 3 : new_layers,
3615 3 : deltas_to_compact: level0_deltas,
3616 3 : });
3617 293 : }
3618 0 : }
3619 :
3620 : // Gather the files to compact in this iteration.
3621 : //
3622 : // Start with the oldest Level 0 delta file, and collect any other
3623 : // level 0 files that form a contiguous sequence, such that the end
3624 : // LSN of previous file matches the start LSN of the next file.
3625 : //
3626 : // Note that if the files don't form such a sequence, we might
3627 : // "compact" just a single file. That's a bit pointless, but it allows
3628 : // us to get rid of the level 0 file, and compact the other files on
3629              :     // the next iteration. This could probably be made smarter, but such
3630 : // "gaps" in the sequence of level 0 files should only happen in case
3631 : // of a crash, partial download from cloud storage, or something like
3632 : // that, so it's not a big deal in practice.
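    // For example, if the L0 deltas have LSN ranges 10-20, 20-30 and 40-50, the first two form a
    // contiguous sequence and are compacted together; 40-50 is left for the next iteration, because
    // its start LSN does not match the previous end LSN.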
3633 8190 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
3634 293 : let mut level0_deltas_iter = level0_deltas.iter();
3635 293 :
3636 293 : let first_level0_delta = level0_deltas_iter.next().unwrap();
3637 293 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
3638 293 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
3639 293 :
3640 293 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
3641 4228 : for l in level0_deltas_iter {
3642 3935 : let lsn_range = &l.layer_desc().lsn_range;
3643 3935 :
3644 3935 : if lsn_range.start != prev_lsn_end {
3645 0 : break;
3646 3935 : }
3647 3935 : deltas_to_compact.push(l.download_and_keep_resident().await?);
3648 3935 : prev_lsn_end = lsn_range.end;
3649 : }
3650 293 : let lsn_range = Range {
3651 293 : start: deltas_to_compact
3652 293 : .first()
3653 293 : .unwrap()
3654 293 : .layer_desc()
3655 293 : .lsn_range
3656 293 : .start,
3657 293 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
3658 293 : };
3659 :
3660 293 : info!(
3661 293 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
3662 293 : lsn_range.start,
3663 293 : lsn_range.end,
3664 293 : deltas_to_compact.len(),
3665 293 : level0_deltas.len()
3666 293 : );
3667 :
3668 4228 : for l in deltas_to_compact.iter() {
3669 4228 : info!("compact includes {l}");
3670 : }
3671 :
3672 : // We don't need the original list of layers anymore. Drop it so that
3673 : // we don't accidentally use it later in the function.
3674 293 : drop(level0_deltas);
3675 293 :
3676 293 : stats.read_lock_held_prerequisites_micros = stats
3677 293 : .read_lock_held_spawn_blocking_startup_micros
3678 293 : .till_now();
3679 293 :
3680          293 :         // Determine the N largest holes, where N is the number of compacted layers.
3681 293 : let max_holes = deltas_to_compact.len();
3682 293 : let last_record_lsn = self.get_last_record_lsn();
3683 293 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
3684 293 : let min_hole_coverage_size = 3; // TODO: something more flexible?
3685 293 :
3686 293 : // min-heap (reserve space for one more element added before eviction)
3687 293 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
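        // Example: with max_holes = 3, pushing holes with coverage sizes 5, 3, 9 and 4 leaves
        // {4, 5, 9} in the heap: whenever its length exceeds max_holes, the smallest hole is
        // popped, so only the N largest holes survive the scan below.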
3688 293 : let mut prev: Option<Key> = None;
3689 293 :
3690 293 : let mut all_keys = Vec::new();
3691 :
3692 4228 : for l in deltas_to_compact.iter() {
3693 4228 : all_keys.extend(l.load_keys(ctx).await?);
3694 : }
3695 :
3696 : // FIXME: should spawn_blocking the rest of this function
3697 :
3698              :         // The current stdlib sorting implementation is designed in a way that makes it
3699              :         // particularly fast when the slice is made up of sorted sub-ranges.
3700 156655146 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3701 293 :
3702 293 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
3703 :
3704 17514710 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
3705 17514710 : if let Some(prev_key) = prev {
3706 : // just first fast filter
3707 17514417 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
3708 203121 : let key_range = prev_key..next_key;
3709       203121 :                     // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
3710       203121 :                     // does not make much sense, because the largest holes would correspond to field1/field2 changes.
3711       203121 :                     // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
3712       203121 :                     // That is why it is better to measure the size of a hole as the number of covering image layers.
3713 203121 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
3714 203121 : if coverage_size >= min_hole_coverage_size {
3715 211 : heap.push(Hole {
3716 211 : key_range,
3717 211 : coverage_size,
3718 211 : });
3719 211 : if heap.len() > max_holes {
3720 123 : heap.pop(); // remove smallest hole
3721 123 : }
3722 202910 : }
3723 17311296 : }
3724 293 : }
3725 17514710 : prev = Some(next_key.next());
3726 : }
3727 293 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
3728 293 : drop_rlock(guard);
3729 293 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
3730 293 : let mut holes = heap.into_vec();
3731 293 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
3732 293 : let mut next_hole = 0; // index of next hole in holes vector
3733 293 :
3734 293 : // This iterator walks through all key-value pairs from all the layers
3735 293 : // we're compacting, in key, LSN order.
3736 293 : let all_values_iter = all_keys.iter();
3737 293 :
3738 293 : // This iterator walks through all keys and is needed to calculate size used by each key
3739 293 : let mut all_keys_iter = all_keys
3740 293 : .iter()
3741 17503159 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
3742 17502866 : .coalesce(|mut prev, cur| {
3743     17502866 :             // Coalesce entries that belong to the same key.
3744 17502866 : // This ensures that compaction doesn't put them
3745 17502866 : // into different layer files.
3746 17502866 : // Still limit this by the target file size,
3747 17502866 : // so that we keep the size of the files in
3748 17502866 : // check.
3749 17502866 : if prev.0 == cur.0 && prev.2 < target_file_size {
3750 15129234 : prev.2 += cur.2;
3751 15129234 : Ok(prev)
3752 : } else {
3753 2373632 : Err((prev, cur))
3754 : }
3755 17502866 : });
3756 293 :
3757 293 : // Merge the contents of all the input delta layers into a new set
3758 293 : // of delta layers, based on the current partitioning.
3759          293 :         // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check whether including the next key in the output layer we're building would make the layer too large. If so, flush the current output layer and start a new one.
3760 293 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
3761 293 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
3762 293 : // would be too large. In that case, we also split on the LSN dimension.
3763 293 : //
3764 293 : // LSN
3765 293 : // ^
3766 293 : // |
3767 293 : // | +-----------+ +--+--+--+--+
3768 293 : // | | | | | | | |
3769 293 : // | +-----------+ | | | | |
3770 293 : // | | | | | | | |
3771 293 : // | +-----------+ ==> | | | | |
3772 293 : // | | | | | | | |
3773 293 : // | +-----------+ | | | | |
3774 293 : // | | | | | | | |
3775 293 : // | +-----------+ +--+--+--+--+
3776 293 : // |
3777 293 : // +--------------> key
3778 293 : //
3779 293 : //
3780 293 : // If one key (X) has a lot of page versions:
3781 293 : //
3782 293 : // LSN
3783 293 : // ^
3784 293 : // | (X)
3785 293 : // | +-----------+ +--+--+--+--+
3786 293 : // | | | | | | | |
3787 293 : // | +-----------+ | | +--+ |
3788 293 : // | | | | | | | |
3789 293 : // | +-----------+ ==> | | | | |
3790 293 : // | | | | | +--+ |
3791 293 : // | +-----------+ | | | | |
3792 293 : // | | | | | | | |
3793 293 : // | +-----------+ +--+--+--+--+
3794 293 : // |
3795 293 : // +--------------> key
3796 293 : // TODO: this actually divides the layers into fixed-size chunks, not
3797 293 : // based on the partitioning.
3798 293 : //
3799 293 : // TODO: we should also opportunistically materialize and
3800 293 : // garbage collect what we can.
3801 293 : let mut new_layers = Vec::new();
3802 293 : let mut prev_key: Option<Key> = None;
3803 293 : let mut writer: Option<DeltaLayerWriter> = None;
3804 293 : let mut key_values_total_size = 0u64;
3805 293 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
3806 293 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
3807 :
3808 : for &DeltaEntry {
3809 17503158 : key, lsn, ref val, ..
3810 17503450 : } in all_values_iter
3811 : {
3812 17503158 : let value = val.load(ctx).await?;
3813 17503157 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
3814 17503157 : // We need to check key boundaries once we reach next key or end of layer with the same key
3815 17503157 : if !same_key || lsn == dup_end_lsn {
3816 2373923 : let mut next_key_size = 0u64;
3817 2373923 : let is_dup_layer = dup_end_lsn.is_valid();
3818 2373923 : dup_start_lsn = Lsn::INVALID;
3819 2373923 : if !same_key {
3820 2373885 : dup_end_lsn = Lsn::INVALID;
3821 2373885 : }
3822              :                 // Determine the size occupied by this key. We stop at the next key, or when the size becomes larger than target_file_size.
3823 2373924 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
3824 2373924 : next_key_size = next_size;
3825 2373924 : if key != next_key {
3826 2373593 : if dup_end_lsn.is_valid() {
3827 14 : // We are writting segment with duplicates:
3828 14 : // place all remaining values of this key in separate segment
3829 14 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
3830 14 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
3831 2373579 : }
3832 2373593 : break;
3833 331 : }
3834 331 : key_values_total_size += next_size;
3835          331 :                     // Check if it is time to split the segment: if the total key size is larger than the target file size.
3836 331 : // We need to avoid generation of empty segments if next_size > target_file_size.
3837 331 : if key_values_total_size > target_file_size && lsn != next_lsn {
3838 : // Split key between multiple layers: such layer can contain only single key
3839 38 : dup_start_lsn = if dup_end_lsn.is_valid() {
3840 24 : dup_end_lsn // new segment with duplicates starts where old one stops
3841 : } else {
3842 14 : lsn // start with the first LSN for this key
3843 : };
3844 38 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
3845 38 : break;
3846 293 : }
3847 : }
3848              :             // Handle the case when the loop reaches the last key: dup_end is valid but dup_start is not set.
3849 2373923 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
3850 0 : dup_start_lsn = dup_end_lsn;
3851 0 : dup_end_lsn = lsn_range.end;
3852 2373923 : }
3853 2373923 : if writer.is_some() {
3854 2373630 : let written_size = writer.as_mut().unwrap().size();
3855 2373630 : let contains_hole =
3856 2373630 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
3857              :                 // check if this key causes the layer to overflow or contains a hole...
3858 2373630 : if is_dup_layer
3859 2373578 : || dup_end_lsn.is_valid()
3860 2373567 : || written_size + key_values_total_size > target_file_size
3861 2363277 : || contains_hole
3862 : {
3863              :                     // ... if so, flush the previous layer and prepare to write a new one
3864 10439 : new_layers.push(
3865 10439 : writer
3866 10439 : .take()
3867 10439 : .unwrap()
3868 10439 : .finish(prev_key.unwrap().next(), self)
3869 1190 : .await?,
3870 : );
3871 10439 : writer = None;
3872 10439 :
3873 10439 : if contains_hole {
3874 88 : // skip hole
3875 88 : next_hole += 1;
3876 10351 : }
3877 2363191 : }
3878 293 : }
3879 : // Remember size of key value because at next iteration we will access next item
3880 2373923 : key_values_total_size = next_key_size;
3881 15129234 : }
3882 17503157 : if writer.is_none() {
3883        10732 :                 // Create a writer if not initialized yet
3884 10732 : writer = Some(
3885 : DeltaLayerWriter::new(
3886 10732 : self.conf,
3887 10732 : self.timeline_id,
3888 10732 : self.tenant_shard_id,
3889 10732 : key,
3890 10732 : if dup_end_lsn.is_valid() {
3891 : // this is a layer containing slice of values of the same key
3892 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
3893 52 : dup_start_lsn..dup_end_lsn
3894 : } else {
3895 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3896 10680 : lsn_range.clone()
3897 : },
3898 : )
3899 10 : .await?,
3900 : );
3901 17492425 : }
3902 :
3903 17503157 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3904 0 : Err(CompactionError::Other(anyhow::anyhow!(
3905 0 : "failpoint delta-layer-writer-fail-before-finish"
3906 0 : )))
3907 17503157 : });
3908 :
3909 17503157 : if !self.shard_identity.is_key_disposable(&key) {
3910 17503157 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
3911 : } else {
3912 0 : debug!(
3913 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3914 0 : key,
3915 0 : self.shard_identity.get_shard_number(&key)
3916 0 : );
3917 : }
3918 :
3919 17503157 : if !new_layers.is_empty() {
3920 14171587 : fail_point!("after-timeline-compacted-first-L1");
3921 14171587 : }
3922 :
3923 17503157 : prev_key = Some(key);
3924 : }
3925 292 : if let Some(writer) = writer {
3926 292 : new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
3927 0 : }
3928 :
3929 : // Sync layers
3930 292 : if !new_layers.is_empty() {
3931 : // Print a warning if the created layer is larger than double the target size
3932 : // Add two pages for potential overhead. This should in theory be already
3933 : // accounted for in the target calculation, but for very small targets,
3934 : // we still might easily hit the limit otherwise.
3935 292 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
3936 10730 : for layer in new_layers.iter() {
3937 10730 : if layer.layer_desc().file_size > warn_limit {
3938 0 : warn!(
3939 0 : %layer,
3940 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
3941 0 : );
3942 10730 : }
3943 : }
3944 :
3945 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3946 292 : let layer_paths: Vec<Utf8PathBuf> = new_layers
3947 292 : .iter()
3948 10730 : .map(|l| l.local_path().to_owned())
3949 292 : .collect();
3950 292 :
3951 292 : // Fsync all the layer files and directory using multiple threads to
3952 292 : // minimize latency.
3953 292 : par_fsync::par_fsync_async(&layer_paths)
3954 527 : .await
3955 292 : .context("fsync all new layers")?;
3956 :
3957 292 : let timeline_dir = self
3958 292 : .conf
3959 292 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
3960 292 :
3961 292 : par_fsync::par_fsync_async(&[timeline_dir])
3962 323 : .await
3963 292 : .context("fsync of timeline dir")?;
3964 0 : }
3965 :
3966 292 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
3967 292 : stats.new_deltas_count = Some(new_layers.len());
3968 10730 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
3969 292 :
3970 292 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
3971 292 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
3972 : {
3973 292 : Ok(stats_json) => {
3974 292 : info!(
3975 292 : stats_json = stats_json.as_str(),
3976 292 : "compact_level0_phase1 stats available"
3977 292 : )
3978 : }
3979 0 : Err(e) => {
3980 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
3981 : }
3982 : }
3983 :
3984 292 : Ok(CompactLevel0Phase1Result {
3985 292 : new_layers,
3986 292 : deltas_to_compact: deltas_to_compact
3987 292 : .into_iter()
3988 4213 : .map(|x| x.drop_eviction_guard())
3989 292 : .collect::<Vec<_>>(),
3990 292 : })
3991 1518 : }
3992 :
3993 : ///
3994              :     /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
3995              :     /// as Level 1 files.
3996 : ///
3997 1519 : async fn compact_level0(
3998 1519 : self: &Arc<Self>,
3999 1519 : target_file_size: u64,
4000 1519 : ctx: &RequestContext,
4001 1519 : ) -> Result<(), CompactionError> {
4002 : let CompactLevel0Phase1Result {
4003 1518 : new_layers,
4004 1518 : deltas_to_compact,
4005 : } = {
4006 1519 : let phase1_span = info_span!("compact_level0_phase1");
4007 1519 : let ctx = ctx.attached_child();
4008 1519 : let mut stats = CompactLevel0Phase1StatsBuilder {
4009 1519 : version: Some(2),
4010 1519 : tenant_id: Some(self.tenant_shard_id),
4011 1519 : timeline_id: Some(self.timeline_id),
4012 1519 : ..Default::default()
4013 1519 : };
4014 1519 :
4015 1519 : let begin = tokio::time::Instant::now();
4016 1519 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
4017 1519 : let now = tokio::time::Instant::now();
4018 1519 : stats.read_lock_acquisition_micros =
4019 1519 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
4020 1519 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
4021 1519 : .instrument(phase1_span)
4022 331540 : .await?
4023 : };
4024 :
4025 1518 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
4026 : // nothing to do
4027 1223 : return Ok(());
4028 295 : }
4029 :
4030 295 : let mut guard = self.layers.write().await;
4031 :
4032 295 : let mut duplicated_layers = HashSet::new();
4033 295 :
4034 295 : let mut insert_layers = Vec::with_capacity(new_layers.len());
4035 :
4036 11065 : for l in &new_layers {
4037 10770 : if guard.contains(l.as_ref()) {
4038 : // expected in tests
4039 40 : tracing::error!(layer=%l, "duplicated L1 layer");
4040 :
4041 : // good ways to cause a duplicate: we repeatedly error after taking the writelock
4042 : // `guard` on self.layers. as of writing this, there are no error returns except
4043 : // for compact_level0_phase1 creating an L0, which does not happen in practice
4044 : // because we have not implemented L0 => L0 compaction.
4045 40 : duplicated_layers.insert(l.layer_desc().key());
4046 10730 : } else if LayerMap::is_l0(l.layer_desc()) {
4047 0 : return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
4048 10730 : } else {
4049 10730 : insert_layers.push(l.clone());
4050 10730 : }
4051 : }
4052 :
4053 295 : let remove_layers = {
4054 295 : let mut deltas_to_compact = deltas_to_compact;
4055 295 : // only remove those inputs which were not outputs
4056 4253 : deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
4057 295 : deltas_to_compact
4058 295 : };
4059 295 :
4060 295 : // deletion will happen later, the layer file manager calls garbage_collect_on_drop
4061 295 : guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
4062 :
4063 295 : if let Some(remote_client) = self.remote_client.as_ref() {
4064 295 : remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
4065 0 : }
4066 :
4067 295 : drop_wlock(guard);
4068 295 :
4069 295 : Ok(())
4070 1518 : }
4071 :
4072 : /// Update information about which layer files need to be retained on
4073 : /// garbage collection. This is separate from actually performing the GC,
4074 : /// and is updated more frequently, so that compaction can remove obsolete
4075 : /// page versions more aggressively.
4076 : ///
4077 : /// TODO: that's wishful thinking, compaction doesn't actually do that
4078 : /// currently.
4079 : ///
4080 : /// The caller specifies how much history is needed with the 3 arguments:
4081 : ///
4082 : /// retain_lsns: keep a version of each page at these LSNs
4083 : /// cutoff_horizon: also keep everything newer than this LSN
4084 : /// pitr: the time duration required to keep data for PITR
4085 : ///
4086 : /// The 'retain_lsns' list is currently used to prevent removing files that
4087 : /// are needed by child timelines. In the future, the user might be able to
4088 : /// name additional points in time to retain. The caller is responsible for
4089 : /// collecting that information.
4090 : ///
4091 : /// The 'cutoff_horizon' point is used to retain recent versions that might still be
4092 : /// needed by read-only nodes. (As of this writing, the caller just passes
4093 : /// the latest LSN subtracted by a constant, and doesn't do anything smart
4094 : /// to figure out what read-only nodes might actually need.)
4095 : ///
4096 : /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
4097 : /// whether a record is needed for PITR.
4098 : ///
4099 : /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
4100 : /// field, so that the three values passed as argument are stored
4101              :     /// field, so that the three values passed as arguments are stored
4102 : /// branches are created that would need to be included in 'retain_lsns',
4103 : /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
4104 : /// that.
4105 : ///
4106 0 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
4107 : pub(super) async fn update_gc_info(
4108 : &self,
4109 : retain_lsns: Vec<Lsn>,
4110 : cutoff_horizon: Lsn,
4111 : pitr: Duration,
4112 : cancel: &CancellationToken,
4113 : ctx: &RequestContext,
4114 : ) -> anyhow::Result<()> {
4115 : // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
4116 : //
4117 : // Some unit tests depend on garbage-collection working even when
4118 : // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
4119              :         // CLOG data is missing, in which case find_lsn_for_timestamp() doesn't
4120              :         // work; so avoid calling it altogether if time-based retention is not
4121 : let pitr_cutoff = if pitr != Duration::ZERO {
4122 : let now = SystemTime::now();
4123 : if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
4124 : let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
4125 :
4126 : match self
4127 : .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
4128 : .await?
4129 : {
4130 : LsnForTimestamp::Present(lsn) => lsn,
4131 : LsnForTimestamp::Future(lsn) => {
4132 : // The timestamp is in the future. That sounds impossible,
4133              :                             // but what it really means is that there haven't been
4134 : // any commits since the cutoff timestamp.
4135 0 : debug!("future({})", lsn);
4136 : cutoff_horizon
4137 : }
4138 : LsnForTimestamp::Past(lsn) => {
4139 0 : debug!("past({})", lsn);
4140 : // conservative, safe default is to remove nothing, when we
4141 : // have no commit timestamp data available
4142 : *self.get_latest_gc_cutoff_lsn()
4143 : }
4144 : LsnForTimestamp::NoData(lsn) => {
4145 0 : debug!("nodata({})", lsn);
4146 : // conservative, safe default is to remove nothing, when we
4147 : // have no commit timestamp data available
4148 : *self.get_latest_gc_cutoff_lsn()
4149 : }
4150 : }
4151 : } else {
4152 : // If we don't have enough data to convert to LSN,
4153 : // play safe and don't remove any layers.
4154 : *self.get_latest_gc_cutoff_lsn()
4155 : }
4156 : } else {
4157 : // No time-based retention was configured. Set time-based cutoff to
4158 : // same as LSN based.
4159 : cutoff_horizon
4160 : };
4161 :
4162 : // Grab the lock and update the values
4163 : *self.gc_info.write().unwrap() = GcInfo {
4164 : retain_lsns,
4165 : horizon_cutoff: cutoff_horizon,
4166 : pitr_cutoff,
4167 : };
4168 :
4169 : Ok(())
4170 : }
4171 :
4172 : /// Garbage collect layer files on a timeline that are no longer needed.
4173 : ///
4174 : /// Currently, we don't make any attempt at removing unneeded page versions
4175 : /// within a layer file. We can only remove the whole file if it's fully
4176 : /// obsolete.
4177 720 : pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
4178          720 :         // This is most likely called by the background tasks, but it might be the task spawned by
4179          720 :         // immediate_gc.
4180 720 : let cancel = crate::task_mgr::shutdown_token();
4181 720 : let _g = tokio::select! {
4182 719 : guard = self.gc_lock.lock() => guard,
4183 : _ = self.cancel.cancelled() => return Ok(GcResult::default()),
4184 : _ = cancel.cancelled() => return Ok(GcResult::default()),
4185 : };
4186 719 : let timer = self.metrics.garbage_collect_histo.start_timer();
4187 :
4188 0 : fail_point!("before-timeline-gc");
4189 :
4190 : // Is the timeline being deleted?
4191 719 : if self.is_stopping() {
4192 0 : anyhow::bail!("timeline is Stopping");
4193 719 : }
4194 719 :
4195 719 : let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
4196 719 : let gc_info = self.gc_info.read().unwrap();
4197 719 :
4198 719 : let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
4199 719 : let pitr_cutoff = gc_info.pitr_cutoff;
4200 719 : let retain_lsns = gc_info.retain_lsns.clone();
4201 719 : (horizon_cutoff, pitr_cutoff, retain_lsns)
4202 719 : };
4203 719 :
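        // The effective GC cutoff is the most conservative (lowest) of the available horizons:
        // the LSN-based horizon, the PITR cutoff and, if one is set, the standby horizon below.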
4204 719 : let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
4205 719 : let standby_horizon = self.standby_horizon.load();
4206 719 : let new_gc_cutoff = if standby_horizon != Lsn::INVALID {
4207 10 : Lsn::min(standby_horizon, new_gc_cutoff)
4208 : } else {
4209 709 : new_gc_cutoff
4210 : };
4211 :
4212              :         // Reset the standby horizon so that it is ignored if it has not been updated by the next GC
4213 719 : self.standby_horizon.store(Lsn::INVALID);
4214 :
4215 719 : let res = self
4216 719 : .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
4217 719 : .instrument(
4218 719 : info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
4219 : )
4220 99 : .await?;
4221 :
4222 : // only record successes
4223 719 : timer.stop_and_record();
4224 719 :
4225 719 : Ok(res)
4226 720 : }
4227 :
4228 719 : async fn gc_timeline(
4229 719 : &self,
4230 719 : horizon_cutoff: Lsn,
4231 719 : pitr_cutoff: Lsn,
4232 719 : retain_lsns: Vec<Lsn>,
4233 719 : new_gc_cutoff: Lsn,
4234 719 : ) -> anyhow::Result<GcResult> {
4235 719 : let now = SystemTime::now();
4236 719 : let mut result: GcResult = GcResult::default();
4237 719 :
4238 719 : // Nothing to GC. Return early.
4239 719 : let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
4240 719 : if latest_gc_cutoff >= new_gc_cutoff {
4241 98 : info!(
4242 98 : "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
4243 98 : );
4244 98 : return Ok(result);
4245 621 : }
4246 :
4247 : // We need to ensure that no one tries to read page versions or create
4248 : // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
4249 : // for details. This will block until the old value is no longer in use.
4250 : //
4251 : // The GC cutoff should only ever move forwards.
4252 621 : let waitlist = {
4253 621 : let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
4254 621 : ensure!(
4255 621 : *write_guard <= new_gc_cutoff,
4256 0 : "Cannot move GC cutoff LSN backwards (was {}, new {})",
4257 0 : *write_guard,
4258 : new_gc_cutoff
4259 : );
4260 621 : write_guard.store_and_unlock(new_gc_cutoff)
4261 621 : };
4262 621 : waitlist.wait().await;
4263 :
4264 621 : info!("GC starting");
4265 :
4266 0 : debug!("retain_lsns: {:?}", retain_lsns);
4267 :
4268 621 : let mut layers_to_remove = Vec::new();
4269 621 : let mut wanted_image_layers = KeySpaceRandomAccum::default();
4270 :
4271 : // Scan all layers in the timeline (remote or on-disk).
4272 : //
4273 : // Garbage collect the layer if all conditions are satisfied:
4274 : // 1. it is older than cutoff LSN;
4275 : // 2. it is older than PITR interval;
4276 : // 3. it doesn't need to be retained for 'retain_lsns';
4277 : // 4. newer on-disk image layers cover the layer's whole key range
4278 : //
4279 : // TODO holding a write lock is too aggressive and avoidable
4280 621 : let mut guard = self.layers.write().await;
4281 621 : let layers = guard.layer_map();
4282 16897 : 'outer: for l in layers.iter_historic_layers() {
4283 16897 : result.layers_total += 1;
4284 16897 :
4285 16897 : // 1. Is it newer than the GC horizon cutoff point?
4286 16897 : if l.get_lsn_range().end > horizon_cutoff {
4287 0 : debug!(
4288 0 : "keeping {} because it's newer than horizon_cutoff {}",
4289 0 : l.filename(),
4290 0 : horizon_cutoff,
4291 0 : );
4292 1660 : result.layers_needed_by_cutoff += 1;
4293 1660 : continue 'outer;
4294 15237 : }
4295 15237 :
4296 15237 : // 2. Is it newer than the PITR cutoff point?
4297 15237 : if l.get_lsn_range().end > pitr_cutoff {
4298 0 : debug!(
4299 0 : "keeping {} because it's newer than pitr_cutoff {}",
4300 0 : l.filename(),
4301 0 : pitr_cutoff,
4302 0 : );
4303 40 : result.layers_needed_by_pitr += 1;
4304 40 : continue 'outer;
4305 15197 : }
4306 :
4307 : // 3. Is it needed by a child branch?
4308 : // NOTE: with this check we keep any data that might still be
4309 : // referenced by a child branch forever.
4310 : // We could track this in child timeline GC and delete parent layers once
4311 : // they are no longer needed, but that gets complicated with long inheritance chains.
4312 : //
4313 : // TODO Vec is not a great choice for `retain_lsns`
4314 15221 : for retain_lsn in &retain_lsns {
4315 : // start_lsn is inclusive
4316 458 : if &l.get_lsn_range().start <= retain_lsn {
4317 0 : debug!(
4318 0 : "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
4319 0 : l.filename(),
4320 0 : retain_lsn,
4321 0 : l.is_incremental(),
4322 0 : );
4323 434 : result.layers_needed_by_branches += 1;
4324 434 : continue 'outer;
4325 24 : }
4326 : }
4327 :
4328 : // 4. Is there a later on-disk layer for this relation?
4329 : //
4330 : // The end-LSN is exclusive, while disk_consistent_lsn is
4331 : // inclusive. For example, if disk_consistent_lsn is 100, it is
4332 : // OK for a delta layer to have end LSN 101, but if the end LSN
4333 : // is 102, then it might not have been fully flushed to disk
4334 : // before crash.
4335 : //
4336 : // For example, imagine that the following layers exist:
4337 : //
4338 : // 1000 - image (A)
4339 : // 1000-2000 - delta (B)
4340 : // 2000 - image (C)
4341 : // 2000-3000 - delta (D)
4342 : // 3000 - image (E)
4343 : //
4344 : // If GC horizon is at 2500, we can remove layers A and B, but
4345 : // we cannot remove C, even though it's older than 2500, because
4346 : // the delta layer 2000-3000 depends on it.
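     : //
     : // Relating that example to the check below (hypothetically taking new_gc_cutoff = 2500
     : // and all layers covering the same key range): for A and B an image layer (C at 2000)
     : // exists in (layer end LSN..2500), so they can be removed; for C itself the range
     : // above 2000 up to 2500 contains no image layer, so C is kept.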
4347 14763 : if !layers
4348 14763 : .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
4349 : {
4350 0 : debug!("keeping {} because it is the latest layer", l.filename());
4351 : // Collect delta key ranges that need image layers to allow garbage
4352 : // collecting the layers.
4353 : // It is not obvious whether we should propagate this information only for
4354 : // delta layers: image layers can form "stairs" that prevent an old image from being deleted.
4355 : // However, image layers are in any case less sparse than delta layers, and we also need some
4356 : // protection against replacing recent image layers with new ones after every GC iteration.
4357 13566 : if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
4358 0 : wanted_image_layers.add_range(l.get_key_range());
4359 13566 : }
4360 13566 : result.layers_not_updated += 1;
4361 13566 : continue 'outer;
4362 1197 : }
4363 :
4364 : // We didn't find any reason to keep this file, so remove it.
4365 0 : debug!(
4366 0 : "garbage collecting {} is_dropped: xx is_incremental: {}",
4367 0 : l.filename(),
4368 0 : l.is_incremental(),
4369 0 : );
4370 1197 : layers_to_remove.push(l);
4371 : }
4372 621 : self.wanted_image_layers
4373 621 : .lock()
4374 621 : .unwrap()
4375 621 : .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
4376 621 :
4377 621 : if !layers_to_remove.is_empty() {
4378 : // Persist the new GC cutoff value in the metadata file, before
4379 : // we actually remove anything.
4380 : //
4381 : // This does not in fact have any effect as we no longer consider local metadata unless
4382 : // running without remote storage.
4383 : //
4384 : // This also unconditionally schedules an index_part.json update, even though we will
4385 : // be doing another one shortly afterwards with the unlinked gc'd layers.
4386 : //
4387 : // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
4388 20 : self.update_metadata_file(self.disk_consistent_lsn.load(), None)
4389 0 : .await?;
4390 :
4391 20 : let gc_layers = layers_to_remove
4392 20 : .iter()
4393 1197 : .map(|x| guard.get_from_desc(x))
4394 20 : .collect::<Vec<Layer>>();
4395 20 :
4396 20 : result.layers_removed = gc_layers.len() as u64;
4397 :
4398 20 : if let Some(remote_client) = self.remote_client.as_ref() {
4399 20 : remote_client.schedule_gc_update(&gc_layers)?;
4400 0 : }
4401 :
4402 20 : guard.finish_gc_timeline(&gc_layers);
4403 20 :
4404 20 : #[cfg(feature = "testing")]
4405 20 : {
4406 20 : result.doomed_layers = gc_layers;
4407 20 : }
4408 601 : }
4409 :
4410 621 : info!(
4411 621 : "GC completed removing {} layers, cutoff {}",
4412 621 : result.layers_removed, new_gc_cutoff
4413 621 : );
4414 :
4415 621 : result.elapsed = now.elapsed()?;
4416 621 : Ok(result)
4417 719 : }
4418 :
4419 : /// Reconstruct a value, using the given base image and WAL records in 'data'.
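     : ///
     : /// If 'data' holds a page image and no WAL records, the image is returned as-is.
     : /// Otherwise the records are replayed on top of the (optional) base image by the
     : /// WAL redo manager, and full-sized page results are memoized in the page cache.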
4420 8623837 : async fn reconstruct_value(
4421 8623837 : &self,
4422 8623837 : key: Key,
4423 8623837 : request_lsn: Lsn,
4424 8623837 : mut data: ValueReconstructState,
4425 8623839 : ) -> Result<Bytes, PageReconstructError> {
4426 8623839 : // Perform WAL redo if needed
4427 8623839 : data.records.reverse();
4428 8623839 :
4429 8623839 : // If we have a page image, and no WAL, we're all set
4430 8623839 : if data.records.is_empty() {
4431 5762922 : if let Some((img_lsn, img)) = &data.img {
4432 0 : trace!(
4433 0 : "found page image for key {} at {}, no WAL redo required, req LSN {}",
4434 0 : key,
4435 0 : img_lsn,
4436 0 : request_lsn,
4437 0 : );
4438 5762922 : Ok(img.clone())
4439 : } else {
4440 0 : Err(PageReconstructError::from(anyhow!(
4441 0 : "base image for {key} at {request_lsn} not found"
4442 0 : )))
4443 : }
4444 : } else {
4445 : // We need to do WAL redo.
4446 : //
4447 : // If we don't have a base image, then the oldest WAL record better initialize
4448 : // the page
4449 2860917 : if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
4450 0 : Err(PageReconstructError::from(anyhow!(
4451 0 : "Base image for {} at {} not found, but got {} WAL records",
4452 0 : key,
4453 0 : request_lsn,
4454 0 : data.records.len()
4455 0 : )))
4456 : } else {
4457 2860917 : if data.img.is_some() {
4458 0 : trace!(
4459 0 : "found {} WAL records and a base image for {} at {}, performing WAL redo",
4460 0 : data.records.len(),
4461 0 : key,
4462 0 : request_lsn
4463 0 : );
4464 : } else {
4465 0 : trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
4466 : };
4467 :
4468 2860917 : let last_rec_lsn = data.records.last().unwrap().0;
4469 :
4470 2860917 : let img = match self
4471 2860917 : .walredo_mgr
4472 2860917 : .as_ref()
4473 2860917 : .context("timeline has no walredo manager")
4474 2860917 : .map_err(PageReconstructError::WalRedo)?
4475 2860917 : .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
4476 0 : .await
4477 2860917 : .context("reconstruct a page image")
4478 : {
4479 2860917 : Ok(img) => img,
4480 0 : Err(e) => return Err(PageReconstructError::WalRedo(e)),
4481 : };
4482 :
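     : // Only images of exactly PAGE_SZ bytes are memoized in the materialized page
     : // cache; reconstructed values of any other size are returned without caching.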
4483 2860917 : if img.len() == page_cache::PAGE_SZ {
4484 2857268 : let cache = page_cache::get();
4485 2857268 : if let Err(e) = cache
4486 2857268 : .memorize_materialized_page(
4487 2857268 : self.tenant_shard_id,
4488 2857268 : self.timeline_id,
4489 2857268 : key,
4490 2857268 : last_rec_lsn,
4491 2857268 : &img,
4492 2857268 : )
4493 10099 : .await
4494 2857268 : .context("Materialized page memoization failed")
4495 : {
4496 0 : return Err(PageReconstructError::from(e));
4497 2857268 : }
4498 3649 : }
4499 :
4500 2860917 : Ok(img)
4501 : }
4502 : }
4503 8623839 : }
4504 :
4505 2 : pub(crate) async fn spawn_download_all_remote_layers(
4506 2 : self: Arc<Self>,
4507 2 : request: DownloadRemoteLayersTaskSpawnRequest,
4508 2 : ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
4509 2 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4510 2 :
4511 2 : // This is not really needed anymore; it only exists because some tests check the
4512 2 : // return value from the HTTP API. It would be better not to maintain this anymore.
4513 2 :
4514 2 : let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
4515 2 : if let Some(st) = &*status_guard {
4516 1 : match &st.state {
4517 : DownloadRemoteLayersTaskState::Running => {
4518 0 : return Err(st.clone());
4519 : }
4520 : DownloadRemoteLayersTaskState::ShutDown
4521 1 : | DownloadRemoteLayersTaskState::Completed => {
4522 1 : *status_guard = None;
4523 1 : }
4524 : }
4525 1 : }
4526 :
4527 2 : let self_clone = Arc::clone(&self);
4528 2 : let task_id = task_mgr::spawn(
4529 2 : task_mgr::BACKGROUND_RUNTIME.handle(),
4530 2 : task_mgr::TaskKind::DownloadAllRemoteLayers,
4531 2 : Some(self.tenant_shard_id),
4532 2 : Some(self.timeline_id),
4533 2 : "download all remote layers task",
4534 : false,
4535 2 : async move {
4536 9 : self_clone.download_all_remote_layers(request).await;
4537 2 : let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
4538 2 : match &mut *status_guard {
4539 : None => {
4540 0 : warn!("tasks status is supposed to be Some(), since we are running");
4541 : }
4542 2 : Some(st) => {
4543 2 : let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
4544 2 : if st.task_id != exp_task_id {
4545 0 : warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
4546 2 : } else {
4547 2 : st.state = DownloadRemoteLayersTaskState::Completed;
4548 2 : }
4549 : }
4550 : };
4551 2 : Ok(())
4552 2 : }
4553 2 : .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
4554 : );
4555 :
4556 2 : let initial_info = DownloadRemoteLayersTaskInfo {
4557 2 : task_id: format!("{task_id}"),
4558 2 : state: DownloadRemoteLayersTaskState::Running,
4559 2 : total_layer_count: 0,
4560 2 : successful_download_count: 0,
4561 2 : failed_download_count: 0,
4562 2 : };
4563 2 : *status_guard = Some(initial_info.clone());
4564 2 :
4565 2 : Ok(initial_info)
4566 2 : }
4567 :
4568 2 : async fn download_all_remote_layers(
4569 2 : self: &Arc<Self>,
4570 2 : request: DownloadRemoteLayersTaskSpawnRequest,
4571 2 : ) {
4572 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4573 :
4574 2 : let remaining = {
4575 2 : let guard = self.layers.read().await;
4576 2 : guard
4577 2 : .layer_map()
4578 2 : .iter_historic_layers()
4579 10 : .map(|desc| guard.get_from_desc(&desc))
4580 2 : .collect::<Vec<_>>()
4581 2 : };
4582 2 : let total_layer_count = remaining.len();
4583 2 :
4584 2 : macro_rules! lock_status {
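     : // Descriptive note: this macro re-locks the shared task-status entry and asserts
     : // that it still belongs to the currently running task before each status update.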
4585 2 : ($st:ident) => {
4586 2 : let mut st = self.download_all_remote_layers_task_info.write().unwrap();
4587 2 : let st = st
4588 2 : .as_mut()
4589 2 : .expect("this function is only called after the task has been spawned");
4590 2 : assert_eq!(
4591 2 : st.task_id,
4592 2 : format!(
4593 2 : "{}",
4594 2 : task_mgr::current_task_id().expect("we run inside a task_mgr task")
4595 2 : )
4596 2 : );
4597 2 : let $st = st;
4598 2 : };
4599 2 : }
4600 2 :
4601 2 : {
4602 2 : lock_status!(st);
4603 2 : st.total_layer_count = total_layer_count as u64;
4604 2 : }
4605 2 :
4606 2 : let mut remaining = remaining.into_iter();
4607 2 : let mut have_remaining = true;
4608 2 : let mut js = tokio::task::JoinSet::new();
4609 2 :
4610 2 : let cancel = task_mgr::shutdown_token();
4611 2 :
4612 2 : let limit = request.max_concurrent_downloads;
4613 :
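     : // Download loop: keep at most `limit` downloads in flight, record each success or
     : // failure in the shared status, and stop once nothing is left to spawn and nothing
     : // is in flight, or once cancellation is observed.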
4614 : loop {
4615 12 : while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
4616 12 : let Some(next) = remaining.next() else {
4617 2 : have_remaining = false;
4618 2 : break;
4619 : };
4620 :
4621 10 : let span = tracing::info_span!("download", layer = %next);
4622 :
4623 10 : js.spawn(
4624 10 : async move {
4625 26 : let res = next.download().await;
4626 10 : (next, res)
4627 10 : }
4628 10 : .instrument(span),
4629 10 : );
4630 : }
4631 :
4632 12 : while let Some(res) = js.join_next().await {
4633 10 : match res {
4634 : Ok((_, Ok(_))) => {
4635 5 : lock_status!(st);
4636 5 : st.successful_download_count += 1;
4637 : }
4638 5 : Ok((layer, Err(e))) => {
4639 5 : tracing::error!(%layer, "download failed: {e:#}");
4640 5 : lock_status!(st);
4641 5 : st.failed_download_count += 1;
4642 : }
4643 0 : Err(je) if je.is_cancelled() => unreachable!("not used here"),
4644 0 : Err(je) if je.is_panic() => {
4645 0 : lock_status!(st);
4646 0 : st.failed_download_count += 1;
4647 : }
4648 0 : Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
4649 : }
4650 : }
4651 :
4652 2 : if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
4653 2 : break;
4654 0 : }
4655 : }
4656 :
4657 : {
4658 2 : lock_status!(st);
4659 2 : st.state = DownloadRemoteLayersTaskState::Completed;
4660 2 : }
4661 2 : }
4662 :
4663 50 : pub(crate) fn get_download_all_remote_layers_task_info(
4664 50 : &self,
4665 50 : ) -> Option<DownloadRemoteLayersTaskInfo> {
4666 50 : self.download_all_remote_layers_task_info
4667 50 : .read()
4668 50 : .unwrap()
4669 50 : .clone()
4670 50 : }
4671 : }
4672 :
4673 : impl Timeline {
4674 : /// Returns non-remote layers for eviction.
4675 43 : pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
4676 43 : let guard = self.layers.read().await;
4677 43 : let layers = guard.layer_map();
4678 43 :
4679 43 : let mut max_layer_size: Option<u64> = None;
4680 43 : let mut resident_layers = Vec::new();
4681 :
4682 3018 : for l in layers.iter_historic_layers() {
4683 3018 : let file_size = l.file_size();
4684 3018 : max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
4685 3018 :
4686 3018 : let l = guard.get_from_desc(&l);
4687 :
4688 3018 : let l = match l.keep_resident().await {
4689 3017 : Ok(Some(l)) => l,
4690 1 : Ok(None) => continue,
4691 0 : Err(e) => {
4692 : // these should not happen, but we cannot make them statically impossible right
4693 : // now.
4694 0 : tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}");
4695 0 : continue;
4696 : }
4697 : };
4698 :
4699 3017 : let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
4700 0 : // We only use this fallback if there's an implementation error.
4701 0 : // `latest_activity` already does rate-limited warn!() log.
4702 0 : debug!(layer=%l, "last_activity returns None, using SystemTime::now");
4703 0 : SystemTime::now()
4704 3017 : });
4705 3017 :
4706 3017 : resident_layers.push(EvictionCandidate {
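     : // Note: relative_last_activity is left at ZERO here; it is presumably normalized
     : // later by the disk-usage eviction task when candidates are ranked.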
4707 3017 : layer: l.drop_eviction_guard().into(),
4708 3017 : last_activity_ts,
4709 3017 : relative_last_activity: finite_f32::FiniteF32::ZERO,
4710 3017 : });
4711 : }
4712 :
4713 43 : DiskUsageEvictionInfo {
4714 43 : max_layer_size,
4715 43 : resident_layers,
4716 43 : }
4717 43 : }
4718 :
4719 22356 : pub(crate) fn get_shard_index(&self) -> ShardIndex {
4720 22356 : ShardIndex {
4721 22356 : shard_number: self.tenant_shard_id.shard_number,
4722 22356 : shard_count: self.tenant_shard_id.shard_count,
4723 22356 : }
4724 22356 : }
4725 : }
4726 :
4727 : type TraversalPathItem = (
4728 : ValueReconstructResult,
4729 : Lsn,
4730 : Box<dyn Send + FnOnce() -> TraversalId>,
4731 : );
4732 :
4733 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
4734 : /// to an error, as anyhow context information.
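     : ///
     : /// Sketch of the result (not an exact message): printed with `{:#}`, the error reads
     : /// roughly as "msg: <last traversed layer>: ...: <first traversed layer>", since the
     : /// original 'msg' becomes the outermost context and the first layer the root cause.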
4735 1264 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
4736 1264 : // We want the original 'msg' to be the outermost context. The outermost context
4737 1264 : // is the most high-level information, which also gets propagated to the client.
4738 1264 : let mut msg_iter = path
4739 1264 : .into_iter()
4740 1388 : .map(|(r, c, l)| {
4741 1388 : format!(
4742 1388 : "layer traversal: result {:?}, cont_lsn {}, layer: {}",
4743 1388 : r,
4744 1388 : c,
4745 1388 : l(),
4746 1388 : )
4747 1388 : })
4748 1264 : .chain(std::iter::once(msg));
4749 1264 : // Construct initial message from the first traversed layer
4750 1264 : let err = anyhow!(msg_iter.next().unwrap());
4751 1264 :
4752 1264 : // Append all subsequent traversals, and the error message 'msg', as contexts.
4753 1388 : let msg = msg_iter.fold(err, |err, msg| err.context(msg));
4754 1264 : PageReconstructError::from(msg)
4755 1264 : }
4756 :
4757 : /// Various functions to mutate the timeline.
4758 : // TODO Currently, Deref is used to allow easy access to the Timeline's read methods through this wrapper.
4759 : // This is generally considered bad practice in Rust and should be fixed eventually,
4760 : // but doing so would cause large code changes.
4761 : pub(crate) struct TimelineWriter<'a> {
4762 : tl: &'a Timeline,
4763 : _write_guard: tokio::sync::MutexGuard<'a, ()>,
4764 : }
4765 :
4766 : impl Deref for TimelineWriter<'_> {
4767 : type Target = Timeline;
4768 :
4769 0 : fn deref(&self) -> &Self::Target {
4770 0 : self.tl
4771 0 : }
4772 : }
4773 :
4774 : impl<'a> TimelineWriter<'a> {
4775 : /// Put a new page version that can be constructed from a WAL record
4776 : ///
4777 : /// This will implicitly extend the relation, if the page is beyond the
4778 : /// current end-of-file.
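     : ///
     : /// Usage sketch (hypothetical caller): obtain a writer, presumably via
     : /// `Timeline::writer().await`, call `writer.put(key, lsn, &value, &ctx).await?`
     : /// for each ingested key/value pair, and finish with `writer.finish_write(lsn)`.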
4779 1214102 : pub(crate) async fn put(
4780 1214102 : &self,
4781 1214102 : key: Key,
4782 1214102 : lsn: Lsn,
4783 1214102 : value: &Value,
4784 1214102 : ctx: &RequestContext,
4785 1214102 : ) -> anyhow::Result<()> {
4786 1214102 : self.tl.put_value(key, lsn, value, ctx).await
4787 1214102 : }
4788 :
4789 1718651 : pub(crate) async fn put_batch(
4790 1718651 : &self,
4791 1718651 : batch: &HashMap<Key, Vec<(Lsn, Value)>>,
4792 1718651 : ctx: &RequestContext,
4793 1718651 : ) -> anyhow::Result<()> {
4794 1718651 : self.tl.put_values(batch, ctx).await
4795 1718650 : }
4796 :
4797 19216 : pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
4798 19216 : self.tl.put_tombstones(batch).await
4799 19216 : }
4800 :
4801 : /// Track the end of the latest digested WAL record.
4802 : /// Remember the (end of) last valid WAL record remembered in the timeline.
4803 : ///
4804 : /// Call this after you have finished writing all the WAL up to 'lsn'.
4805 : ///
4806 : /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
4807 : /// the 'lsn' or anything older. The previous last record LSN is stored alongside
4808 : /// the latest and can be read.
4809 76570323 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
4810 76570323 : self.tl.finish_write(new_lsn);
4811 76570323 : }
4812 :
4813 642421 : pub(crate) fn update_current_logical_size(&self, delta: i64) {
4814 642421 : self.tl.update_current_logical_size(delta)
4815 642421 : }
4816 : }
4817 :
4818 : // We need TimelineWriter to be Send for the upcoming conversion of
4819 : // Timeline::layers to tokio::sync::RwLock.
4820 2 : #[test]
4821 2 : fn is_send() {
4822 2 : fn _assert_send<T: Send>() {}
4823 2 : _assert_send::<TimelineWriter<'_>>();
4824 2 : }
4825 :
4826 : /// Add a suffix to a layer file's name: .{num}.old
4827 : /// Uses the first available num (starts at 0)
4828 1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
4829 1 : let filename = path
4830 1 : .file_name()
4831 1 : .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
4832 1 : let mut new_path = path.to_owned();
4833 :
4834 1 : for i in 0u32.. {
4835 1 : new_path.set_file_name(format!("{filename}.{i}.old"));
4836 1 : if !new_path.exists() {
4837 1 : std::fs::rename(path, &new_path)
4838 1 : .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
4839 1 : return Ok(());
4840 0 : }
4841 : }
4842 :
4843 0 : bail!("couldn't find an unused backup number for {:?}", path)
4844 1 : }
4845 :
4846 : #[cfg(test)]
4847 : mod tests {
4848 : use utils::{id::TimelineId, lsn::Lsn};
4849 :
4850 : use crate::tenant::{
4851 : harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
4852 : };
4853 :
4854 2 : #[tokio::test]
4855 2 : async fn two_layer_eviction_attempts_at_the_same_time() {
4856 2 : let harness =
4857 2 : TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
4858 2 :
4859 2 : let ctx = any_context();
4860 2 : let tenant = harness.try_load(&ctx).await.unwrap();
4861 2 : let timeline = tenant
4862 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
4863 7 : .await
4864 2 : .unwrap();
4865 :
4866 2 : let layer = find_some_layer(&timeline).await;
4867 2 : let layer = layer
4868 2 : .keep_resident()
4869 0 : .await
4870 2 : .expect("no download => no downloading errors")
4871 2 : .expect("should had been resident")
4872 2 : .drop_eviction_guard();
4873 2 :
4874 2 : let first = async { layer.evict_and_wait().await };
4875 2 : let second = async { layer.evict_and_wait().await };
4876 :
4877 3 : let (first, second) = tokio::join!(first, second);
4878 :
4879 2 : let res = layer.keep_resident().await;
4880 2 : assert!(matches!(res, Ok(None)), "{res:?}");
4881 :
4882 2 : match (first, second) {
4883 0 : (Ok(()), Ok(())) => {
4884 0 : // Because no timeline locks are taken on the eviction path anymore, we can
4885 0 : // witness any of these outcomes here.
4886 0 : }
4887 2 : (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
4888 2 : // if one completes before the other, that is just as fine.
4889 2 : }
4890 0 : other => unreachable!("unexpected {:?}", other),
4891 : }
4892 : }
4893 :
4894 2 : fn any_context() -> crate::context::RequestContext {
4895 2 : use crate::context::*;
4896 2 : use crate::task_mgr::*;
4897 2 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
4898 2 : }
4899 :
4900 2 : async fn find_some_layer(timeline: &Timeline) -> Layer {
4901 2 : let layers = timeline.layers.read().await;
4902 2 : let desc = layers
4903 2 : .layer_map()
4904 2 : .iter_historic_layers()
4905 2 : .next()
4906 2 : .expect("must find one layer to evict");
4907 2 :
4908 2 : layers.get_from_desc(&desc)
4909 2 : }
4910 : }