Line data Source code
1 : pub mod delete;
2 : mod eviction_task;
3 : mod init;
4 : pub mod layer_manager;
5 : pub(crate) mod logical_size;
6 : pub mod span;
7 : pub mod uninit;
8 : mod walreceiver;
9 :
10 : use anyhow::{anyhow, bail, ensure, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use enumset::EnumSet;
14 : use fail::fail_point;
15 : use futures::stream::StreamExt;
16 : use itertools::Itertools;
17 : use once_cell::sync::Lazy;
18 : use pageserver_api::{
19 : keyspace::{key_range_size, KeySpaceAccum},
20 : models::{
21 : DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
22 : LayerMapInfo, TimelineState,
23 : },
24 : reltag::BlockNumber,
25 : shard::{ShardIdentity, TenantShardId},
26 : };
27 : use rand::Rng;
28 : use serde_with::serde_as;
29 : use storage_broker::BrokerClientChannel;
30 : use tokio::{
31 : runtime::Handle,
32 : sync::{oneshot, watch},
33 : };
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::*;
36 : use utils::sync::gate::Gate;
37 :
38 : use std::ops::{Deref, Range};
39 : use std::pin::pin;
40 : use std::sync::atomic::Ordering as AtomicOrdering;
41 : use std::sync::{Arc, Mutex, RwLock, Weak};
42 : use std::time::{Duration, Instant, SystemTime};
43 : use std::{
44 : array,
45 : collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
46 : sync::atomic::AtomicU64,
47 : };
48 : use std::{
49 : cmp::{max, min, Ordering},
50 : ops::ControlFlow,
51 : };
52 :
53 : use crate::pgdatadir_mapping::DirectoryKind;
54 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
55 : use crate::tenant::{
56 : layer_map::{LayerMap, SearchResult},
57 : metadata::{save_metadata, TimelineMetadata},
58 : par_fsync,
59 : };
60 : use crate::{
61 : context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
62 : disk_usage_eviction_task::DiskUsageEvictionInfo,
63 : };
64 : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
65 : use crate::{
66 : disk_usage_eviction_task::finite_f32,
67 : tenant::storage_layer::{
68 : AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
69 : LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
70 : ValueReconstructState,
71 : },
72 : };
73 : use crate::{
74 : disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
75 : };
76 : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
77 :
78 : use crate::config::PageServerConf;
79 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
80 : use crate::metrics::{
81 : TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
82 : };
83 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
84 : use crate::tenant::config::TenantConfOpt;
85 : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
86 : use pageserver_api::reltag::RelTag;
87 : use pageserver_api::shard::ShardIndex;
88 :
89 : use postgres_connection::PgConnectionConfig;
90 : use postgres_ffi::to_pg_timestamp;
91 : use utils::{
92 : completion,
93 : generation::Generation,
94 : id::TimelineId,
95 : lsn::{AtomicLsn, Lsn, RecordLsn},
96 : seqwait::SeqWait,
97 : simple_rcu::{Rcu, RcuReadGuard},
98 : };
99 :
100 : use crate::page_cache;
101 : use crate::repository::GcResult;
102 : use crate::repository::{Key, Value};
103 : use crate::task_mgr;
104 : use crate::task_mgr::TaskKind;
105 : use crate::ZERO_PAGE;
106 :
107 : use self::delete::DeleteTimelineFlow;
108 : pub(super) use self::eviction_task::EvictionTaskTenantState;
109 : use self::eviction_task::EvictionTaskTimelineState;
110 : use self::layer_manager::LayerManager;
111 : use self::logical_size::LogicalSize;
112 : use self::walreceiver::{WalReceiver, WalReceiverConf};
113 :
114 : use super::config::TenantConf;
115 : use super::remote_timeline_client::index::IndexPart;
116 : use super::remote_timeline_client::RemoteTimelineClient;
117 : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
118 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
119 :
120 45 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
121 : pub(super) enum FlushLoopState {
122 : NotStarted,
123 : Running {
124 : #[cfg(test)]
125 : expect_initdb_optimization: bool,
126 : #[cfg(test)]
127 : initdb_optimization_count: usize,
128 : },
129 : Exited,
130 : }
131 :
132 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
133 0 : #[derive(Debug, Clone, PartialEq, Eq)]
134 : pub(crate) struct Hole {
135 : key_range: Range<Key>,
136 : coverage_size: usize,
137 : }
138 :
139 : impl Ord for Hole {
140 286 : fn cmp(&self, other: &Self) -> Ordering {
141 286 : other.coverage_size.cmp(&self.coverage_size) // inverse order
142 286 : }
143 : }
144 :
145 : impl PartialOrd for Hole {
146 286 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
147 286 : Some(self.cmp(other))
148 286 : }
149 : }
150 :
151 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
152 : /// Can be removed after all refactors are done.
153 295 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
154 295 : drop(rlock)
155 295 : }
156 :
157 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
158 : /// Can be removed after all refactors are done.
159 1994 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
160 1994 : drop(rlock)
161 1994 : }
162 :
163 : /// The outward-facing resources required to build a Timeline
164 : pub struct TimelineResources {
165 : pub remote_client: Option<RemoteTimelineClient>,
166 : pub deletion_queue_client: DeletionQueueClient,
167 : }
168 :
169 : pub struct Timeline {
170 : conf: &'static PageServerConf,
171 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
172 :
173 : myself: Weak<Self>,
174 :
175 : pub(crate) tenant_shard_id: TenantShardId,
176 : pub timeline_id: TimelineId,
177 :
178 : /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
179 : /// Never changes for the lifetime of this [`Timeline`] object.
180 : ///
181 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
182 : /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
183 : pub(crate) generation: Generation,
184 :
185 : /// The detailed sharding information from our parent Tenant. This enables us to map keys
186 : /// to shards, and is constant through the lifetime of this Timeline.
187 : shard_identity: ShardIdentity,
188 :
189 : pub pg_version: u32,
190 :
191 : /// The tuple has two elements.
192 : /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
193 : /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
194 : ///
195 : /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
196 : /// We describe these rectangles through the `PersistentLayerDesc` struct.
197 : ///
198 : /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
199 : /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
200 : /// `PersistentLayerDesc`'s.
201 : ///
202 : /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
203 : /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
204 : /// runtime, e.g., during page reconstruction.
205 : ///
206 : /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
207 : /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
208 : pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
209 :
210 : /// Set of key ranges which should be covered by image layers to
211 : /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
212 : /// It is used by compaction task when it checks if new image layer should be created.
213 : /// Newly created image layer doesn't help to remove the delta layer, until the
214 : /// newly created image layer falls off the PITR horizon. So on next GC cycle,
215 : /// gc_timeline may still want the new image layer to be created. To avoid redundant
216 : /// image layers creation we should check if image layer exists but beyond PITR horizon.
217 : /// This is why we need remember GC cutoff LSN.
218 : ///
219 : wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
220 :
221 : last_freeze_at: AtomicLsn,
222 : // Atomic would be more appropriate here.
223 : last_freeze_ts: RwLock<Instant>,
224 :
225 : // WAL redo manager. `None` only for broken tenants.
226 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
227 :
228 : /// Remote storage client.
229 : /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
230 : pub remote_client: Option<Arc<RemoteTimelineClient>>,
231 :
232 : // What page versions do we hold in the repository? If we get a
233 : // request > last_record_lsn, we need to wait until we receive all
234 : // the WAL up to the request. The SeqWait provides functions for
235 : // that. TODO: If we get a request for an old LSN, such that the
236 : // versions have already been garbage collected away, we should
237 : // throw an error, but we don't track that currently.
238 : //
239 : // last_record_lsn.load().last points to the end of last processed WAL record.
240 : //
241 : // We also remember the starting point of the previous record in
242 : // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
243 : // first WAL record when the node is started up. But here, we just
244 : // keep track of it.
245 : last_record_lsn: SeqWait<RecordLsn, Lsn>,
246 :
247 : // All WAL records have been processed and stored durably on files on
248 : // local disk, up to this LSN. On crash and restart, we need to re-process
249 : // the WAL starting from this point.
250 : //
251 : // Some later WAL records might have been processed and also flushed to disk
252 : // already, so don't be surprised to see some, but there's no guarantee on
253 : // them yet.
254 : disk_consistent_lsn: AtomicLsn,
255 :
256 : // Parent timeline that this timeline was branched from, and the LSN
257 : // of the branch point.
258 : ancestor_timeline: Option<Arc<Timeline>>,
259 : ancestor_lsn: Lsn,
260 :
261 : pub(super) metrics: TimelineMetrics,
262 :
263 : // `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
264 : // in `crate::page_service` writes these metrics.
265 : pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
266 :
267 : directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
268 :
269 : /// Ensures layers aren't frozen by checkpointer between
270 : /// [`Timeline::get_layer_for_write`] and layer reads.
271 : /// Locked automatically by [`TimelineWriter`] and checkpointer.
272 : /// Must always be acquired before the layer map/individual layer lock
273 : /// to avoid deadlock.
274 : write_lock: tokio::sync::Mutex<()>,
275 :
276 : /// Used to avoid multiple `flush_loop` tasks running
277 : pub(super) flush_loop_state: Mutex<FlushLoopState>,
278 :
279 : /// layer_flush_start_tx can be used to wake up the layer-flushing task.
280 : /// The value is a counter, incremented every time a new flush cycle is requested.
281 : /// The flush cycle counter is sent back on the layer_flush_done channel when
282 : /// the flush finishes. You can use that to wait for the flush to finish.
283 : layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
284 : /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
285 : layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
286 :
287 : // Needed to ensure that we can't create a branch at a point that was already garbage collected
288 : pub latest_gc_cutoff_lsn: Rcu<Lsn>,
289 :
290 : // List of child timelines and their branch points. This is needed to avoid
291 : // garbage collecting data that is still needed by the child timelines.
292 : pub gc_info: std::sync::RwLock<GcInfo>,
293 :
294 : // It may change across major versions so for simplicity
295 : // keep it after running initdb for a timeline.
296 : // It is needed in checks when we want to error on some operations
297 : // when they are requested for pre-initdb lsn.
298 : // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
299 : // though let's keep them both for better error visibility.
300 : pub initdb_lsn: Lsn,
301 :
302 : /// When did we last calculate the partitioning?
303 : partitioning: Mutex<(KeyPartitioning, Lsn)>,
304 :
305 : /// Configuration: how often should the partitioning be recalculated.
306 : repartition_threshold: u64,
307 :
308 : /// Current logical size of the "datadir", at the last LSN.
309 : current_logical_size: LogicalSize,
310 :
311 : /// Information about the last processed message by the WAL receiver,
312 : /// or None if WAL receiver has not received anything for this timeline
313 : /// yet.
314 : pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
315 : pub walreceiver: Mutex<Option<WalReceiver>>,
316 :
317 : /// Relation size cache
318 : pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
319 :
320 : download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
321 :
322 : state: watch::Sender<TimelineState>,
323 :
324 : /// Prevent two tasks from deleting the timeline at the same time. If held, the
325 : /// timeline is being deleted. If 'true', the timeline has already been deleted.
326 : pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
327 :
328 : eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
329 :
330 : /// Load or creation time information about the disk_consistent_lsn and when the loading
331 : /// happened. Used for consumption metrics.
332 : pub(crate) loaded_at: (Lsn, SystemTime),
333 :
334 : /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
335 : pub(crate) gate: Gate,
336 :
337 : /// Cancellation token scoped to this timeline: anything doing long-running work relating
338 : /// to the timeline should drop out when this token fires.
339 : pub(crate) cancel: CancellationToken,
340 :
341 : /// Make sure we only have one running compaction at a time in tests.
342 : ///
343 : /// Must only be taken in two places:
344 : /// - [`Timeline::compact`] (this file)
345 : /// - [`delete::delete_local_layer_files`]
346 : ///
347 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
348 : compaction_lock: tokio::sync::Mutex<()>,
349 :
350 : /// Make sure we only have one running gc at a time.
351 : ///
352 : /// Must only be taken in two places:
353 : /// - [`Timeline::gc`] (this file)
354 : /// - [`delete::delete_local_layer_files`]
355 : ///
356 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
357 : gc_lock: tokio::sync::Mutex<()>,
358 : }
359 :
360 : pub struct WalReceiverInfo {
361 : pub wal_source_connconf: PgConnectionConfig,
362 : pub last_received_msg_lsn: Lsn,
363 : pub last_received_msg_ts: u128,
364 : }
365 :
366 : ///
367 : /// Information about how much history needs to be retained, needed by
368 : /// Garbage Collection.
369 : ///
370 : pub struct GcInfo {
371 : /// Specific LSNs that are needed.
372 : ///
373 : /// Currently, this includes all points where child branches have
374 : /// been forked off from. In the future, could also include
375 : /// explicit user-defined snapshot points.
376 : pub retain_lsns: Vec<Lsn>,
377 :
378 : /// In addition to 'retain_lsns', keep everything newer than this
379 : /// point.
380 : ///
381 : /// This is calculated by subtracting 'gc_horizon' setting from
382 : /// last-record LSN
383 : ///
384 : /// FIXME: is this inclusive or exclusive?
385 : pub horizon_cutoff: Lsn,
386 :
387 : /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
388 : /// point.
389 : ///
390 : /// This is calculated by finding a number such that a record is needed for PITR
391 : /// if only if its LSN is larger than 'pitr_cutoff'.
392 : pub pitr_cutoff: Lsn,
393 : }
394 :
395 : /// An error happened in a get() operation.
396 70 : #[derive(thiserror::Error, Debug)]
397 : pub(crate) enum PageReconstructError {
398 : #[error(transparent)]
399 : Other(#[from] anyhow::Error),
400 :
401 : #[error("Ancestor LSN wait error: {0}")]
402 : AncestorLsnTimeout(#[from] WaitLsnError),
403 :
404 : #[error("timeline shutting down")]
405 : Cancelled,
406 :
407 : /// The ancestor of this is being stopped
408 : #[error("ancestor timeline {0} is being stopped")]
409 : AncestorStopping(TimelineId),
410 :
411 : /// An error happened replaying WAL records
412 : #[error(transparent)]
413 : WalRedo(anyhow::Error),
414 : }
415 :
416 : impl PageReconstructError {
417 : /// Returns true if this error indicates a tenant/timeline shutdown alike situation
418 0 : pub(crate) fn is_stopping(&self) -> bool {
419 0 : use PageReconstructError::*;
420 0 : match self {
421 0 : Other(_) => false,
422 0 : AncestorLsnTimeout(_) => false,
423 0 : Cancelled | AncestorStopping(_) => true,
424 0 : WalRedo(_) => false,
425 : }
426 0 : }
427 : }
428 :
429 0 : #[derive(thiserror::Error, Debug)]
430 : enum CreateImageLayersError {
431 : #[error("timeline shutting down")]
432 : Cancelled,
433 :
434 : #[error(transparent)]
435 : GetVectoredError(GetVectoredError),
436 :
437 : #[error(transparent)]
438 : PageReconstructError(PageReconstructError),
439 :
440 : #[error(transparent)]
441 : Other(#[from] anyhow::Error),
442 : }
443 :
444 0 : #[derive(thiserror::Error, Debug)]
445 : enum FlushLayerError {
446 : /// Timeline cancellation token was cancelled
447 : #[error("timeline shutting down")]
448 : Cancelled,
449 :
450 : #[error(transparent)]
451 : CreateImageLayersError(CreateImageLayersError),
452 :
453 : #[error(transparent)]
454 : Other(#[from] anyhow::Error),
455 : }
456 :
457 0 : #[derive(thiserror::Error, Debug)]
458 : pub(crate) enum GetVectoredError {
459 : #[error("timeline shutting down")]
460 : Cancelled,
461 :
462 : #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
463 : Oversized(u64),
464 :
465 : #[error("Requested at invalid LSN: {0}")]
466 : InvalidLsn(Lsn),
467 : }
468 :
469 0 : #[derive(thiserror::Error, Debug)]
470 : pub(crate) enum GetReadyAncestorError {
471 : #[error("ancestor timeline {0} is being stopped")]
472 : AncestorStopping(TimelineId),
473 :
474 : #[error("Ancestor LSN wait error: {0}")]
475 : AncestorLsnTimeout(#[from] WaitLsnError),
476 :
477 : #[error("Cancelled")]
478 : Cancelled,
479 :
480 : #[error(transparent)]
481 : Other(#[from] anyhow::Error),
482 : }
483 :
484 0 : #[derive(Clone, Copy)]
485 : pub enum LogicalSizeCalculationCause {
486 : Initial,
487 : ConsumptionMetricsSyntheticSize,
488 : EvictionTaskImitation,
489 : TenantSizeHandler,
490 : }
491 :
492 : pub enum GetLogicalSizePriority {
493 : User,
494 : Background,
495 : }
496 :
497 813 : #[derive(enumset::EnumSetType)]
498 : pub(crate) enum CompactFlags {
499 : ForceRepartition,
500 : }
501 :
502 : impl std::fmt::Debug for Timeline {
503 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
504 0 : write!(f, "Timeline<{}>", self.timeline_id)
505 0 : }
506 : }
507 :
508 48 : #[derive(thiserror::Error, Debug)]
509 : pub(crate) enum WaitLsnError {
510 : // Called on a timeline which is shutting down
511 : #[error("Shutdown")]
512 : Shutdown,
513 :
514 : // Called on an timeline not in active state or shutting down
515 : #[error("Bad state (not active)")]
516 : BadState,
517 :
518 : // Timeout expired while waiting for LSN to catch up with goal.
519 : #[error("{0}")]
520 : Timeout(String),
521 : }
522 :
523 : // The impls below achieve cancellation mapping for errors.
524 : // Perhaps there's a way of achieving this with less cruft.
525 :
526 : impl From<CreateImageLayersError> for CompactionError {
527 0 : fn from(e: CreateImageLayersError) -> Self {
528 0 : match e {
529 0 : CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
530 0 : _ => CompactionError::Other(e.into()),
531 : }
532 0 : }
533 : }
534 :
535 : impl From<CreateImageLayersError> for FlushLayerError {
536 0 : fn from(e: CreateImageLayersError) -> Self {
537 0 : match e {
538 0 : CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
539 0 : any => FlushLayerError::CreateImageLayersError(any),
540 : }
541 0 : }
542 : }
543 :
544 : impl From<PageReconstructError> for CreateImageLayersError {
545 0 : fn from(e: PageReconstructError) -> Self {
546 0 : match e {
547 0 : PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
548 0 : _ => CreateImageLayersError::PageReconstructError(e),
549 : }
550 0 : }
551 : }
552 :
553 : impl From<GetVectoredError> for CreateImageLayersError {
554 0 : fn from(e: GetVectoredError) -> Self {
555 0 : match e {
556 0 : GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
557 0 : _ => CreateImageLayersError::GetVectoredError(e),
558 : }
559 0 : }
560 : }
561 :
562 : impl From<GetReadyAncestorError> for PageReconstructError {
563 4 : fn from(e: GetReadyAncestorError) -> Self {
564 4 : use GetReadyAncestorError::*;
565 4 : match e {
566 2 : AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
567 0 : AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
568 0 : Cancelled => PageReconstructError::Cancelled,
569 2 : Other(other) => PageReconstructError::Other(other),
570 : }
571 4 : }
572 : }
573 :
574 : /// Public interface functions
575 : impl Timeline {
576 : /// Get the LSN where this branch was created
577 3803 : pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
578 3803 : self.ancestor_lsn
579 3803 : }
580 :
581 : /// Get the ancestor's timeline id
582 5285 : pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
583 5285 : self.ancestor_timeline
584 5285 : .as_ref()
585 5285 : .map(|ancestor| ancestor.timeline_id)
586 5285 : }
587 :
588 : /// Lock and get timeline's GC cutoff
589 4472625 : pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
590 4472625 : self.latest_gc_cutoff_lsn.read()
591 4472625 : }
592 :
593 : /// Look up given page version.
594 : ///
595 : /// If a remote layer file is needed, it is downloaded as part of this
596 : /// call.
597 : ///
598 : /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
599 : /// abstraction above this needs to store suitable metadata to track what
600 : /// data exists with what keys, in separate metadata entries. If a
601 : /// non-existent key is requested, we may incorrectly return a value from
602 : /// an ancestor branch, for example, or waste a lot of cycles chasing the
603 : /// non-existing key.
604 : ///
605 : /// # Cancel-Safety
606 : ///
607 : /// This method is cancellation-safe.
608 7398627 : pub(crate) async fn get(
609 7398627 : &self,
610 7398627 : key: Key,
611 7398627 : lsn: Lsn,
612 7398627 : ctx: &RequestContext,
613 7398634 : ) -> Result<Bytes, PageReconstructError> {
614 7398634 : if !lsn.is_valid() {
615 1 : return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
616 7398633 : }
617 :
618 : // This check is debug-only because of the cost of hashing, and because it's a double-check: we
619 : // already checked the key against the shard_identity when looking up the Timeline from
620 : // page_service.
621 7398633 : debug_assert!(!self.shard_identity.is_key_disposable(&key));
622 :
623 : // XXX: structured stats collection for layer eviction here.
624 0 : trace!(
625 0 : "get page request for {}@{} from task kind {:?}",
626 0 : key,
627 0 : lsn,
628 0 : ctx.task_kind()
629 0 : );
630 :
631 : // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
632 : // The cached image can be returned directly if there is no WAL between the cached image
633 : // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
634 : // for redo.
635 7398633 : let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
636 1787176 : Some((cached_lsn, cached_img)) => {
637 1787176 : match cached_lsn.cmp(&lsn) {
638 1710871 : Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
639 : Ordering::Equal => {
640 76305 : MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
641 76305 : return Ok(cached_img); // exact LSN match, return the image
642 : }
643 : Ordering::Greater => {
644 0 : unreachable!("the returned lsn should never be after the requested lsn")
645 : }
646 : }
647 1710871 : Some((cached_lsn, cached_img))
648 : }
649 5611457 : None => None,
650 : };
651 :
652 7322328 : let mut reconstruct_state = ValueReconstructState {
653 7322328 : records: Vec::new(),
654 7322328 : img: cached_page_img,
655 7322328 : };
656 7322328 :
657 7322328 : let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
658 7322328 : let path = self
659 7322328 : .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
660 2279470 : .await?;
661 7320977 : timer.stop_and_record();
662 7320977 :
663 7320977 : let start = Instant::now();
664 7320977 : let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
665 7320975 : let elapsed = start.elapsed();
666 7320975 : crate::metrics::RECONSTRUCT_TIME
667 7320975 : .for_result(&res)
668 7320975 : .observe(elapsed.as_secs_f64());
669 7320975 :
670 7320975 : if cfg!(feature = "testing") && res.is_err() {
671 : // it can only be walredo issue
672 : use std::fmt::Write;
673 :
674 0 : let mut msg = String::new();
675 0 :
676 0 : path.into_iter().for_each(|(res, cont_lsn, layer)| {
677 0 : writeln!(
678 0 : msg,
679 0 : "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
680 0 : layer(),
681 0 : )
682 0 : .expect("string grows")
683 0 : });
684 :
685 : // this is to rule out or provide evidence that we could in some cases read a duplicate
686 : // walrecord
687 0 : tracing::info!("walredo failed, path:\n{msg}");
688 7320975 : }
689 :
690 7320975 : res
691 7398620 : }
692 :
693 : pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
694 :
695 : /// Look up multiple page versions at a given LSN
696 : ///
697 : /// This naive implementation will be replaced with a more efficient one
698 : /// which actually vectorizes the read path.
699 78555 : pub(crate) async fn get_vectored(
700 78555 : &self,
701 78555 : key_ranges: &[Range<Key>],
702 78555 : lsn: Lsn,
703 78555 : ctx: &RequestContext,
704 78555 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
705 78555 : if !lsn.is_valid() {
706 0 : return Err(GetVectoredError::InvalidLsn(lsn));
707 78555 : }
708 78555 :
709 78555 : let key_count = key_ranges
710 78555 : .iter()
711 79754 : .map(|range| key_range_size(range) as u64)
712 78555 : .sum();
713 78555 : if key_count > Timeline::MAX_GET_VECTORED_KEYS {
714 0 : return Err(GetVectoredError::Oversized(key_count));
715 78555 : }
716 78555 :
717 78555 : let _timer = crate::metrics::GET_VECTORED_LATENCY
718 78555 : .for_task_kind(ctx.task_kind())
719 78555 : .map(|t| t.start_timer());
720 78555 :
721 78555 : let mut values = BTreeMap::new();
722 158309 : for range in key_ranges {
723 79754 : let mut key = range.start;
724 311977 : while key != range.end {
725 232223 : assert!(!self.shard_identity.is_key_disposable(&key));
726 :
727 232223 : let block = self.get(key, lsn, ctx).await;
728 :
729 : if matches!(
730 0 : block,
731 : Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
732 : ) {
733 0 : return Err(GetVectoredError::Cancelled);
734 232223 : }
735 232223 :
736 232223 : values.insert(key, block);
737 232223 : key = key.next();
738 : }
739 : }
740 :
741 78555 : Ok(values)
742 78555 : }
743 :
744 : /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
745 9450148 : pub(crate) fn get_last_record_lsn(&self) -> Lsn {
746 9450148 : self.last_record_lsn.load().last
747 9450148 : }
748 :
749 3185 : pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
750 3185 : self.last_record_lsn.load().prev
751 3185 : }
752 :
753 : /// Atomically get both last and prev.
754 1080 : pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
755 1080 : self.last_record_lsn.load()
756 1080 : }
757 :
758 757672 : pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
759 757672 : self.disk_consistent_lsn.load()
760 757672 : }
761 :
762 : /// remote_consistent_lsn from the perspective of the tenant's current generation,
763 : /// not validated with control plane yet.
764 : /// See [`Self::get_remote_consistent_lsn_visible`].
765 3185 : pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
766 3185 : if let Some(remote_client) = &self.remote_client {
767 3185 : remote_client.remote_consistent_lsn_projected()
768 : } else {
769 0 : None
770 : }
771 3185 : }
772 :
773 : /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
774 : /// i.e. a value of remote_consistent_lsn_projected which has undergone
775 : /// generation validation in the deletion queue.
776 755304 : pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
777 755304 : if let Some(remote_client) = &self.remote_client {
778 755304 : remote_client.remote_consistent_lsn_visible()
779 : } else {
780 0 : None
781 : }
782 755304 : }
783 :
784 : /// The sum of the file size of all historic layers in the layer map.
785 : /// This method makes no distinction between local and remote layers.
786 : /// Hence, the result **does not represent local filesystem usage**.
787 3661 : pub(crate) async fn layer_size_sum(&self) -> u64 {
788 3661 : let guard = self.layers.read().await;
789 3661 : let layer_map = guard.layer_map();
790 3661 : let mut size = 0;
791 325484 : for l in layer_map.iter_historic_layers() {
792 325484 : size += l.file_size();
793 325484 : }
794 3661 : size
795 3661 : }
796 :
797 17 : pub(crate) fn resident_physical_size(&self) -> u64 {
798 17 : self.metrics.resident_physical_size_get()
799 17 : }
800 :
801 3185 : pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
802 22295 : array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
803 3185 : }
804 :
805 : ///
806 : /// Wait until WAL has been received and processed up to this LSN.
807 : ///
808 : /// You should call this before any of the other get_* or list_* functions. Calling
809 : /// those functions with an LSN that has been processed yet is an error.
810 : ///
811 1539795 : pub(crate) async fn wait_lsn(
812 1539795 : &self,
813 1539795 : lsn: Lsn,
814 1539795 : _ctx: &RequestContext, /* Prepare for use by cancellation */
815 1539796 : ) -> Result<(), WaitLsnError> {
816 1539796 : if self.cancel.is_cancelled() {
817 0 : return Err(WaitLsnError::Shutdown);
818 1539796 : } else if !self.is_active() {
819 0 : return Err(WaitLsnError::BadState);
820 1539796 : }
821 :
822 : // This should never be called from the WAL receiver, because that could lead
823 : // to a deadlock.
824 : debug_assert!(
825 1539796 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
826 0 : "wait_lsn cannot be called in WAL receiver"
827 : );
828 : debug_assert!(
829 1539796 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
830 0 : "wait_lsn cannot be called in WAL receiver"
831 : );
832 : debug_assert!(
833 1539796 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
834 0 : "wait_lsn cannot be called in WAL receiver"
835 : );
836 :
837 1539796 : let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
838 1539796 :
839 1539796 : match self
840 1539796 : .last_record_lsn
841 1539796 : .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
842 108459 : .await
843 : {
844 1539772 : Ok(()) => Ok(()),
845 24 : Err(e) => {
846 24 : use utils::seqwait::SeqWaitError::*;
847 24 : match e {
848 0 : Shutdown => Err(WaitLsnError::Shutdown),
849 : Timeout => {
850 : // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
851 24 : drop(_timer);
852 24 : let walreceiver_status = self.walreceiver_status();
853 24 : Err(WaitLsnError::Timeout(format!(
854 24 : "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
855 24 : lsn,
856 24 : self.get_last_record_lsn(),
857 24 : self.get_disk_consistent_lsn(),
858 24 : walreceiver_status,
859 24 : )))
860 : }
861 : }
862 : }
863 : }
864 1539796 : }
865 :
866 3209 : pub(crate) fn walreceiver_status(&self) -> String {
867 3209 : match &*self.walreceiver.lock().unwrap() {
868 112 : None => "stopping or stopped".to_string(),
869 3097 : Some(walreceiver) => match walreceiver.status() {
870 1920 : Some(status) => status.to_human_readable_string(),
871 1177 : None => "Not active".to_string(),
872 : },
873 : }
874 3209 : }
875 :
876 : /// Check that it is valid to request operations with that lsn.
877 670 : pub(crate) fn check_lsn_is_in_scope(
878 670 : &self,
879 670 : lsn: Lsn,
880 670 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
881 670 : ) -> anyhow::Result<()> {
882 670 : ensure!(
883 670 : lsn >= **latest_gc_cutoff_lsn,
884 16 : "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
885 16 : lsn,
886 16 : **latest_gc_cutoff_lsn,
887 : );
888 654 : Ok(())
889 670 : }
890 :
891 : /// Flush to disk all data that was written with the put_* functions
892 3812 : #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
893 : pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
894 : self.freeze_inmem_layer(false).await;
895 : self.flush_frozen_layers_and_wait().await
896 : }
897 :
898 : /// Outermost timeline compaction operation; downloads needed layers.
899 1623 : pub(crate) async fn compact(
900 1623 : self: &Arc<Self>,
901 1623 : cancel: &CancellationToken,
902 1623 : flags: EnumSet<CompactFlags>,
903 1623 : ctx: &RequestContext,
904 1623 : ) -> Result<(), CompactionError> {
905 1623 : // most likely the cancellation token is from background task, but in tests it could be the
906 1623 : // request task as well.
907 1623 :
908 1623 : let prepare = async move {
909 1623 : let guard = self.compaction_lock.lock().await;
910 :
911 1623 : let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
912 1623 : BackgroundLoopKind::Compaction,
913 1623 : ctx,
914 1623 : )
915 0 : .await;
916 :
917 1623 : (guard, permit)
918 1623 : };
919 :
920 : // this wait probably never needs any "long time spent" logging, because we already nag if
921 : // compaction task goes over it's period (20s) which is quite often in production.
922 1623 : let (_guard, _permit) = tokio::select! {
923 1623 : tuple = prepare => { tuple },
924 : _ = self.cancel.cancelled() => return Ok(()),
925 : _ = cancel.cancelled() => return Ok(()),
926 : };
927 :
928 1623 : let last_record_lsn = self.get_last_record_lsn();
929 1623 :
930 1623 : // Last record Lsn could be zero in case the timeline was just created
931 1623 : if !last_record_lsn.is_valid() {
932 0 : warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
933 0 : return Ok(());
934 1623 : }
935 1623 :
936 1623 : // High level strategy for compaction / image creation:
937 1623 : //
938 1623 : // 1. First, calculate the desired "partitioning" of the
939 1623 : // currently in-use key space. The goal is to partition the
940 1623 : // key space into roughly fixed-size chunks, but also take into
941 1623 : // account any existing image layers, and try to align the
942 1623 : // chunk boundaries with the existing image layers to avoid
943 1623 : // too much churn. Also try to align chunk boundaries with
944 1623 : // relation boundaries. In principle, we don't know about
945 1623 : // relation boundaries here, we just deal with key-value
946 1623 : // pairs, and the code in pgdatadir_mapping.rs knows how to
947 1623 : // map relations into key-value pairs. But in practice we know
948 1623 : // that 'field6' is the block number, and the fields 1-5
949 1623 : // identify a relation. This is just an optimization,
950 1623 : // though.
951 1623 : //
952 1623 : // 2. Once we know the partitioning, for each partition,
953 1623 : // decide if it's time to create a new image layer. The
954 1623 : // criteria is: there has been too much "churn" since the last
955 1623 : // image layer? The "churn" is fuzzy concept, it's a
956 1623 : // combination of too many delta files, or too much WAL in
957 1623 : // total in the delta file. Or perhaps: if creating an image
958 1623 : // file would allow to delete some older files.
959 1623 : //
960 1623 : // 3. After that, we compact all level0 delta files if there
961 1623 : // are too many of them. While compacting, we also garbage
962 1623 : // collect any page versions that are no longer needed because
963 1623 : // of the new image layers we created in step 2.
964 1623 : //
965 1623 : // TODO: This high level strategy hasn't been implemented yet.
966 1623 : // Below are functions compact_level0() and create_image_layers()
967 1623 : // but they are a bit ad hoc and don't quite work like it's explained
968 1623 : // above. Rewrite it.
969 1623 :
970 1623 : // Is the timeline being deleted?
971 1623 : if self.is_stopping() {
972 0 : trace!("Dropping out of compaction on timeline shutdown");
973 0 : return Err(CompactionError::ShuttingDown);
974 1623 : }
975 1623 :
976 1623 : let target_file_size = self.get_checkpoint_distance();
977 1623 :
978 1623 : // Define partitioning schema if needed
979 1623 :
980 1623 : // FIXME: the match should only cover repartitioning, not the next steps
981 1623 : match self
982 1623 : .repartition(
983 1623 : self.get_last_record_lsn(),
984 1623 : self.get_compaction_target_size(),
985 1623 : flags,
986 1623 : ctx,
987 1623 : )
988 176612 : .await
989 : {
990 1621 : Ok((partitioning, lsn)) => {
991 1621 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
992 1621 : let image_ctx = RequestContextBuilder::extend(ctx)
993 1621 : .access_stats_behavior(AccessStatsBehavior::Skip)
994 1621 : .build();
995 1621 :
996 1621 : // 2. Compact
997 1621 : let timer = self.metrics.compact_time_histo.start_timer();
998 314622 : self.compact_level0(target_file_size, ctx).await?;
999 1619 : timer.stop_and_record();
1000 :
1001 : // 3. Create new image layers for partitions that have been modified
1002 : // "enough".
1003 1619 : let layers = self
1004 1619 : .create_image_layers(&partitioning, lsn, false, &image_ctx)
1005 56836 : .await
1006 1619 : .map_err(anyhow::Error::from)?;
1007 1619 : if let Some(remote_client) = &self.remote_client {
1008 8014 : for layer in layers {
1009 6395 : remote_client.schedule_layer_file_upload(layer)?;
1010 : }
1011 0 : }
1012 :
1013 1619 : if let Some(remote_client) = &self.remote_client {
1014 : // should any new image layer been created, not uploading index_part will
1015 : // result in a mismatch between remote_physical_size and layermap calculated
1016 : // size, which will fail some tests, but should not be an issue otherwise.
1017 1619 : remote_client.schedule_index_upload_for_file_changes()?;
1018 0 : }
1019 : }
1020 1 : Err(err) => {
1021 1 : // no partitioning? This is normal, if the timeline was just created
1022 1 : // as an empty timeline. Also in unit tests, when we use the timeline
1023 1 : // as a simple key-value store, ignoring the datadir layout. Log the
1024 1 : // error but continue.
1025 1 : //
1026 1 : // Suppress error when it's due to cancellation
1027 1 : if !self.cancel.is_cancelled() {
1028 1 : error!("could not compact, repartitioning keyspace failed: {err:?}");
1029 0 : }
1030 : }
1031 : };
1032 :
1033 1620 : Ok(())
1034 1620 : }
1035 :
1036 : /// Mutate the timeline with a [`TimelineWriter`].
1037 3310571 : pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
1038 3310571 : TimelineWriter {
1039 3310571 : tl: self,
1040 3310571 : _write_guard: self.write_lock.lock().await,
1041 : }
1042 3310571 : }
1043 :
1044 : /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
1045 : /// the in-memory layer, and initiate flushing it if so.
1046 : ///
1047 : /// Also flush after a period of time without new data -- it helps
1048 : /// safekeepers to regard pageserver as caught up and suspend activity.
1049 1356943 : pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
1050 1356943 : let last_lsn = self.get_last_record_lsn();
1051 1356123 : let open_layer_size = {
1052 1356943 : let guard = self.layers.read().await;
1053 1356943 : let layers = guard.layer_map();
1054 1356943 : let Some(open_layer) = layers.open_layer.as_ref() else {
1055 820 : return Ok(());
1056 : };
1057 1356123 : open_layer.size().await?
1058 : };
1059 1356123 : let last_freeze_at = self.last_freeze_at.load();
1060 1356123 : let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
1061 1356123 : let distance = last_lsn.widening_sub(last_freeze_at);
1062 1356123 : // Checkpointing the open layer can be triggered by layer size or LSN range.
1063 1356123 : // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
1064 1356123 : // we want to stay below that with a big margin. The LSN distance determines how
1065 1356123 : // much WAL the safekeepers need to store.
1066 1356123 : if distance >= self.get_checkpoint_distance().into()
1067 1355570 : || open_layer_size > self.get_checkpoint_distance()
1068 1352619 : || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
1069 : {
1070 3504 : info!(
1071 3504 : "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
1072 3504 : distance,
1073 3504 : open_layer_size,
1074 3504 : last_freeze_ts.elapsed()
1075 3504 : );
1076 :
1077 3504 : self.freeze_inmem_layer(true).await;
1078 3504 : self.last_freeze_at.store(last_lsn);
1079 3504 : *(self.last_freeze_ts.write().unwrap()) = Instant::now();
1080 3504 :
1081 3504 : // Wake up the layer flusher
1082 3504 : self.flush_frozen_layers();
1083 1352619 : }
1084 1356123 : Ok(())
1085 1356943 : }
1086 :
1087 1257 : pub(crate) fn activate(
1088 1257 : self: &Arc<Self>,
1089 1257 : broker_client: BrokerClientChannel,
1090 1257 : background_jobs_can_start: Option<&completion::Barrier>,
1091 1257 : ctx: &RequestContext,
1092 1257 : ) {
1093 1257 : if self.tenant_shard_id.is_zero() {
1094 1206 : // Logical size is only maintained accurately on shard zero.
1095 1206 : self.spawn_initial_logical_size_computation_task(ctx);
1096 1206 : }
1097 1257 : self.launch_wal_receiver(ctx, broker_client);
1098 1257 : self.set_state(TimelineState::Active);
1099 1257 : self.launch_eviction_task(background_jobs_can_start);
1100 1257 : }
1101 :
1102 : /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
1103 : /// also to remote storage. This method can easily take multiple seconds for a busy timeline.
1104 : ///
1105 : /// While we are flushing, we continue to accept read I/O.
1106 262 : pub(crate) async fn flush_and_shutdown(&self) {
1107 262 : debug_assert_current_span_has_tenant_and_timeline_id();
1108 :
1109 : // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
1110 : // trying to flush
1111 0 : tracing::debug!("Waiting for WalReceiverManager...");
1112 262 : task_mgr::shutdown_tasks(
1113 262 : Some(TaskKind::WalReceiverManager),
1114 262 : Some(self.tenant_shard_id),
1115 262 : Some(self.timeline_id),
1116 262 : )
1117 145 : .await;
1118 :
1119 : // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
1120 262 : self.last_record_lsn.shutdown();
1121 262 :
1122 262 : // now all writers to InMemory layer are gone, do the final flush if requested
1123 262 : match self.freeze_and_flush().await {
1124 : Ok(_) => {
1125 : // drain the upload queue
1126 217 : if let Some(client) = self.remote_client.as_ref() {
1127 : // if we did not wait for completion here, it might be our shutdown process
1128 : // didn't wait for remote uploads to complete at all, as new tasks can forever
1129 : // be spawned.
1130 : //
1131 : // what is problematic is the shutting down of RemoteTimelineClient, because
1132 : // obviously it does not make sense to stop while we wait for it, but what
1133 : // about corner cases like s3 suddenly hanging up?
1134 217 : if let Err(e) = client.shutdown().await {
1135 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1136 : // we have some extra WAL replay to do next time the timeline starts.
1137 0 : warn!("failed to flush to remote storage: {e:#}");
1138 217 : }
1139 0 : }
1140 : }
1141 45 : Err(e) => {
1142 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1143 : // we have some extra WAL replay to do next time the timeline starts.
1144 45 : warn!("failed to freeze and flush: {e:#}");
1145 : }
1146 : }
1147 :
1148 438 : self.shutdown().await;
1149 262 : }
1150 :
1151 : /// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
1152 : /// the graceful [`Timeline::flush_and_shutdown`] function.
1153 593 : pub(crate) async fn shutdown(&self) {
1154 593 : debug_assert_current_span_has_tenant_and_timeline_id();
1155 :
1156 : // Signal any subscribers to our cancellation token to drop out
1157 0 : tracing::debug!("Cancelling CancellationToken");
1158 593 : self.cancel.cancel();
1159 593 :
1160 593 : // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
1161 593 : // while doing so.
1162 593 : self.last_record_lsn.shutdown();
1163 593 :
1164 593 : // Shut down the layer flush task before the remote client, as one depends on the other
1165 593 : task_mgr::shutdown_tasks(
1166 593 : Some(TaskKind::LayerFlushTask),
1167 593 : Some(self.tenant_shard_id),
1168 593 : Some(self.timeline_id),
1169 593 : )
1170 502 : .await;
1171 :
1172 : // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
1173 : // case our caller wants to use that for a deletion
1174 593 : if let Some(remote_client) = self.remote_client.as_ref() {
1175 593 : match remote_client.stop() {
1176 593 : Ok(()) => {}
1177 0 : Err(StopError::QueueUninitialized) => {
1178 0 : // Shutting down during initialization is legal
1179 0 : }
1180 : }
1181 0 : }
1182 :
1183 0 : tracing::debug!("Waiting for tasks...");
1184 :
1185 658 : task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
1186 :
1187 : // Finally wait until any gate-holders are complete
1188 593 : self.gate.close().await;
1189 593 : }
1190 :
1191 2281 : pub(crate) fn set_state(&self, new_state: TimelineState) {
1192 2281 : match (self.current_state(), new_state) {
1193 2281 : (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
1194 126 : info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
1195 : }
1196 0 : (st, TimelineState::Loading) => {
1197 0 : error!("ignoring transition from {st:?} into Loading state");
1198 : }
1199 7 : (TimelineState::Broken { .. }, new_state) => {
1200 7 : error!("Ignoring state update {new_state:?} for broken timeline");
1201 : }
1202 : (TimelineState::Stopping, TimelineState::Active) => {
1203 0 : error!("Not activating a Stopping timeline");
1204 : }
1205 2148 : (_, new_state) => {
1206 2148 : self.state.send_replace(new_state);
1207 2148 : }
1208 : }
1209 2281 : }
1210 :
1211 19 : pub(crate) fn set_broken(&self, reason: String) {
1212 19 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
1213 19 : let broken_state = TimelineState::Broken {
1214 19 : reason,
1215 19 : backtrace: backtrace_str,
1216 19 : };
1217 19 : self.set_state(broken_state);
1218 19 :
1219 19 : // Although the Broken state is not equivalent to shutdown() (shutdown will be called
1220 19 : // later when this tenant is detach or the process shuts down), firing the cancellation token
1221 19 : // here avoids the need for other tasks to watch for the Broken state explicitly.
1222 19 : self.cancel.cancel();
1223 19 : }
1224 :
1225 2250837 : pub(crate) fn current_state(&self) -> TimelineState {
1226 2250837 : self.state.borrow().clone()
1227 2250837 : }
1228 :
1229 943 : pub(crate) fn is_broken(&self) -> bool {
1230 943 : matches!(&*self.state.borrow(), TimelineState::Broken { .. })
1231 943 : }
1232 :
1233 1554712 : pub(crate) fn is_active(&self) -> bool {
1234 1554712 : self.current_state() == TimelineState::Active
1235 1554712 : }
1236 :
1237 2813 : pub(crate) fn is_stopping(&self) -> bool {
1238 2813 : self.current_state() == TimelineState::Stopping
1239 2813 : }
1240 :
1241 1253 : pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
1242 1253 : self.state.subscribe()
1243 1253 : }
1244 :
1245 1313705 : pub(crate) async fn wait_to_become_active(
1246 1313705 : &self,
1247 1313705 : _ctx: &RequestContext, // Prepare for use by cancellation
1248 1313706 : ) -> Result<(), TimelineState> {
1249 1313706 : let mut receiver = self.state.subscribe();
1250 1313735 : loop {
1251 1313735 : let current_state = receiver.borrow().clone();
1252 1313735 : match current_state {
1253 : TimelineState::Loading => {
1254 29 : receiver
1255 29 : .changed()
1256 29 : .await
1257 29 : .expect("holding a reference to self");
1258 : }
1259 : TimelineState::Active { .. } => {
1260 1313699 : return Ok(());
1261 : }
1262 : TimelineState::Broken { .. } | TimelineState::Stopping => {
1263 : // There's no chance the timeline can transition back into ::Active
1264 7 : return Err(current_state);
1265 : }
1266 : }
1267 : }
1268 1313706 : }
1269 :
1270 97 : pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
1271 97 : let guard = self.layers.read().await;
1272 97 : let layer_map = guard.layer_map();
1273 97 : let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
1274 97 : if let Some(open_layer) = &layer_map.open_layer {
1275 3 : in_memory_layers.push(open_layer.info());
1276 94 : }
1277 97 : for frozen_layer in &layer_map.frozen_layers {
1278 0 : in_memory_layers.push(frozen_layer.info());
1279 0 : }
1280 :
1281 97 : let mut historic_layers = Vec::new();
1282 3050 : for historic_layer in layer_map.iter_historic_layers() {
1283 3050 : let historic_layer = guard.get_from_desc(&historic_layer);
1284 3050 : historic_layers.push(historic_layer.info(reset));
1285 3050 : }
1286 :
1287 97 : LayerMapInfo {
1288 97 : in_memory_layers,
1289 97 : historic_layers,
1290 97 : }
1291 97 : }
1292 :
1293 4 : #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
1294 : pub(crate) async fn download_layer(
1295 : &self,
1296 : layer_file_name: &str,
1297 : ) -> anyhow::Result<Option<bool>> {
1298 : let Some(layer) = self.find_layer(layer_file_name).await else {
1299 : return Ok(None);
1300 : };
1301 :
1302 : if self.remote_client.is_none() {
1303 : return Ok(Some(false));
1304 : }
1305 :
1306 : layer.download().await?;
1307 :
1308 : Ok(Some(true))
1309 : }
1310 :
1311 : /// Evict just one layer.
1312 : ///
1313 : /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
1314 2206 : pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1315 2206 : let _gate = self
1316 2206 : .gate
1317 2206 : .enter()
1318 2206 : .map_err(|_| anyhow::anyhow!("Shutting down"))?;
1319 :
1320 2206 : let Some(local_layer) = self.find_layer(layer_file_name).await else {
1321 0 : return Ok(None);
1322 : };
1323 :
1324 2206 : match local_layer.evict_and_wait().await {
1325 2206 : Ok(()) => Ok(Some(true)),
1326 0 : Err(EvictionError::NotFound) => Ok(Some(false)),
1327 0 : Err(EvictionError::Downloaded) => Ok(Some(false)),
1328 : }
1329 2206 : }
1330 : }
1331 :
1332 : /// Number of times we will compute partition within a checkpoint distance.
1333 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
1334 :
1335 : // Private functions
1336 : impl Timeline {
1337 607 : pub(crate) fn get_lazy_slru_download(&self) -> bool {
1338 607 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1339 607 : tenant_conf
1340 607 : .lazy_slru_download
1341 607 : .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
1342 607 : }
1343 :
1344 2714908 : fn get_checkpoint_distance(&self) -> u64 {
1345 2714908 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1346 2714908 : tenant_conf
1347 2714908 : .checkpoint_distance
1348 2714908 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
1349 2714908 : }
1350 :
1351 1352619 : fn get_checkpoint_timeout(&self) -> Duration {
1352 1352619 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1353 1352619 : tenant_conf
1354 1352619 : .checkpoint_timeout
1355 1352619 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
1356 1352619 : }
1357 :
1358 1703 : fn get_compaction_target_size(&self) -> u64 {
1359 1703 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1360 1703 : tenant_conf
1361 1703 : .compaction_target_size
1362 1703 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
1363 1703 : }
1364 :
1365 1621 : fn get_compaction_threshold(&self) -> usize {
1366 1621 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1367 1621 : tenant_conf
1368 1621 : .compaction_threshold
1369 1621 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
1370 1621 : }
1371 :
1372 36833 : fn get_image_creation_threshold(&self) -> usize {
1373 36833 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1374 36833 : tenant_conf
1375 36833 : .image_creation_threshold
1376 36833 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
1377 36833 : }
1378 :
1379 2664 : fn get_eviction_policy(&self) -> EvictionPolicy {
1380 2664 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1381 2664 : tenant_conf
1382 2664 : .eviction_policy
1383 2664 : .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
1384 2664 : }
1385 :
1386 1654 : fn get_evictions_low_residence_duration_metric_threshold(
1387 1654 : tenant_conf: &TenantConfOpt,
1388 1654 : default_tenant_conf: &TenantConf,
1389 1654 : ) -> Duration {
1390 1654 : tenant_conf
1391 1654 : .evictions_low_residence_duration_metric_threshold
1392 1654 : .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
1393 1654 : }
1394 :
1395 12981 : fn get_gc_feedback(&self) -> bool {
1396 12981 : let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
1397 12981 : tenant_conf
1398 12981 : .gc_feedback
1399 12981 : .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
1400 12981 : }
1401 :
1402 62 : pub(super) fn tenant_conf_updated(&self) {
1403 62 : // NB: Most tenant conf options are read by background loops, so,
1404 62 : // changes will automatically be picked up.
1405 62 :
1406 62 : // The threshold is embedded in the metric. So, we need to update it.
1407 62 : {
1408 62 : let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
1409 62 : &self.tenant_conf.read().unwrap().tenant_conf,
1410 62 : &self.conf.default_tenant_conf,
1411 62 : );
1412 62 :
1413 62 : let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
1414 62 : let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
1415 62 :
1416 62 : let timeline_id_str = self.timeline_id.to_string();
1417 62 : self.metrics
1418 62 : .evictions_with_low_residence_duration
1419 62 : .write()
1420 62 : .unwrap()
1421 62 : .change_threshold(
1422 62 : &tenant_id_str,
1423 62 : &shard_id_str,
1424 62 : &timeline_id_str,
1425 62 : new_threshold,
1426 62 : );
1427 62 : }
1428 62 : }
1429 :
1430 : /// Open a Timeline handle.
1431 : ///
1432 : /// Loads the metadata for the timeline into memory, but not the layer map.
1433 : #[allow(clippy::too_many_arguments)]
1434 1592 : pub(super) fn new(
1435 1592 : conf: &'static PageServerConf,
1436 1592 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
1437 1592 : metadata: &TimelineMetadata,
1438 1592 : ancestor: Option<Arc<Timeline>>,
1439 1592 : timeline_id: TimelineId,
1440 1592 : tenant_shard_id: TenantShardId,
1441 1592 : generation: Generation,
1442 1592 : shard_identity: ShardIdentity,
1443 1592 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
1444 1592 : resources: TimelineResources,
1445 1592 : pg_version: u32,
1446 1592 : state: TimelineState,
1447 1592 : cancel: CancellationToken,
1448 1592 : ) -> Arc<Self> {
1449 1592 : let disk_consistent_lsn = metadata.disk_consistent_lsn();
1450 1592 : let (state, _) = watch::channel(state);
1451 1592 :
1452 1592 : let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
1453 1592 : let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
1454 1592 :
1455 1592 : let tenant_conf_guard = tenant_conf.read().unwrap();
1456 1592 :
1457 1592 : let evictions_low_residence_duration_metric_threshold =
1458 1592 : Self::get_evictions_low_residence_duration_metric_threshold(
1459 1592 : &tenant_conf_guard.tenant_conf,
1460 1592 : &conf.default_tenant_conf,
1461 1592 : );
1462 1592 : drop(tenant_conf_guard);
1463 1592 :
1464 1592 : Arc::new_cyclic(|myself| {
1465 1592 : let mut result = Timeline {
1466 1592 : conf,
1467 1592 : tenant_conf,
1468 1592 : myself: myself.clone(),
1469 1592 : timeline_id,
1470 1592 : tenant_shard_id,
1471 1592 : generation,
1472 1592 : shard_identity,
1473 1592 : pg_version,
1474 1592 : layers: Default::default(),
1475 1592 : wanted_image_layers: Mutex::new(None),
1476 1592 :
1477 1592 : walredo_mgr,
1478 1592 : walreceiver: Mutex::new(None),
1479 1592 :
1480 1592 : remote_client: resources.remote_client.map(Arc::new),
1481 1592 :
1482 1592 : // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
1483 1592 : last_record_lsn: SeqWait::new(RecordLsn {
1484 1592 : last: disk_consistent_lsn,
1485 1592 : prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
1486 1592 : }),
1487 1592 : disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
1488 1592 :
1489 1592 : last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
1490 1592 : last_freeze_ts: RwLock::new(Instant::now()),
1491 1592 :
1492 1592 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
1493 1592 :
1494 1592 : ancestor_timeline: ancestor,
1495 1592 : ancestor_lsn: metadata.ancestor_lsn(),
1496 1592 :
1497 1592 : metrics: TimelineMetrics::new(
1498 1592 : &tenant_shard_id,
1499 1592 : &timeline_id,
1500 1592 : crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
1501 1592 : "mtime",
1502 1592 : evictions_low_residence_duration_metric_threshold,
1503 1592 : ),
1504 1592 : ),
1505 1592 :
1506 1592 : query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
1507 1592 : &tenant_shard_id,
1508 1592 : &timeline_id,
1509 1592 : ),
1510 1592 :
1511 11144 : directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
1512 1592 :
1513 1592 : flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
1514 1592 :
1515 1592 : layer_flush_start_tx,
1516 1592 : layer_flush_done_tx,
1517 1592 :
1518 1592 : write_lock: tokio::sync::Mutex::new(()),
1519 1592 :
1520 1592 : gc_info: std::sync::RwLock::new(GcInfo {
1521 1592 : retain_lsns: Vec::new(),
1522 1592 : horizon_cutoff: Lsn(0),
1523 1592 : pitr_cutoff: Lsn(0),
1524 1592 : }),
1525 1592 :
1526 1592 : latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
1527 1592 : initdb_lsn: metadata.initdb_lsn(),
1528 1592 :
1529 1592 : current_logical_size: if disk_consistent_lsn.is_valid() {
1530 : // we're creating timeline data with some layer files existing locally,
1531 : // need to recalculate timeline's logical size based on data in the layers.
1532 907 : LogicalSize::deferred_initial(disk_consistent_lsn)
1533 : } else {
1534 : // we're creating timeline data without any layers existing locally,
1535 : // initial logical size is 0.
1536 685 : LogicalSize::empty_initial()
1537 : },
1538 1592 : partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
1539 1592 : repartition_threshold: 0,
1540 1592 :
1541 1592 : last_received_wal: Mutex::new(None),
1542 1592 : rel_size_cache: RwLock::new(HashMap::new()),
1543 1592 :
1544 1592 : download_all_remote_layers_task_info: RwLock::new(None),
1545 1592 :
1546 1592 : state,
1547 1592 :
1548 1592 : eviction_task_timeline_state: tokio::sync::Mutex::new(
1549 1592 : EvictionTaskTimelineState::default(),
1550 1592 : ),
1551 1592 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
1552 1592 :
1553 1592 : cancel,
1554 1592 : gate: Gate::default(),
1555 1592 :
1556 1592 : compaction_lock: tokio::sync::Mutex::default(),
1557 1592 : gc_lock: tokio::sync::Mutex::default(),
1558 1592 : };
1559 1592 : result.repartition_threshold =
1560 1592 : result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
1561 1592 : result
1562 1592 : .metrics
1563 1592 : .last_record_gauge
1564 1592 : .set(disk_consistent_lsn.0 as i64);
1565 1592 : result
1566 1592 : })
1567 1592 : }
1568 :
1569 2245 : pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
1570 2245 : let Ok(guard) = self.gate.enter() else {
1571 0 : info!("cannot start flush loop when the timeline gate has already been closed");
1572 0 : return;
1573 : };
1574 2245 : let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
1575 2245 : match *flush_loop_state {
1576 1573 : FlushLoopState::NotStarted => (),
1577 : FlushLoopState::Running { .. } => {
1578 672 : info!(
1579 672 : "skipping attempt to start flush_loop twice {}/{}",
1580 672 : self.tenant_shard_id, self.timeline_id
1581 672 : );
1582 672 : return;
1583 : }
1584 : FlushLoopState::Exited => {
1585 0 : warn!(
1586 0 : "ignoring attempt to restart exited flush_loop {}/{}",
1587 0 : self.tenant_shard_id, self.timeline_id
1588 0 : );
1589 0 : return;
1590 : }
1591 : }
1592 :
1593 1573 : let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
1594 1573 : let self_clone = Arc::clone(self);
1595 1573 :
1596 1573 : debug!("spawning flush loop");
1597 1573 : *flush_loop_state = FlushLoopState::Running {
1598 1573 : #[cfg(test)]
1599 1573 : expect_initdb_optimization: false,
1600 1573 : #[cfg(test)]
1601 1573 : initdb_optimization_count: 0,
1602 1573 : };
1603 1573 : task_mgr::spawn(
1604 1573 : task_mgr::BACKGROUND_RUNTIME.handle(),
1605 1573 : task_mgr::TaskKind::LayerFlushTask,
1606 1573 : Some(self.tenant_shard_id),
1607 1573 : Some(self.timeline_id),
1608 1573 : "layer flush task",
1609 : false,
1610 1573 : async move {
1611 1573 : let _guard = guard;
1612 1573 : let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
1613 31609 : self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
1614 592 : let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
1615 592 : assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
1616 592 : *flush_loop_state = FlushLoopState::Exited;
1617 592 : Ok(())
1618 592 : }
1619 1573 : .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
1620 : );
1621 2245 : }
1622 :
1623 : /// Creates and starts the wal receiver.
1624 : ///
1625 : /// This function is expected to be called at most once per Timeline's lifecycle
1626 : /// when the timeline is activated.
1627 1257 : fn launch_wal_receiver(
1628 1257 : self: &Arc<Self>,
1629 1257 : ctx: &RequestContext,
1630 1257 : broker_client: BrokerClientChannel,
1631 1257 : ) {
1632 1257 : info!(
1633 1257 : "launching WAL receiver for timeline {} of tenant {}",
1634 1257 : self.timeline_id, self.tenant_shard_id
1635 1257 : );
1636 :
1637 1257 : let tenant_conf_guard = self.tenant_conf.read().unwrap();
1638 1257 : let wal_connect_timeout = tenant_conf_guard
1639 1257 : .tenant_conf
1640 1257 : .walreceiver_connect_timeout
1641 1257 : .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
1642 1257 : let lagging_wal_timeout = tenant_conf_guard
1643 1257 : .tenant_conf
1644 1257 : .lagging_wal_timeout
1645 1257 : .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
1646 1257 : let max_lsn_wal_lag = tenant_conf_guard
1647 1257 : .tenant_conf
1648 1257 : .max_lsn_wal_lag
1649 1257 : .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
1650 1257 : drop(tenant_conf_guard);
1651 1257 :
1652 1257 : let mut guard = self.walreceiver.lock().unwrap();
1653 1257 : assert!(
1654 1257 : guard.is_none(),
1655 0 : "multiple launches / re-launches of WAL receiver are not supported"
1656 : );
1657 1257 : *guard = Some(WalReceiver::start(
1658 1257 : Arc::clone(self),
1659 1257 : WalReceiverConf {
1660 1257 : wal_connect_timeout,
1661 1257 : lagging_wal_timeout,
1662 1257 : max_lsn_wal_lag,
1663 1257 : auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
1664 1257 : availability_zone: self.conf.availability_zone.clone(),
1665 1257 : ingest_batch_size: self.conf.ingest_batch_size,
1666 1257 : },
1667 1257 : broker_client,
1668 1257 : ctx,
1669 1257 : ));
1670 1257 : }
1671 :
1672 : /// Initialize with an empty layer map. Used when creating a new timeline.
1673 1149 : pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
1674 1149 : let mut layers = self.layers.try_write().expect(
1675 1149 : "in the context where we call this function, no other task has access to the object",
1676 1149 : );
1677 1149 : layers.initialize_empty(Lsn(start_lsn.0));
1678 1149 : }
1679 :
1680 : /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
1681 : /// files.
1682 431 : pub(super) async fn load_layer_map(
1683 431 : &self,
1684 431 : disk_consistent_lsn: Lsn,
1685 431 : index_part: Option<IndexPart>,
1686 431 : ) -> anyhow::Result<()> {
1687 : use init::{Decision::*, Discovered, DismissedLayer};
1688 : use LayerFileName::*;
1689 :
1690 431 : let mut guard = self.layers.write().await;
1691 :
1692 431 : let timer = self.metrics.load_layer_map_histo.start_timer();
1693 431 :
1694 431 : // Scan timeline directory and create ImageFileName and DeltaFilename
1695 431 : // structs representing all files on disk
1696 431 : let timeline_path = self
1697 431 : .conf
1698 431 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
1699 431 : let conf = self.conf;
1700 431 : let span = tracing::Span::current();
1701 431 :
1702 431 : // Copy to move into the task we're about to spawn
1703 431 : let generation = self.generation;
1704 431 : let shard = self.get_shard_index();
1705 431 : let this = self.myself.upgrade().expect("&self method holds the arc");
1706 :
1707 431 : let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
1708 431 : move || {
1709 431 : let _g = span.entered();
1710 431 : let discovered = init::scan_timeline_dir(&timeline_path)?;
1711 431 : let mut discovered_layers = Vec::with_capacity(discovered.len());
1712 431 : let mut unrecognized_files = Vec::new();
1713 431 :
1714 431 : let mut path = timeline_path;
1715 :
1716 13545 : for discovered in discovered {
1717 13114 : let (name, kind) = match discovered {
1718 12632 : Discovered::Layer(file_name, file_size) => {
1719 12632 : discovered_layers.push((file_name, file_size));
1720 12632 : continue;
1721 : }
1722 : Discovered::Metadata | Discovered::IgnoredBackup => {
1723 431 : continue;
1724 : }
1725 0 : Discovered::Unknown(file_name) => {
1726 0 : // we will later error if there are any
1727 0 : unrecognized_files.push(file_name);
1728 0 : continue;
1729 : }
1730 47 : Discovered::Ephemeral(name) => (name, "old ephemeral file"),
1731 1 : Discovered::Temporary(name) => (name, "temporary timeline file"),
1732 3 : Discovered::TemporaryDownload(name) => (name, "temporary download"),
1733 : };
1734 51 : path.push(Utf8Path::new(&name));
1735 51 : init::cleanup(&path, kind)?;
1736 51 : path.pop();
1737 : }
1738 :
1739 431 : if !unrecognized_files.is_empty() {
1740 : // assume that if there are any there are many many.
1741 0 : let n = unrecognized_files.len();
1742 0 : let first = &unrecognized_files[..n.min(10)];
1743 0 : anyhow::bail!(
1744 0 : "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
1745 0 : );
1746 431 : }
1747 431 :
1748 431 : let decided = init::reconcile(
1749 431 : discovered_layers,
1750 431 : index_part.as_ref(),
1751 431 : disk_consistent_lsn,
1752 431 : generation,
1753 431 : shard,
1754 431 : );
1755 431 :
1756 431 : let mut loaded_layers = Vec::new();
1757 431 : let mut needs_cleanup = Vec::new();
1758 431 : let mut total_physical_size = 0;
1759 :
1760 53963 : for (name, decision) in decided {
1761 53532 : let decision = match decision {
1762 11635 : Ok(UseRemote { local, remote }) => {
1763 11635 : // Remote is authoritative, but we may still choose to retain
1764 11635 : // the local file if the contents appear to match
1765 11635 : if local.file_size() == remote.file_size() {
1766 : // Use the local file, but take the remote metadata so that we pick up
1767 : // the correct generation.
1768 11634 : UseLocal(remote)
1769 : } else {
1770 1 : path.push(name.file_name());
1771 1 : init::cleanup_local_file_for_remote(&path, &local, &remote)?;
1772 1 : path.pop();
1773 1 : UseRemote { local, remote }
1774 : }
1775 : }
1776 41409 : Ok(decision) => decision,
1777 2 : Err(DismissedLayer::Future { local }) => {
1778 2 : if local.is_some() {
1779 0 : path.push(name.file_name());
1780 0 : init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
1781 0 : path.pop();
1782 2 : }
1783 2 : needs_cleanup.push(name);
1784 2 : continue;
1785 : }
1786 486 : Err(DismissedLayer::LocalOnly(local)) => {
1787 486 : path.push(name.file_name());
1788 486 : init::cleanup_local_only_file(&path, &name, &local)?;
1789 486 : path.pop();
1790 486 : // this file never existed remotely, we will have to do rework
1791 486 : continue;
1792 : }
1793 : };
1794 :
1795 53044 : match &name {
1796 22921 : Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
1797 30123 : Image(i) => assert!(i.lsn <= disk_consistent_lsn),
1798 : }
1799 :
1800 53044 : tracing::debug!(layer=%name, ?decision, "applied");
1801 :
1802 53044 : let layer = match decision {
1803 12145 : UseLocal(m) => {
1804 12145 : total_physical_size += m.file_size();
1805 12145 : Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
1806 : }
1807 40898 : Evicted(remote) | UseRemote { remote, .. } => {
1808 40899 : Layer::for_evicted(conf, &this, name, remote)
1809 : }
1810 : };
1811 :
1812 53044 : loaded_layers.push(layer);
1813 : }
1814 431 : Ok((loaded_layers, needs_cleanup, total_physical_size))
1815 431 : }
1816 431 : })
1817 423 : .await
1818 431 : .map_err(anyhow::Error::new)
1819 431 : .and_then(|x| x)?;
1820 :
1821 431 : let num_layers = loaded_layers.len();
1822 431 :
1823 431 : guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
1824 :
1825 431 : if let Some(rtc) = self.remote_client.as_ref() {
1826 431 : rtc.schedule_layer_file_deletion(&needs_cleanup)?;
1827 431 : rtc.schedule_index_upload_for_file_changes()?;
1828 : // This barrier orders above DELETEs before any later operations.
1829 : // This is critical because code executing after the barrier might
1830 : // create again objects with the same key that we just scheduled for deletion.
1831 : // For example, if we just scheduled deletion of an image layer "from the future",
1832 : // later compaction might run again and re-create the same image layer.
1833 : // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
1834 : // "same" here means same key range and LSN.
1835 : //
1836 : // Without a barrier between above DELETEs and the re-creation's PUTs,
1837 : // the upload queue may execute the PUT first, then the DELETE.
1838 : // In our example, we will end up with an IndexPart referencing a non-existent object.
1839 : //
1840 : // 1. a future image layer is created and uploaded
1841 : // 2. ps restart
1842 : // 3. the future layer from (1) is deleted during load layer map
1843 : // 4. image layer is re-created and uploaded
1844 : // 5. deletion queue would like to delete (1) but actually deletes (4)
1845 : // 6. delete by name works as expected, but it now deletes the wrong (later) version
1846 : //
1847 : // See https://github.com/neondatabase/neon/issues/5878
1848 : //
1849 : // NB: generation numbers naturally protect against this because they disambiguate
1850 : // (1) and (4)
1851 431 : rtc.schedule_barrier()?;
1852 : // Tenant::create_timeline will wait for these uploads to happen before returning, or
1853 : // on retry.
1854 0 : }
1855 :
1856 431 : info!(
1857 431 : "loaded layer map with {} layers at {}, total physical size: {}",
1858 431 : num_layers, disk_consistent_lsn, total_physical_size
1859 431 : );
1860 :
1861 431 : timer.stop_and_record();
1862 431 : Ok(())
1863 431 : }
1864 :
1865 : /// Retrieve current logical size of the timeline.
1866 : ///
1867 : /// The size could be lagging behind the actual number, in case
1868 : /// the initial size calculation has not been run (gets triggered on the first size access).
1869 : ///
1870 : /// return size and boolean flag that shows if the size is exact
1871 665329 : pub(crate) fn get_current_logical_size(
1872 665329 : self: &Arc<Self>,
1873 665329 : priority: GetLogicalSizePriority,
1874 665329 : ctx: &RequestContext,
1875 665329 : ) -> logical_size::CurrentLogicalSize {
1876 665329 : if !self.tenant_shard_id.is_zero() {
1877 : // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
1878 : // when HTTP API is serving a GET for timeline zero, return zero
1879 189 : return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
1880 665140 : }
1881 665140 :
1882 665140 : let current_size = self.current_logical_size.current_size();
1883 665140 : debug!("Current size: {current_size:?}");
1884 :
1885 665140 : match (current_size.accuracy(), priority) {
1886 664356 : (logical_size::Accuracy::Exact, _) => (), // nothing to do
1887 159 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
1888 159 : // background task will eventually deliver an exact value, we're in no rush
1889 159 : }
1890 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
1891 : // background task is not ready, but user is asking for it now;
1892 : // => make the background task skip the line
1893 : // (The alternative would be to calculate the size here, but,
1894 : // it can actually take a long time if the user has a lot of rels.
1895 : // And we'll inevitable need it again; So, let the background task do the work.)
1896 625 : match self
1897 625 : .current_logical_size
1898 625 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
1899 625 : .get()
1900 : {
1901 625 : Some(cancel) => cancel.cancel(),
1902 : None => {
1903 0 : let state = self.current_state();
1904 0 : if matches!(
1905 0 : state,
1906 : TimelineState::Broken { .. } | TimelineState::Stopping
1907 0 : ) {
1908 0 :
1909 0 : // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
1910 0 : // Don't make noise.
1911 0 : } else {
1912 0 : warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
1913 : }
1914 : }
1915 : };
1916 : }
1917 : }
1918 :
1919 665140 : if let CurrentLogicalSize::Approximate(_) = ¤t_size {
1920 784 : if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
1921 353 : let first = self
1922 353 : .current_logical_size
1923 353 : .did_return_approximate_to_walreceiver
1924 353 : .compare_exchange(
1925 353 : false,
1926 353 : true,
1927 353 : AtomicOrdering::Relaxed,
1928 353 : AtomicOrdering::Relaxed,
1929 353 : )
1930 353 : .is_ok();
1931 353 : if first {
1932 57 : crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
1933 296 : }
1934 431 : }
1935 664356 : }
1936 :
1937 665140 : current_size
1938 665329 : }
1939 :
1940 1206 : fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
1941 1206 : let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
1942 : // nothing to do for freshly created timelines;
1943 572 : assert_eq!(
1944 572 : self.current_logical_size.current_size().accuracy(),
1945 572 : logical_size::Accuracy::Exact,
1946 572 : );
1947 572 : self.current_logical_size.initialized.add_permits(1);
1948 572 : return;
1949 : };
1950 :
1951 634 : let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
1952 634 : let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
1953 634 : self.current_logical_size
1954 634 : .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
1955 634 : .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
1956 634 :
1957 634 : let self_clone = Arc::clone(self);
1958 634 : let background_ctx = ctx.detached_child(
1959 634 : TaskKind::InitialLogicalSizeCalculation,
1960 634 : DownloadBehavior::Download,
1961 634 : );
1962 634 : task_mgr::spawn(
1963 634 : task_mgr::BACKGROUND_RUNTIME.handle(),
1964 634 : task_mgr::TaskKind::InitialLogicalSizeCalculation,
1965 634 : Some(self.tenant_shard_id),
1966 634 : Some(self.timeline_id),
1967 634 : "initial size calculation",
1968 : false,
1969 : // NB: don't log errors here, task_mgr will do that.
1970 634 : async move {
1971 634 : let cancel = task_mgr::shutdown_token();
1972 634 : self_clone
1973 634 : .initial_logical_size_calculation_task(
1974 634 : initial_part_end,
1975 634 : cancel_wait_for_background_loop_concurrency_limit_semaphore,
1976 634 : cancel,
1977 634 : background_ctx,
1978 634 : )
1979 73466 : .await;
1980 626 : Ok(())
1981 626 : }
1982 634 : .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
1983 : );
1984 1206 : }
1985 :
1986 634 : async fn initial_logical_size_calculation_task(
1987 634 : self: Arc<Self>,
1988 634 : initial_part_end: Lsn,
1989 634 : skip_concurrency_limiter: CancellationToken,
1990 634 : cancel: CancellationToken,
1991 634 : background_ctx: RequestContext,
1992 634 : ) {
1993 626 : scopeguard::defer! {
1994 626 : // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
1995 626 : self.current_logical_size.initialized.add_permits(1);
1996 626 : }
1997 :
1998 : enum BackgroundCalculationError {
1999 : Cancelled,
2000 : Other(anyhow::Error),
2001 : }
2002 :
2003 634 : let try_once = |attempt: usize| {
2004 634 : let background_ctx = &background_ctx;
2005 634 : let self_ref = &self;
2006 634 : let skip_concurrency_limiter = &skip_concurrency_limiter;
2007 634 : async move {
2008 634 : let cancel = task_mgr::shutdown_token();
2009 634 : let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
2010 634 : BackgroundLoopKind::InitialLogicalSizeCalculation,
2011 634 : background_ctx,
2012 634 : );
2013 :
2014 : use crate::metrics::initial_logical_size::StartCircumstances;
2015 634 : let (_maybe_permit, circumstances) = tokio::select! {
2016 214 : permit = wait_for_permit => {
2017 : (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
2018 : }
2019 : _ = self_ref.cancel.cancelled() => {
2020 : return Err(BackgroundCalculationError::Cancelled);
2021 : }
2022 : _ = cancel.cancelled() => {
2023 : return Err(BackgroundCalculationError::Cancelled);
2024 : },
2025 : () = skip_concurrency_limiter.cancelled() => {
2026 : // Some action that is part of a end user interaction requested logical size
2027 : // => break out of the rate limit
2028 : // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
2029 : // but then again what happens if they cancel; also, we should just be using
2030 : // one runtime across the entire process, so, let's leave this for now.
2031 : (None, StartCircumstances::SkippedConcurrencyLimiter)
2032 : }
2033 : };
2034 :
2035 617 : let metrics_guard = if attempt == 1 {
2036 617 : crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
2037 : } else {
2038 0 : crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
2039 : };
2040 :
2041 617 : match self_ref
2042 617 : .logical_size_calculation_task(
2043 617 : initial_part_end,
2044 617 : LogicalSizeCalculationCause::Initial,
2045 617 : background_ctx,
2046 617 : )
2047 72975 : .await
2048 : {
2049 577 : Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
2050 : Err(CalculateLogicalSizeError::Cancelled) => {
2051 27 : Err(BackgroundCalculationError::Cancelled)
2052 : }
2053 2 : Err(CalculateLogicalSizeError::Other(err)) => {
2054 1 : if let Some(PageReconstructError::AncestorStopping(_)) =
2055 2 : err.root_cause().downcast_ref()
2056 : {
2057 0 : Err(BackgroundCalculationError::Cancelled)
2058 : } else {
2059 2 : Err(BackgroundCalculationError::Other(err))
2060 : }
2061 : }
2062 : }
2063 623 : }
2064 634 : };
2065 :
2066 634 : let retrying = async {
2067 634 : let mut attempt = 0;
2068 634 : loop {
2069 634 : attempt += 1;
2070 634 :
2071 73463 : match try_once(attempt).await {
2072 577 : Ok(res) => return ControlFlow::Continue(res),
2073 44 : Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
2074 2 : Err(BackgroundCalculationError::Other(e)) => {
2075 2 : warn!(attempt, "initial size calculation failed: {e:?}");
2076 : // exponential back-off doesn't make sense at these long intervals;
2077 : // use fixed retry interval with generous jitter instead
2078 2 : let sleep_duration = Duration::from_secs(
2079 2 : u64::try_from(
2080 2 : // 1hour base
2081 2 : (60_i64 * 60_i64)
2082 2 : // 10min jitter
2083 2 : + rand::thread_rng().gen_range(-10 * 60..10 * 60),
2084 2 : )
2085 2 : .expect("10min < 1hour"),
2086 2 : );
2087 2 : tokio::time::sleep(sleep_duration).await;
2088 : }
2089 : }
2090 : }
2091 621 : };
2092 :
2093 634 : let (calculated_size, metrics_guard) = tokio::select! {
2094 621 : res = retrying => {
2095 : match res {
2096 : ControlFlow::Continue(calculated_size) => calculated_size,
2097 : ControlFlow::Break(()) => return,
2098 : }
2099 : }
2100 : _ = cancel.cancelled() => {
2101 : return;
2102 : }
2103 : };
2104 :
2105 : // we cannot query current_logical_size.current_size() to know the current
2106 : // *negative* value, only truncated to u64.
2107 577 : let added = self
2108 577 : .current_logical_size
2109 577 : .size_added_after_initial
2110 577 : .load(AtomicOrdering::Relaxed);
2111 577 :
2112 577 : let sum = calculated_size.saturating_add_signed(added);
2113 577 :
2114 577 : // set the gauge value before it can be set in `update_current_logical_size`.
2115 577 : self.metrics.current_logical_size_gauge.set(sum);
2116 577 :
2117 577 : self.current_logical_size
2118 577 : .initial_logical_size
2119 577 : .set((calculated_size, metrics_guard.calculation_result_saved()))
2120 577 : .ok()
2121 577 : .expect("only this task sets it");
2122 626 : }
2123 :
2124 36 : pub(crate) fn spawn_ondemand_logical_size_calculation(
2125 36 : self: &Arc<Self>,
2126 36 : lsn: Lsn,
2127 36 : cause: LogicalSizeCalculationCause,
2128 36 : ctx: RequestContext,
2129 36 : ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
2130 36 : let (sender, receiver) = oneshot::channel();
2131 36 : let self_clone = Arc::clone(self);
2132 36 : // XXX if our caller loses interest, i.e., ctx is cancelled,
2133 36 : // we should stop the size calculation work and return an error.
2134 36 : // That would require restructuring this function's API to
2135 36 : // return the result directly, instead of a Receiver for the result.
2136 36 : let ctx = ctx.detached_child(
2137 36 : TaskKind::OndemandLogicalSizeCalculation,
2138 36 : DownloadBehavior::Download,
2139 36 : );
2140 36 : task_mgr::spawn(
2141 36 : task_mgr::BACKGROUND_RUNTIME.handle(),
2142 36 : task_mgr::TaskKind::OndemandLogicalSizeCalculation,
2143 36 : Some(self.tenant_shard_id),
2144 36 : Some(self.timeline_id),
2145 36 : "ondemand logical size calculation",
2146 36 : false,
2147 36 : async move {
2148 36 : let res = self_clone
2149 36 : .logical_size_calculation_task(lsn, cause, &ctx)
2150 2106 : .await;
2151 36 : let _ = sender.send(res).ok();
2152 36 : Ok(()) // Receiver is responsible for handling errors
2153 36 : }
2154 36 : .in_current_span(),
2155 36 : );
2156 36 : receiver
2157 36 : }
2158 :
2159 : /// # Cancel-Safety
2160 : ///
2161 : /// This method is cancellation-safe.
2162 1306 : #[instrument(skip_all)]
2163 : async fn logical_size_calculation_task(
2164 : self: &Arc<Self>,
2165 : lsn: Lsn,
2166 : cause: LogicalSizeCalculationCause,
2167 : ctx: &RequestContext,
2168 : ) -> Result<u64, CalculateLogicalSizeError> {
2169 : crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
2170 : // We should never be calculating logical sizes on shard !=0, because these shards do not have
2171 : // accurate relation sizes, and they do not emit consumption metrics.
2172 : debug_assert!(self.tenant_shard_id.is_zero());
2173 :
2174 : let _guard = self.gate.enter();
2175 :
2176 : let self_calculation = Arc::clone(self);
2177 :
2178 653 : let mut calculation = pin!(async {
2179 653 : let ctx = ctx.attached_child();
2180 653 : self_calculation
2181 653 : .calculate_logical_size(lsn, cause, &ctx)
2182 75084 : .await
2183 642 : });
2184 :
2185 75730 : tokio::select! {
2186 641 : res = &mut calculation => { res }
2187 : _ = self.cancel.cancelled() => {
2188 0 : debug!("cancelling logical size calculation for timeline shutdown");
2189 : calculation.await
2190 : }
2191 : _ = task_mgr::shutdown_watcher() => {
2192 0 : debug!("cancelling logical size calculation for task shutdown");
2193 : calculation.await
2194 : }
2195 : }
2196 : }
2197 :
2198 : /// Calculate the logical size of the database at the latest LSN.
2199 : ///
2200 : /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
2201 : /// especially if we need to download remote layers.
2202 : ///
2203 : /// # Cancel-Safety
2204 : ///
2205 : /// This method is cancellation-safe.
2206 660 : async fn calculate_logical_size(
2207 660 : &self,
2208 660 : up_to_lsn: Lsn,
2209 660 : cause: LogicalSizeCalculationCause,
2210 660 : ctx: &RequestContext,
2211 660 : ) -> Result<u64, CalculateLogicalSizeError> {
2212 660 : info!(
2213 660 : "Calculating logical size for timeline {} at {}",
2214 660 : self.timeline_id, up_to_lsn
2215 660 : );
2216 : // These failpoints are used by python tests to ensure that we don't delete
2217 : // the timeline while the logical size computation is ongoing.
2218 : // The first failpoint is used to make this function pause.
2219 : // Then the python test initiates timeline delete operation in a thread.
2220 : // It waits for a few seconds, then arms the second failpoint and disables
2221 : // the first failpoint. The second failpoint prints an error if the timeline
2222 : // delete code has deleted the on-disk state while we're still running here.
2223 : // It shouldn't do that. If it does it anyway, the error will be caught
2224 : // by the test suite, highlighting the problem.
2225 0 : fail::fail_point!("timeline-calculate-logical-size-pause");
2226 660 : fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
2227 2 : if !self
2228 2 : .conf
2229 2 : .metadata_path(&self.tenant_shard_id, &self.timeline_id)
2230 2 : .exists()
2231 : {
2232 0 : error!("timeline-calculate-logical-size-pre metadata file does not exist")
2233 2 : }
2234 : // need to return something
2235 2 : Ok(0)
2236 660 : });
2237 : // See if we've already done the work for initial size calculation.
2238 : // This is a short-cut for timelines that are mostly unused.
2239 658 : if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
2240 6 : return Ok(size);
2241 652 : }
2242 652 : let storage_time_metrics = match cause {
2243 : LogicalSizeCalculationCause::Initial
2244 : | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
2245 639 : | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
2246 : LogicalSizeCalculationCause::EvictionTaskImitation => {
2247 13 : &self.metrics.imitate_logical_size_histo
2248 : }
2249 : };
2250 652 : let timer = storage_time_metrics.start_timer();
2251 652 : let logical_size = self
2252 652 : .get_current_logical_size_non_incremental(up_to_lsn, ctx)
2253 76220 : .await?;
2254 0 : debug!("calculated logical size: {logical_size}");
2255 611 : timer.stop_and_record();
2256 611 : Ok(logical_size)
2257 649 : }
2258 :
2259 : /// Update current logical size, adding `delta' to the old value.
2260 640015 : fn update_current_logical_size(&self, delta: i64) {
2261 640015 : let logical_size = &self.current_logical_size;
2262 640015 : logical_size.increment_size(delta);
2263 640015 :
2264 640015 : // Also set the value in the prometheus gauge. Note that
2265 640015 : // there is a race condition here: if this is is called by two
2266 640015 : // threads concurrently, the prometheus gauge might be set to
2267 640015 : // one value while current_logical_size is set to the
2268 640015 : // other.
2269 640015 : match logical_size.current_size() {
2270 639929 : CurrentLogicalSize::Exact(ref new_current_size) => self
2271 639929 : .metrics
2272 639929 : .current_logical_size_gauge
2273 639929 : .set(new_current_size.into()),
2274 86 : CurrentLogicalSize::Approximate(_) => {
2275 86 : // don't update the gauge yet, this allows us not to update the gauge back and
2276 86 : // forth between the initial size calculation task.
2277 86 : }
2278 : }
2279 640015 : }
2280 :
2281 730583 : pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
2282 730583 : self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
2283 730583 : let aux_metric =
2284 730583 : self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
2285 730583 :
2286 730583 : let sum_of_entries = self
2287 730583 : .directory_metrics
2288 730583 : .iter()
2289 5114081 : .map(|v| v.load(AtomicOrdering::Relaxed))
2290 730583 : .sum();
2291 730583 : // Set a high general threshold and a lower threshold for the auxiliary files,
2292 730583 : // as we can have large numbers of relations in the db directory.
2293 730583 : const SUM_THRESHOLD: u64 = 5000;
2294 730583 : const AUX_THRESHOLD: u64 = 1000;
2295 730583 : if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
2296 0 : self.metrics
2297 0 : .directory_entries_count_gauge
2298 0 : .set(sum_of_entries);
2299 730583 : } else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
2300 0 : metric.set(sum_of_entries);
2301 730583 : }
2302 730583 : }
2303 :
2304 2208 : async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
2305 2208 : let guard = self.layers.read().await;
2306 772744 : for historic_layer in guard.layer_map().iter_historic_layers() {
2307 772744 : let historic_layer_name = historic_layer.filename().file_name();
2308 772744 : if layer_file_name == historic_layer_name {
2309 2208 : return Some(guard.get_from_desc(&historic_layer));
2310 770536 : }
2311 : }
2312 :
2313 0 : None
2314 2208 : }
2315 :
2316 : /// The timeline heatmap is a hint to secondary locations from the primary location,
2317 : /// indicating which layers are currently on-disk on the primary.
2318 : ///
2319 : /// None is returned if the Timeline is in a state where uploading a heatmap
2320 : /// doesn't make sense, such as shutting down or initializing. The caller
2321 : /// should treat this as a cue to simply skip doing any heatmap uploading
2322 : /// for this timeline.
2323 12 : pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
2324 : // no point in heatmaps without remote client
2325 12 : let _remote_client = self.remote_client.as_ref()?;
2326 :
2327 12 : if !self.is_active() {
2328 0 : return None;
2329 12 : }
2330 :
2331 12 : let guard = self.layers.read().await;
2332 :
2333 2515 : let resident = guard.resident_layers().map(|layer| {
2334 2515 : let last_activity_ts = layer.access_stats().latest_activity_or_now();
2335 2515 :
2336 2515 : HeatMapLayer::new(
2337 2515 : layer.layer_desc().filename(),
2338 2515 : layer.metadata().into(),
2339 2515 : last_activity_ts,
2340 2515 : )
2341 2515 : });
2342 :
2343 12 : let layers = resident.collect().await;
2344 :
2345 12 : Some(HeatMapTimeline::new(self.timeline_id, layers))
2346 12 : }
2347 : }
2348 :
2349 : type TraversalId = String;
2350 :
2351 : trait TraversalLayerExt {
2352 : fn traversal_id(&self) -> TraversalId;
2353 : }
2354 :
2355 : impl TraversalLayerExt for Layer {
2356 1429 : fn traversal_id(&self) -> TraversalId {
2357 1429 : self.local_path().to_string()
2358 1429 : }
2359 : }
2360 :
2361 : impl TraversalLayerExt for Arc<InMemoryLayer> {
2362 346 : fn traversal_id(&self) -> TraversalId {
2363 346 : format!("timeline {} in-memory {self}", self.get_timeline_id())
2364 346 : }
2365 : }
2366 :
2367 : impl Timeline {
2368 : ///
2369 : /// Get a handle to a Layer for reading.
2370 : ///
2371 : /// The returned Layer might be from an ancestor timeline, if the
2372 : /// segment hasn't been updated on this timeline yet.
2373 : ///
2374 : /// This function takes the current timeline's locked LayerMap as an argument,
2375 : /// so callers can avoid potential race conditions.
2376 : ///
2377 : /// # Cancel-Safety
2378 : ///
2379 : /// This method is cancellation-safe.
2380 7322321 : async fn get_reconstruct_data(
2381 7322321 : &self,
2382 7322321 : key: Key,
2383 7322321 : request_lsn: Lsn,
2384 7322321 : reconstruct_state: &mut ValueReconstructState,
2385 7322321 : ctx: &RequestContext,
2386 7322329 : ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
2387 7322329 : // Start from the current timeline.
2388 7322329 : let mut timeline_owned;
2389 7322329 : let mut timeline = self;
2390 7322329 :
2391 7322329 : let mut read_count = scopeguard::guard(0, |cnt| {
2392 7322321 : crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
2393 7322329 : });
2394 7322329 :
2395 7322329 : // For debugging purposes, collect the path of layers that we traversed
2396 7322329 : // through. It's included in the error message if we fail to find the key.
2397 7322329 : let mut traversal_path = Vec::<TraversalPathItem>::new();
2398 :
2399 7322329 : let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
2400 1710871 : *cached_lsn
2401 : } else {
2402 5611458 : Lsn(0)
2403 : };
2404 :
2405 : // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
2406 : // to check that each iteration make some progress, to break infinite
2407 : // looping if something goes wrong.
2408 7322329 : let mut prev_lsn = Lsn(u64::MAX);
2409 7322329 :
2410 7322329 : let mut result = ValueReconstructResult::Continue;
2411 7322329 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2412 :
2413 31648875 : 'outer: loop {
2414 31648875 : if self.cancel.is_cancelled() {
2415 39 : return Err(PageReconstructError::Cancelled);
2416 31648836 : }
2417 31648836 :
2418 31648836 : // The function should have updated 'state'
2419 31648836 : //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
2420 31648836 : match result {
2421 5643396 : ValueReconstructResult::Complete => return Ok(traversal_path),
2422 : ValueReconstructResult::Continue => {
2423 : // If we reached an earlier cached page image, we're done.
2424 26005433 : if cont_lsn == cached_lsn + 1 {
2425 1677581 : MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
2426 1677581 : return Ok(traversal_path);
2427 24327852 : }
2428 24327852 : if prev_lsn <= cont_lsn {
2429 : // Didn't make any progress in last iteration. Error out to avoid
2430 : // getting stuck in the loop.
2431 1268 : return Err(layer_traversal_error(format!(
2432 1268 : "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
2433 1268 : key,
2434 1268 : Lsn(cont_lsn.0 - 1),
2435 1268 : request_lsn,
2436 1268 : timeline.ancestor_lsn
2437 1268 : ), traversal_path));
2438 24326584 : }
2439 24326584 : prev_lsn = cont_lsn;
2440 : }
2441 : ValueReconstructResult::Missing => {
2442 : return Err(layer_traversal_error(
2443 7 : if cfg!(test) {
2444 6 : format!(
2445 6 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
2446 6 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
2447 6 : )
2448 : } else {
2449 1 : format!(
2450 1 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
2451 1 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
2452 1 : )
2453 : },
2454 7 : traversal_path,
2455 : ));
2456 : }
2457 : }
2458 :
2459 : // Recurse into ancestor if needed
2460 24326584 : if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
2461 0 : trace!(
2462 0 : "going into ancestor {}, cont_lsn is {}",
2463 0 : timeline.ancestor_lsn,
2464 0 : cont_lsn
2465 0 : );
2466 :
2467 1312450 : timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
2468 1312446 : timeline = &*timeline_owned;
2469 1312446 : prev_lsn = Lsn(u64::MAX);
2470 1312446 : continue 'outer;
2471 23014133 : }
2472 :
2473 23014133 : let guard = timeline.layers.read().await;
2474 23014132 : let layers = guard.layer_map();
2475 :
2476 : // Check the open and frozen in-memory layers first, in order from newest
2477 : // to oldest.
2478 23014132 : if let Some(open_layer) = &layers.open_layer {
2479 17878268 : let start_lsn = open_layer.get_lsn_range().start;
2480 17878268 : if cont_lsn > start_lsn {
2481 : //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
2482 : // Get all the data needed to reconstruct the page version from this layer.
2483 : // But if we have an older cached page image, no need to go past that.
2484 5733008 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2485 5733008 : result = match open_layer
2486 5733008 : .get_value_reconstruct_data(
2487 5733008 : key,
2488 5733008 : lsn_floor..cont_lsn,
2489 5733008 : reconstruct_state,
2490 5733008 : ctx,
2491 5733008 : )
2492 667848 : .await
2493 : {
2494 5733008 : Ok(result) => result,
2495 0 : Err(e) => return Err(PageReconstructError::from(e)),
2496 : };
2497 5733008 : cont_lsn = lsn_floor;
2498 5733008 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2499 5733008 : traversal_path.push((
2500 5733008 : result,
2501 5733008 : cont_lsn,
2502 5733008 : Box::new({
2503 5733008 : let open_layer = Arc::clone(open_layer);
2504 5733008 : move || open_layer.traversal_id()
2505 5733008 : }),
2506 5733008 : ));
2507 5733008 : continue 'outer;
2508 12145260 : }
2509 5135864 : }
2510 17281124 : for frozen_layer in layers.frozen_layers.iter().rev() {
2511 1281763 : let start_lsn = frozen_layer.get_lsn_range().start;
2512 1281763 : if cont_lsn > start_lsn {
2513 : //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
2514 292140 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2515 292140 : result = match frozen_layer
2516 292140 : .get_value_reconstruct_data(
2517 292140 : key,
2518 292140 : lsn_floor..cont_lsn,
2519 292140 : reconstruct_state,
2520 292140 : ctx,
2521 292140 : )
2522 23211 : .await
2523 : {
2524 292140 : Ok(result) => result,
2525 0 : Err(e) => return Err(PageReconstructError::from(e)),
2526 : };
2527 292140 : cont_lsn = lsn_floor;
2528 292140 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2529 292140 : traversal_path.push((
2530 292140 : result,
2531 292140 : cont_lsn,
2532 292140 : Box::new({
2533 292140 : let frozen_layer = Arc::clone(frozen_layer);
2534 292140 : move || frozen_layer.traversal_id()
2535 292140 : }),
2536 292140 : ));
2537 292140 : continue 'outer;
2538 989623 : }
2539 : }
2540 :
2541 16988984 : if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
2542 16843804 : let layer = guard.get_from_desc(&layer);
2543 16843804 : // Get all the data needed to reconstruct the page version from this layer.
2544 16843804 : // But if we have an older cached page image, no need to go past that.
2545 16843804 : let lsn_floor = max(cached_lsn + 1, lsn_floor);
2546 16843804 : result = match layer
2547 16843804 : .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
2548 1021995 : .await
2549 : {
2550 16843772 : Ok(result) => result,
2551 21 : Err(e) => return Err(PageReconstructError::from(e)),
2552 : };
2553 16843772 : cont_lsn = lsn_floor;
2554 16843772 : *read_count += 1;
2555 16843772 : traversal_path.push((
2556 16843772 : result,
2557 16843772 : cont_lsn,
2558 16843772 : Box::new({
2559 16843772 : let layer = layer.to_owned();
2560 16843772 : move || layer.traversal_id()
2561 16843772 : }),
2562 16843772 : ));
2563 16843772 : continue 'outer;
2564 145180 : } else if timeline.ancestor_timeline.is_some() {
2565 : // Nothing on this timeline. Traverse to parent
2566 145177 : result = ValueReconstructResult::Continue;
2567 145177 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2568 145177 : continue 'outer;
2569 : } else {
2570 : // Nothing found
2571 3 : result = ValueReconstructResult::Missing;
2572 3 : continue 'outer;
2573 : }
2574 : }
2575 7322316 : }
2576 :
2577 : /// # Cancel-safety
2578 : ///
2579 : /// This method is cancellation-safe.
2580 7398626 : async fn lookup_cached_page(
2581 7398626 : &self,
2582 7398626 : key: &Key,
2583 7398626 : lsn: Lsn,
2584 7398626 : ctx: &RequestContext,
2585 7398634 : ) -> Option<(Lsn, Bytes)> {
2586 7398634 : let cache = page_cache::get();
2587 :
2588 : // FIXME: It's pointless to check the cache for things that are not 8kB pages.
2589 : // We should look at the key to determine if it's a cacheable object
2590 7398634 : let (lsn, read_guard) = cache
2591 7398634 : .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
2592 5611458 : .await?;
2593 1787176 : let img = Bytes::from(read_guard.to_vec());
2594 1787176 : Some((lsn, img))
2595 7398634 : }
2596 :
2597 1312449 : async fn get_ready_ancestor_timeline(
2598 1312449 : &self,
2599 1312449 : ctx: &RequestContext,
2600 1312450 : ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
2601 1312450 : let ancestor = match self.get_ancestor_timeline() {
2602 1312450 : Ok(timeline) => timeline,
2603 0 : Err(e) => return Err(GetReadyAncestorError::from(e)),
2604 : };
2605 :
2606 : // It's possible that the ancestor timeline isn't active yet, or
2607 : // is active but hasn't yet caught up to the branch point. Wait
2608 : // for it.
2609 : //
2610 : // This cannot happen while the pageserver is running normally,
2611 : // because you cannot create a branch from a point that isn't
2612 : // present in the pageserver yet. However, we don't wait for the
2613 : // branch point to be uploaded to cloud storage before creating
2614 : // a branch. I.e., the branch LSN need not be remote consistent
2615 : // for the branching operation to succeed.
2616 : //
2617 : // Hence, if we try to load a tenant in such a state where
2618 : // 1. the existence of the branch was persisted (in IndexPart and/or locally)
2619 : // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
2620 : // then we will need to wait for the ancestor timeline to
2621 : // re-stream WAL up to branch_lsn before we access it.
2622 : //
2623 : // How can a tenant get in such a state?
2624 : // - ungraceful pageserver process exit
2625 : // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
2626 : //
2627 : // NB: this could be avoided by requiring
2628 : // branch_lsn >= remote_consistent_lsn
2629 : // during branch creation.
2630 1312450 : match ancestor.wait_to_become_active(ctx).await {
2631 1312446 : Ok(()) => {}
2632 : Err(TimelineState::Stopping) => {
2633 2 : return Err(GetReadyAncestorError::AncestorStopping(
2634 2 : ancestor.timeline_id,
2635 2 : ));
2636 : }
2637 2 : Err(state) => {
2638 2 : return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
2639 2 : "Timeline {} will not become active. Current state: {:?}",
2640 2 : ancestor.timeline_id,
2641 2 : &state,
2642 2 : )));
2643 : }
2644 : }
2645 1312446 : ancestor
2646 1312446 : .wait_lsn(self.ancestor_lsn, ctx)
2647 5 : .await
2648 1312446 : .map_err(|e| match e {
2649 0 : e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
2650 0 : WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
2651 0 : e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
2652 1312446 : })?;
2653 :
2654 1312446 : Ok(ancestor)
2655 1312450 : }
2656 :
2657 1312449 : fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
2658 1312449 : let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
2659 0 : format!(
2660 0 : "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
2661 0 : self.timeline_id,
2662 0 : self.get_ancestor_timeline_id(),
2663 0 : )
2664 1312449 : })?;
2665 1312449 : Ok(Arc::clone(ancestor))
2666 1312449 : }
2667 :
2668 1084703 : pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
2669 1084703 : &self.shard_identity
2670 1084703 : }
2671 :
2672 : ///
2673 : /// Get a handle to the latest layer for appending.
2674 : ///
2675 2907667 : async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
2676 2907667 : let mut guard = self.layers.write().await;
2677 2907667 : let layer = guard
2678 2907667 : .get_layer_for_write(
2679 2907667 : lsn,
2680 2907667 : self.get_last_record_lsn(),
2681 2907667 : self.conf,
2682 2907667 : self.timeline_id,
2683 2907667 : self.tenant_shard_id,
2684 2907667 : )
2685 307 : .await?;
2686 2907667 : Ok(layer)
2687 2907667 : }
2688 :
2689 1214102 : async fn put_value(
2690 1214102 : &self,
2691 1214102 : key: Key,
2692 1214102 : lsn: Lsn,
2693 1214102 : val: &Value,
2694 1214102 : ctx: &RequestContext,
2695 1214102 : ) -> anyhow::Result<()> {
2696 : //info!("PUT: key {} at {}", key, lsn);
2697 1214102 : let layer = self.get_layer_for_write(lsn).await?;
2698 1214102 : layer.put_value(key, lsn, val, ctx).await?;
2699 1214102 : Ok(())
2700 1214102 : }
2701 :
2702 1674304 : async fn put_values(
2703 1674304 : &self,
2704 1674304 : values: &HashMap<Key, Vec<(Lsn, Value)>>,
2705 1674304 : ctx: &RequestContext,
2706 1674304 : ) -> anyhow::Result<()> {
2707 : // Pick the first LSN in the batch to get the layer to write to.
2708 1674304 : for lsns in values.values() {
2709 1674304 : if let Some((lsn, _)) = lsns.first() {
2710 1674304 : let layer = self.get_layer_for_write(*lsn).await?;
2711 1674304 : layer.put_values(values, ctx).await?;
2712 1674303 : break;
2713 0 : }
2714 : }
2715 1674303 : Ok(())
2716 1674303 : }
2717 :
2718 19261 : async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
2719 19261 : if let Some((_, lsn)) = tombstones.first() {
2720 19261 : let layer = self.get_layer_for_write(*lsn).await?;
2721 19261 : layer.put_tombstones(tombstones).await?;
2722 0 : }
2723 19261 : Ok(())
2724 19261 : }
2725 :
2726 76004085 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
2727 76004085 : assert!(new_lsn.is_aligned());
2728 :
2729 76004085 : self.metrics.last_record_gauge.set(new_lsn.0 as i64);
2730 76004085 : self.last_record_lsn.advance(new_lsn);
2731 76004085 : }
2732 :
2733 5410 : async fn freeze_inmem_layer(&self, write_lock_held: bool) {
2734 : // Freeze the current open in-memory layer. It will be written to disk on next
2735 : // iteration.
2736 5410 : let _write_guard = if write_lock_held {
2737 3504 : None
2738 : } else {
2739 1906 : Some(self.write_lock.lock().await)
2740 : };
2741 5410 : let mut guard = self.layers.write().await;
2742 5410 : guard
2743 5410 : .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
2744 6 : .await;
2745 5410 : }
2746 :
2747 : /// Layer flusher task's main loop.
2748 1573 : async fn flush_loop(
2749 1573 : self: &Arc<Self>,
2750 1573 : mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
2751 1573 : ctx: &RequestContext,
2752 1573 : ) {
2753 1573 : info!("started flush loop");
2754 : loop {
2755 6110 : tokio::select! {
2756 10845 : _ = self.cancel.cancelled() => {
2757 10845 : info!("shutting down layer flush task");
2758 10845 : break;
2759 10845 : },
2760 10845 : _ = task_mgr::shutdown_watcher() => {
2761 10845 : info!("shutting down layer flush task");
2762 10845 : break;
2763 10845 : },
2764 10845 : _ = layer_flush_start_rx.changed() => {}
2765 10845 : }
2766 :
2767 0 : trace!("waking up");
2768 4544 : let timer = self.metrics.flush_time_histo.start_timer();
2769 4544 : let flush_counter = *layer_flush_start_rx.borrow();
2770 9717 : let result = loop {
2771 9717 : if self.cancel.is_cancelled() {
2772 0 : info!("dropping out of flush loop for timeline shutdown");
2773 : // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
2774 : // anyone waiting on that will respect self.cancel as well: they will stop
2775 : // waiting at the same time we as drop out of this loop.
2776 0 : return;
2777 9717 : }
2778 :
2779 9717 : let layer_to_flush = {
2780 9717 : let guard = self.layers.read().await;
2781 9717 : guard.layer_map().frozen_layers.front().cloned()
2782 : // drop 'layers' lock to allow concurrent reads and writes
2783 : };
2784 9717 : let Some(layer_to_flush) = layer_to_flush else {
2785 4537 : break Ok(());
2786 : };
2787 26672 : match self.flush_frozen_layer(layer_to_flush, ctx).await {
2788 5173 : Ok(()) => {}
2789 : Err(FlushLayerError::Cancelled) => {
2790 1 : info!("dropping out of flush loop for timeline shutdown");
2791 1 : return;
2792 : }
2793 0 : err @ Err(
2794 : FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
2795 : ) => {
2796 0 : error!("could not flush frozen layer: {err:?}");
2797 0 : break err;
2798 : }
2799 : }
2800 : };
2801 : // Notify any listeners that we're done
2802 4537 : let _ = self
2803 4537 : .layer_flush_done_tx
2804 4537 : .send_replace((flush_counter, result));
2805 4537 :
2806 4537 : timer.stop_and_record();
2807 : }
2808 592 : }
2809 :
2810 1906 : async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
2811 1906 : let mut rx = self.layer_flush_done_tx.subscribe();
2812 1906 :
2813 1906 : // Increment the flush cycle counter and wake up the flush task.
2814 1906 : // Remember the new value, so that when we listen for the flush
2815 1906 : // to finish, we know when the flush that we initiated has
2816 1906 : // finished, instead of some other flush that was started earlier.
2817 1906 : let mut my_flush_request = 0;
2818 1906 :
2819 1906 : let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
2820 1906 : if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
2821 45 : anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
2822 1861 : }
2823 1861 :
2824 1861 : self.layer_flush_start_tx.send_modify(|counter| {
2825 1861 : my_flush_request = *counter + 1;
2826 1861 : *counter = my_flush_request;
2827 1861 : });
2828 :
2829 3721 : loop {
2830 3721 : {
2831 3721 : let (last_result_counter, last_result) = &*rx.borrow();
2832 3721 : if *last_result_counter >= my_flush_request {
2833 1860 : if let Err(_err) = last_result {
2834 : // We already logged the original error in
2835 : // flush_loop. We cannot propagate it to the caller
2836 : // here, because it might not be Cloneable
2837 0 : anyhow::bail!(
2838 0 : "Could not flush frozen layer. Request id: {}",
2839 0 : my_flush_request
2840 0 : );
2841 : } else {
2842 1860 : return Ok(());
2843 : }
2844 1861 : }
2845 : }
2846 0 : trace!("waiting for flush to complete");
2847 1861 : tokio::select! {
2848 1860 : rx_e = rx.changed() => {
2849 : rx_e?;
2850 : },
2851 : // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
2852 : // the notification from [`flush_loop`] that it completed.
2853 : _ = self.cancel.cancelled() => {
2854 1 : tracing::info!("Cancelled layer flush due on timeline shutdown");
2855 : return Ok(())
2856 : }
2857 : };
2858 0 : trace!("done")
2859 : }
2860 1906 : }
2861 :
2862 3504 : fn flush_frozen_layers(&self) {
2863 3504 : self.layer_flush_start_tx.send_modify(|val| *val += 1);
2864 3504 : }
2865 :
2866 : /// Flush one frozen in-memory layer to disk, as a new delta layer.
2867 10360 : #[instrument(skip_all, fields(layer=%frozen_layer))]
2868 : async fn flush_frozen_layer(
2869 : self: &Arc<Self>,
2870 : frozen_layer: Arc<InMemoryLayer>,
2871 : ctx: &RequestContext,
2872 : ) -> Result<(), FlushLayerError> {
2873 : debug_assert_current_span_has_tenant_and_timeline_id();
2874 : // As a special case, when we have just imported an image into the repository,
2875 : // instead of writing out a L0 delta layer, we directly write out image layer
2876 : // files instead. This is possible as long as *all* the data imported into the
2877 : // repository have the same LSN.
2878 : let lsn_range = frozen_layer.get_lsn_range();
2879 : let (layers_to_upload, delta_layer_to_add) =
2880 : if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
2881 : #[cfg(test)]
2882 : match &mut *self.flush_loop_state.lock().unwrap() {
2883 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2884 : panic!("flush loop not running")
2885 : }
2886 : FlushLoopState::Running {
2887 : initdb_optimization_count,
2888 : ..
2889 : } => {
2890 : *initdb_optimization_count += 1;
2891 : }
2892 : }
2893 : // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
2894 : // require downloading anything during initial import.
2895 : let (partitioning, _lsn) = self
2896 : .repartition(
2897 : self.initdb_lsn,
2898 : self.get_compaction_target_size(),
2899 : EnumSet::empty(),
2900 : ctx,
2901 : )
2902 : .await?;
2903 :
2904 : if self.cancel.is_cancelled() {
2905 : return Err(FlushLayerError::Cancelled);
2906 : }
2907 :
2908 : // For image layers, we add them immediately into the layer map.
2909 : (
2910 : self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
2911 : .await?,
2912 : None,
2913 : )
2914 : } else {
2915 : #[cfg(test)]
2916 : match &mut *self.flush_loop_state.lock().unwrap() {
2917 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2918 : panic!("flush loop not running")
2919 : }
2920 : FlushLoopState::Running {
2921 : expect_initdb_optimization,
2922 : ..
2923 : } => {
2924 : assert!(!*expect_initdb_optimization, "expected initdb optimization");
2925 : }
2926 : }
2927 : // Normal case, write out a L0 delta layer file.
2928 : // `create_delta_layer` will not modify the layer map.
2929 : // We will remove frozen layer and add delta layer in one atomic operation later.
2930 : let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
2931 : (
2932 : // FIXME: even though we have a single image and single delta layer assumption
2933 : // we push them to vec
2934 : vec![layer.clone()],
2935 : Some(layer),
2936 : )
2937 : };
2938 :
2939 5178 : pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
2940 :
2941 : if self.cancel.is_cancelled() {
2942 : return Err(FlushLayerError::Cancelled);
2943 : }
2944 :
2945 : let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
2946 : let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
2947 :
2948 : // The new on-disk layers are now in the layer map. We can remove the
2949 : // in-memory layer from the map now. The flushed layer is stored in
2950 : // the mapping in `create_delta_layer`.
2951 : let metadata = {
2952 : let mut guard = self.layers.write().await;
2953 :
2954 : if self.cancel.is_cancelled() {
2955 : return Err(FlushLayerError::Cancelled);
2956 : }
2957 :
2958 : guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
2959 :
2960 : if disk_consistent_lsn != old_disk_consistent_lsn {
2961 : assert!(disk_consistent_lsn > old_disk_consistent_lsn);
2962 : self.disk_consistent_lsn.store(disk_consistent_lsn);
2963 :
2964 : // Schedule remote uploads that will reflect our new disk_consistent_lsn
2965 : Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
2966 : } else {
2967 : None
2968 : }
2969 : // release lock on 'layers'
2970 : };
2971 :
2972 : // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
2973 : // a compaction can delete the file and then it won't be available for uploads any more.
2974 : // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
2975 : // race situation.
2976 : // See https://github.com/neondatabase/neon/issues/4526
2977 5177 : pausable_failpoint!("flush-frozen-pausable");
2978 :
2979 : // This failpoint is used by another test case `test_pageserver_recovery`.
2980 0 : fail_point!("flush-frozen-exit");
2981 :
2982 : // Update the metadata file, with new 'disk_consistent_lsn'
2983 : //
2984 : // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
2985 : // *all* the layers, to avoid fsyncing the file multiple times.
2986 :
2987 : // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
2988 : if let Some(metadata) = metadata {
2989 : save_metadata(
2990 : self.conf,
2991 : &self.tenant_shard_id,
2992 : &self.timeline_id,
2993 : &metadata,
2994 : )
2995 : .await
2996 : .context("save_metadata")?;
2997 : }
2998 : Ok(())
2999 : }
3000 :
3001 : /// Update metadata file
3002 5197 : fn schedule_uploads(
3003 5197 : &self,
3004 5197 : disk_consistent_lsn: Lsn,
3005 5197 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
3006 5197 : ) -> anyhow::Result<TimelineMetadata> {
3007 5197 : // We can only save a valid 'prev_record_lsn' value on disk if we
3008 5197 : // flushed *all* in-memory changes to disk. We only track
3009 5197 : // 'prev_record_lsn' in memory for the latest processed record, so we
3010 5197 : // don't remember what the correct value that corresponds to some old
3011 5197 : // LSN is. But if we flush everything, then the value corresponding
3012 5197 : // current 'last_record_lsn' is correct and we can store it on disk.
3013 5197 : let RecordLsn {
3014 5197 : last: last_record_lsn,
3015 5197 : prev: prev_record_lsn,
3016 5197 : } = self.last_record_lsn.load();
3017 5197 : let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
3018 1576 : Some(prev_record_lsn)
3019 : } else {
3020 3621 : None
3021 : };
3022 :
3023 5197 : let ancestor_timeline_id = self
3024 5197 : .ancestor_timeline
3025 5197 : .as_ref()
3026 5197 : .map(|ancestor| ancestor.timeline_id);
3027 5197 :
3028 5197 : let metadata = TimelineMetadata::new(
3029 5197 : disk_consistent_lsn,
3030 5197 : ondisk_prev_record_lsn,
3031 5197 : ancestor_timeline_id,
3032 5197 : self.ancestor_lsn,
3033 5197 : *self.latest_gc_cutoff_lsn.read(),
3034 5197 : self.initdb_lsn,
3035 5197 : self.pg_version,
3036 5197 : );
3037 5197 :
3038 5197 : fail_point!("checkpoint-before-saving-metadata", |x| bail!(
3039 0 : "{}",
3040 0 : x.unwrap()
3041 5197 : ));
3042 :
3043 5197 : if let Some(remote_client) = &self.remote_client {
3044 10374 : for layer in layers_to_upload {
3045 5177 : remote_client.schedule_layer_file_upload(layer)?;
3046 : }
3047 5197 : remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
3048 0 : }
3049 :
3050 5197 : Ok(metadata)
3051 5197 : }
3052 :
3053 20 : async fn update_metadata_file(
3054 20 : &self,
3055 20 : disk_consistent_lsn: Lsn,
3056 20 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
3057 20 : ) -> anyhow::Result<()> {
3058 20 : let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
3059 :
3060 20 : save_metadata(
3061 20 : self.conf,
3062 20 : &self.tenant_shard_id,
3063 20 : &self.timeline_id,
3064 20 : &metadata,
3065 20 : )
3066 20 : .await
3067 20 : .context("save_metadata")?;
3068 :
3069 20 : Ok(())
3070 20 : }
3071 :
3072 2 : pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
3073 2 : if let Some(remote_client) = &self.remote_client {
3074 2 : remote_client
3075 2 : .preserve_initdb_archive(
3076 2 : &self.tenant_shard_id.tenant_id,
3077 2 : &self.timeline_id,
3078 2 : &self.cancel,
3079 2 : )
3080 2 : .await?;
3081 : } else {
3082 0 : bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
3083 : }
3084 2 : Ok(())
3085 2 : }
3086 :
3087 : // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
3088 : // in layer map immediately. The caller is responsible to put it into the layer map.
3089 5100 : async fn create_delta_layer(
3090 5100 : self: &Arc<Self>,
3091 5100 : frozen_layer: &Arc<InMemoryLayer>,
3092 5100 : ctx: &RequestContext,
3093 5100 : ) -> anyhow::Result<ResidentLayer> {
3094 5100 : let span = tracing::info_span!("blocking");
3095 5100 : let new_delta: ResidentLayer = tokio::task::spawn_blocking({
3096 5100 : let self_clone = Arc::clone(self);
3097 5100 : let frozen_layer = Arc::clone(frozen_layer);
3098 5100 : let ctx = ctx.attached_child();
3099 5100 : move || {
3100 5100 : // Write it out
3101 5100 : // Keep this inside `spawn_blocking` and `Handle::current`
3102 5100 : // as long as the write path is still sync and the read impl
3103 5100 : // is still not fully async. Otherwise executor threads would
3104 5100 : // be blocked.
3105 5100 : let _g = span.entered();
3106 5100 : let new_delta =
3107 5100 : Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
3108 5100 : let new_delta_path = new_delta.local_path().to_owned();
3109 5100 :
3110 5100 : // Sync it to disk.
3111 5100 : //
3112 5100 : // We must also fsync the timeline dir to ensure the directory entries for
3113 5100 : // new layer files are durable.
3114 5100 : //
3115 5100 : // NB: timeline dir must be synced _after_ the file contents are durable.
3116 5100 : // So, two separate fsyncs are required, they mustn't be batched.
3117 5100 : //
3118 5100 : // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
3119 5100 : // files to flush, the fsync overhead can be reduces as follows:
3120 5100 : // 1. write them all to temporary file names
3121 5100 : // 2. fsync them
3122 5100 : // 3. rename to the final name
3123 5100 : // 4. fsync the parent directory.
3124 5100 : // Note that (1),(2),(3) today happen inside write_to_disk().
3125 5100 : //
3126 5100 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3127 5100 : par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
3128 5100 : par_fsync::par_fsync(&[self_clone
3129 5100 : .conf
3130 5100 : .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
3131 5100 : .context("fsync of timeline dir")?;
3132 :
3133 5098 : anyhow::Ok(new_delta)
3134 5100 : }
3135 5100 : })
3136 5098 : .await
3137 5098 : .context("spawn_blocking")
3138 5098 : .and_then(|x| x)?;
3139 :
3140 5098 : Ok(new_delta)
3141 5098 : }
3142 :
3143 1703 : async fn repartition(
3144 1703 : &self,
3145 1703 : lsn: Lsn,
3146 1703 : partition_size: u64,
3147 1703 : flags: EnumSet<CompactFlags>,
3148 1703 : ctx: &RequestContext,
3149 1703 : ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
3150 1703 : {
3151 1703 : let partitioning_guard = self.partitioning.lock().unwrap();
3152 1703 : let distance = lsn.0 - partitioning_guard.1 .0;
3153 1703 : if partitioning_guard.1 != Lsn(0)
3154 979 : && distance <= self.repartition_threshold
3155 805 : && !flags.contains(CompactFlags::ForceRepartition)
3156 : {
3157 0 : debug!(
3158 0 : distance,
3159 0 : threshold = self.repartition_threshold,
3160 0 : "no repartitioning needed"
3161 0 : );
3162 803 : return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
3163 900 : }
3164 : }
3165 176985 : let keyspace = self.collect_keyspace(lsn, ctx).await?;
3166 898 : let partitioning = keyspace.partition(partition_size);
3167 898 :
3168 898 : let mut partitioning_guard = self.partitioning.lock().unwrap();
3169 898 : if lsn > partitioning_guard.1 {
3170 898 : *partitioning_guard = (partitioning, lsn);
3171 898 : } else {
3172 0 : warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
3173 : }
3174 898 : Ok((partitioning_guard.0.clone(), partitioning_guard.1))
3175 1702 : }
3176 :
3177 : // Is it time to create a new image layer for the given partition?
3178 36833 : async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
3179 36833 : let threshold = self.get_image_creation_threshold();
3180 :
3181 36833 : let guard = self.layers.read().await;
3182 36833 : let layers = guard.layer_map();
3183 36833 :
3184 36833 : let mut max_deltas = 0;
3185 36833 : {
3186 36833 : let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
3187 36833 : if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
3188 4387 : let img_range =
3189 4387 : partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
3190 4387 : if wanted.overlaps(&img_range) {
3191 : //
3192 : // gc_timeline only pays attention to image layers that are older than the GC cutoff,
3193 : // but create_image_layers creates image layers at last-record-lsn.
3194 : // So it's possible that gc_timeline wants a new image layer to be created for a key range,
3195 : // but the range is already covered by image layers at more recent LSNs. Before we
3196 : // create a new image layer, check if the range is already covered at more recent LSNs.
3197 0 : if !layers
3198 0 : .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
3199 : {
3200 0 : debug!(
3201 0 : "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
3202 0 : img_range.start, img_range.end, cutoff_lsn, lsn
3203 0 : );
3204 0 : return true;
3205 0 : }
3206 4387 : }
3207 32446 : }
3208 : }
3209 :
3210 2230963 : for part_range in &partition.ranges {
3211 2200525 : let image_coverage = layers.image_coverage(part_range, lsn);
3212 4051749 : for (img_range, last_img) in image_coverage {
3213 1857619 : let img_lsn = if let Some(last_img) = last_img {
3214 356586 : last_img.get_lsn_range().end
3215 : } else {
3216 1501033 : Lsn(0)
3217 : };
3218 : // Let's consider an example:
3219 : //
3220 : // delta layer with LSN range 71-81
3221 : // delta layer with LSN range 81-91
3222 : // delta layer with LSN range 91-101
3223 : // image layer at LSN 100
3224 : //
3225 : // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
3226 : // there's no need to create a new one. We check this case explicitly, to avoid passing
3227 : // a bogus range to count_deltas below, with start > end. It's even possible that there
3228 : // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
3229 : // after we read last_record_lsn, which is passed here in the 'lsn' argument.
3230 1857619 : if img_lsn < lsn {
3231 1824740 : let num_deltas =
3232 1824740 : layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
3233 1824740 :
3234 1824740 : max_deltas = max_deltas.max(num_deltas);
3235 1824740 : if num_deltas >= threshold {
3236 0 : debug!(
3237 0 : "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
3238 0 : img_range.start, img_range.end, num_deltas, img_lsn, lsn
3239 0 : );
3240 6395 : return true;
3241 1818345 : }
3242 32879 : }
3243 : }
3244 : }
3245 :
3246 0 : debug!(
3247 0 : max_deltas,
3248 0 : "none of the partitioned ranges had >= {threshold} deltas"
3249 0 : );
3250 30438 : false
3251 36833 : }
3252 :
3253 3398 : #[tracing::instrument(skip_all, fields(%lsn, %force))]
3254 : async fn create_image_layers(
3255 : self: &Arc<Timeline>,
3256 : partitioning: &KeyPartitioning,
3257 : lsn: Lsn,
3258 : force: bool,
3259 : ctx: &RequestContext,
3260 : ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
3261 : let timer = self.metrics.create_images_time_histo.start_timer();
3262 : let mut image_layers = Vec::new();
3263 :
3264 : // We need to avoid holes between generated image layers.
3265 : // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
3266 : // image layer with hole between them. In this case such layer can not be utilized by GC.
3267 : //
3268 : // How such hole between partitions can appear?
3269 : // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
3270 : // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
3271 : // If there is delta layer <100000000..300000000> then it never be garbage collected because
3272 : // image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
3273 : let mut start = Key::MIN;
3274 :
3275 : for partition in partitioning.parts.iter() {
3276 : let img_range = start..partition.ranges.last().unwrap().end;
3277 : start = img_range.end;
3278 : if force || self.time_for_new_image_layer(partition, lsn).await {
3279 : let mut image_layer_writer = ImageLayerWriter::new(
3280 : self.conf,
3281 : self.timeline_id,
3282 : self.tenant_shard_id,
3283 : &img_range,
3284 : lsn,
3285 : )
3286 : .await?;
3287 :
3288 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3289 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
3290 0 : "failpoint image-layer-writer-fail-before-finish"
3291 0 : )))
3292 0 : });
3293 :
3294 : let mut key_request_accum = KeySpaceAccum::new();
3295 : for range in &partition.ranges {
3296 : let mut key = range.start;
3297 : while key < range.end {
3298 : if self.shard_identity.is_key_disposable(&key) {
3299 0 : debug!(
3300 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3301 0 : key,
3302 0 : self.shard_identity.get_shard_number(&key)
3303 0 : );
3304 : key = key.next();
3305 : continue;
3306 : }
3307 :
3308 : key_request_accum.add_key(key);
3309 : if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
3310 : || key.next() == range.end
3311 : {
3312 : let results = self
3313 : .get_vectored(
3314 : &key_request_accum.consume_keyspace().ranges,
3315 : lsn,
3316 : ctx,
3317 : )
3318 : .await?;
3319 :
3320 : for (img_key, img) in results {
3321 : let img = match img {
3322 : Ok(img) => img,
3323 : Err(err) => {
3324 : // If we fail to reconstruct a VM or FSM page, we can zero the
3325 : // page without losing any actual user data. That seems better
3326 : // than failing repeatedly and getting stuck.
3327 : //
3328 : // We had a bug at one point, where we truncated the FSM and VM
3329 : // in the pageserver, but the Postgres didn't know about that
3330 : // and continued to generate incremental WAL records for pages
3331 : // that didn't exist in the pageserver. Trying to replay those
3332 : // WAL records failed to find the previous image of the page.
3333 : // This special case allows us to recover from that situation.
3334 : // See https://github.com/neondatabase/neon/issues/2601.
3335 : //
3336 : // Unfortunately we cannot do this for the main fork, or for
3337 : // any metadata keys, keys, as that would lead to actual data
3338 : // loss.
3339 : if is_rel_fsm_block_key(img_key)
3340 : || is_rel_vm_block_key(img_key)
3341 : {
3342 0 : warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
3343 : ZERO_PAGE.clone()
3344 : } else {
3345 : return Err(
3346 : CreateImageLayersError::PageReconstructError(err),
3347 : );
3348 : }
3349 : }
3350 : };
3351 :
3352 : image_layer_writer.put_image(img_key, img).await?;
3353 : }
3354 : }
3355 :
3356 : key = key.next();
3357 : }
3358 : }
3359 : let image_layer = image_layer_writer.finish(self).await?;
3360 : image_layers.push(image_layer);
3361 : }
3362 : }
3363 : // All layers that the GC wanted us to create have now been created.
3364 : //
3365 : // It's possible that another GC cycle happened while we were compacting, and added
3366 : // something new to wanted_image_layers, and we now clear that before processing it.
3367 : // That's OK, because the next GC iteration will put it back in.
3368 : *self.wanted_image_layers.lock().unwrap() = None;
3369 :
3370 : // Sync the new layer to disk before adding it to the layer map, to make sure
3371 : // we don't garbage collect something based on the new layer, before it has
3372 : // reached the disk.
3373 : //
3374 : // We must also fsync the timeline dir to ensure the directory entries for
3375 : // new layer files are durable
3376 : //
3377 : // Compaction creates multiple image layers. It would be better to create them all
3378 : // and fsync them all in parallel.
3379 : let all_paths = image_layers
3380 : .iter()
3381 6475 : .map(|layer| layer.local_path().to_owned())
3382 : .collect::<Vec<_>>();
3383 :
3384 : par_fsync::par_fsync_async(&all_paths)
3385 : .await
3386 : .context("fsync of newly created layer files")?;
3387 :
3388 : if !all_paths.is_empty() {
3389 : par_fsync::par_fsync_async(&[self
3390 : .conf
3391 : .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
3392 : .await
3393 : .context("fsync of timeline dir")?;
3394 : }
3395 :
3396 : let mut guard = self.layers.write().await;
3397 :
3398 : // FIXME: we could add the images to be uploaded *before* returning from here, but right
3399 : // now they are being scheduled outside of write lock
3400 : guard.track_new_image_layers(&image_layers, &self.metrics);
3401 : drop_wlock(guard);
3402 : timer.stop_and_record();
3403 :
3404 : Ok(image_layers)
3405 : }
3406 :
3407 : /// Wait until the background initial logical size calculation is complete, or
3408 : /// this Timeline is shut down. Calling this function will cause the initial
3409 : /// logical size calculation to skip waiting for the background jobs barrier.
3410 257 : pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
3411 257 : if let Some(await_bg_cancel) = self
3412 257 : .current_logical_size
3413 257 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
3414 257 : .get()
3415 244 : {
3416 244 : await_bg_cancel.cancel();
3417 244 : } else {
3418 : // We should not wait if we were not able to explicitly instruct
3419 : // the logical size cancellation to skip the concurrency limit semaphore.
3420 : // TODO: this is an unexpected case. We should restructure so that it
3421 : // can't happen.
3422 13 : tracing::info!(
3423 13 : "await_initial_logical_size: can't get semaphore cancel token, skipping"
3424 13 : );
3425 : }
3426 :
3427 257 : tokio::select!(
3428 495 : _ = self.current_logical_size.initialized.acquire() => {},
3429 495 : _ = self.cancel.cancelled() => {}
3430 495 : )
3431 243 : }
3432 : }
3433 :
3434 1324 : #[derive(Default)]
3435 : struct CompactLevel0Phase1Result {
3436 : new_layers: Vec<ResidentLayer>,
3437 : deltas_to_compact: Vec<Layer>,
3438 : }
3439 :
3440 : /// Top-level failure to compact.
3441 0 : #[derive(Debug, thiserror::Error)]
3442 : pub(crate) enum CompactionError {
3443 : #[error("The timeline or pageserver is shutting down")]
3444 : ShuttingDown,
3445 : /// Compaction cannot be done right now; page reconstruction and so on.
3446 : #[error(transparent)]
3447 : Other(#[from] anyhow::Error),
3448 : }
3449 :
3450 : #[serde_as]
3451 4102 : #[derive(serde::Serialize)]
3452 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
3453 :
3454 11347 : #[derive(Default)]
3455 : enum DurationRecorder {
3456 : #[default]
3457 : NotStarted,
3458 : Recorded(RecordedDuration, tokio::time::Instant),
3459 : }
3460 :
3461 : impl DurationRecorder {
3462 3094 : fn till_now(&self) -> DurationRecorder {
3463 3094 : match self {
3464 : DurationRecorder::NotStarted => {
3465 0 : panic!("must only call on recorded measurements")
3466 : }
3467 3094 : DurationRecorder::Recorded(_, ended) => {
3468 3094 : let now = tokio::time::Instant::now();
3469 3094 : DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
3470 3094 : }
3471 3094 : }
3472 3094 : }
3473 2051 : fn into_recorded(self) -> Option<RecordedDuration> {
3474 2051 : match self {
3475 0 : DurationRecorder::NotStarted => None,
3476 2051 : DurationRecorder::Recorded(recorded, _) => Some(recorded),
3477 : }
3478 2051 : }
3479 : }
3480 :
3481 1621 : #[derive(Default)]
3482 : struct CompactLevel0Phase1StatsBuilder {
3483 : version: Option<u64>,
3484 : tenant_id: Option<TenantShardId>,
3485 : timeline_id: Option<TimelineId>,
3486 : read_lock_acquisition_micros: DurationRecorder,
3487 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
3488 : read_lock_held_key_sort_micros: DurationRecorder,
3489 : read_lock_held_prerequisites_micros: DurationRecorder,
3490 : read_lock_held_compute_holes_micros: DurationRecorder,
3491 : read_lock_drop_micros: DurationRecorder,
3492 : write_layer_files_micros: DurationRecorder,
3493 : level0_deltas_count: Option<usize>,
3494 : new_deltas_count: Option<usize>,
3495 : new_deltas_size: Option<u64>,
3496 : }
3497 :
3498 293 : #[derive(serde::Serialize)]
3499 : struct CompactLevel0Phase1Stats {
3500 : version: u64,
3501 : tenant_id: TenantShardId,
3502 : timeline_id: TimelineId,
3503 : read_lock_acquisition_micros: RecordedDuration,
3504 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
3505 : read_lock_held_key_sort_micros: RecordedDuration,
3506 : read_lock_held_prerequisites_micros: RecordedDuration,
3507 : read_lock_held_compute_holes_micros: RecordedDuration,
3508 : read_lock_drop_micros: RecordedDuration,
3509 : write_layer_files_micros: RecordedDuration,
3510 : level0_deltas_count: usize,
3511 : new_deltas_count: usize,
3512 : new_deltas_size: u64,
3513 : }
3514 :
3515 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
3516 : type Error = anyhow::Error;
3517 :
3518 293 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
3519 293 : Ok(Self {
3520 293 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
3521 293 : tenant_id: value
3522 293 : .tenant_id
3523 293 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
3524 293 : timeline_id: value
3525 293 : .timeline_id
3526 293 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
3527 293 : read_lock_acquisition_micros: value
3528 293 : .read_lock_acquisition_micros
3529 293 : .into_recorded()
3530 293 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
3531 293 : read_lock_held_spawn_blocking_startup_micros: value
3532 293 : .read_lock_held_spawn_blocking_startup_micros
3533 293 : .into_recorded()
3534 293 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
3535 293 : read_lock_held_key_sort_micros: value
3536 293 : .read_lock_held_key_sort_micros
3537 293 : .into_recorded()
3538 293 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
3539 293 : read_lock_held_prerequisites_micros: value
3540 293 : .read_lock_held_prerequisites_micros
3541 293 : .into_recorded()
3542 293 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
3543 293 : read_lock_held_compute_holes_micros: value
3544 293 : .read_lock_held_compute_holes_micros
3545 293 : .into_recorded()
3546 293 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
3547 293 : read_lock_drop_micros: value
3548 293 : .read_lock_drop_micros
3549 293 : .into_recorded()
3550 293 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
3551 293 : write_layer_files_micros: value
3552 293 : .write_layer_files_micros
3553 293 : .into_recorded()
3554 293 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
3555 293 : level0_deltas_count: value
3556 293 : .level0_deltas_count
3557 293 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
3558 293 : new_deltas_count: value
3559 293 : .new_deltas_count
3560 293 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
3561 293 : new_deltas_size: value
3562 293 : .new_deltas_size
3563 293 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
3564 : })
3565 293 : }
3566 : }
3567 :
3568 : impl Timeline {
3569 : /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
3570 1621 : async fn compact_level0_phase1(
3571 1621 : self: &Arc<Self>,
3572 1621 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
3573 1621 : mut stats: CompactLevel0Phase1StatsBuilder,
3574 1621 : target_file_size: u64,
3575 1621 : ctx: &RequestContext,
3576 1621 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
3577 1621 : stats.read_lock_held_spawn_blocking_startup_micros =
3578 1621 : stats.read_lock_acquisition_micros.till_now(); // set by caller
3579 1621 : let layers = guard.layer_map();
3580 1621 : let level0_deltas = layers.get_level0_deltas()?;
3581 1621 : let mut level0_deltas = level0_deltas
3582 1621 : .into_iter()
3583 7263 : .map(|x| guard.get_from_desc(&x))
3584 1621 : .collect_vec();
3585 1621 : stats.level0_deltas_count = Some(level0_deltas.len());
3586 1621 : // Only compact if enough layers have accumulated.
3587 1621 : let threshold = self.get_compaction_threshold();
3588 1621 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
3589 0 : debug!(
3590 0 : level0_deltas = level0_deltas.len(),
3591 0 : threshold, "too few deltas to compact"
3592 0 : );
3593 1324 : return Ok(CompactLevel0Phase1Result::default());
3594 297 : }
3595 297 :
3596 297 : // This failpoint is used together with `test_duplicate_layers` integration test.
3597 297 : // It returns the compaction result exactly the same layers as input to compaction.
3598 297 : // We want to ensure that this will not cause any problem when updating the layer map
3599 297 : // after the compaction is finished.
3600 297 : //
3601 297 : // Currently, there are two rare edge cases that will cause duplicated layers being
3602 297 : // inserted.
3603 297 : // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
3604 297 : // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
3605 297 : // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
3606 297 : // point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
3607 297 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
3608 297 : // layer replace instead of the normal remove / upload process.
3609 297 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
3610 297 : // size length. Compaction will likely create the same set of n files afterwards.
3611 297 : //
3612 297 : // This failpoint is a superset of both of the cases.
3613 297 : if cfg!(feature = "testing") {
3614 297 : let active = (|| {
3615 297 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
3616 297 : false
3617 297 : })();
3618 297 :
3619 297 : if active {
3620 2 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
3621 30 : for delta in &level0_deltas {
3622 : // we are just faking these layers as being produced again for this failpoint
3623 28 : new_layers.push(
3624 28 : delta
3625 28 : .download_and_keep_resident()
3626 0 : .await
3627 28 : .context("download layer for failpoint")?,
3628 : );
3629 : }
3630 2 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
3631 2 : return Ok(CompactLevel0Phase1Result {
3632 2 : new_layers,
3633 2 : deltas_to_compact: level0_deltas,
3634 2 : });
3635 295 : }
3636 0 : }
3637 :
3638 : // Gather the files to compact in this iteration.
3639 : //
3640 : // Start with the oldest Level 0 delta file, and collect any other
3641 : // level 0 files that form a contiguous sequence, such that the end
3642 : // LSN of previous file matches the start LSN of the next file.
3643 : //
3644 : // Note that if the files don't form such a sequence, we might
3645 : // "compact" just a single file. That's a bit pointless, but it allows
3646 : // us to get rid of the level 0 file, and compact the other files on
3647 : // the next iteration. This could probably made smarter, but such
3648 : // "gaps" in the sequence of level 0 files should only happen in case
3649 : // of a crash, partial download from cloud storage, or something like
3650 : // that, so it's not a big deal in practice.
3651 7924 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
3652 295 : let mut level0_deltas_iter = level0_deltas.iter();
3653 295 :
3654 295 : let first_level0_delta = level0_deltas_iter.next().unwrap();
3655 295 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
3656 295 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
3657 295 :
3658 295 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
3659 4104 : for l in level0_deltas_iter {
3660 3809 : let lsn_range = &l.layer_desc().lsn_range;
3661 3809 :
3662 3809 : if lsn_range.start != prev_lsn_end {
3663 0 : break;
3664 3809 : }
3665 3809 : deltas_to_compact.push(l.download_and_keep_resident().await?);
3666 3809 : prev_lsn_end = lsn_range.end;
3667 : }
3668 295 : let lsn_range = Range {
3669 295 : start: deltas_to_compact
3670 295 : .first()
3671 295 : .unwrap()
3672 295 : .layer_desc()
3673 295 : .lsn_range
3674 295 : .start,
3675 295 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
3676 295 : };
3677 :
3678 295 : info!(
3679 295 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
3680 295 : lsn_range.start,
3681 295 : lsn_range.end,
3682 295 : deltas_to_compact.len(),
3683 295 : level0_deltas.len()
3684 295 : );
3685 :
3686 4104 : for l in deltas_to_compact.iter() {
3687 4104 : info!("compact includes {l}");
3688 : }
3689 :
3690 : // We don't need the original list of layers anymore. Drop it so that
3691 : // we don't accidentally use it later in the function.
3692 295 : drop(level0_deltas);
3693 295 :
3694 295 : stats.read_lock_held_prerequisites_micros = stats
3695 295 : .read_lock_held_spawn_blocking_startup_micros
3696 295 : .till_now();
3697 295 :
3698 295 : // Determine N largest holes where N is number of compacted layers.
3699 295 : let max_holes = deltas_to_compact.len();
3700 295 : let last_record_lsn = self.get_last_record_lsn();
3701 295 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
3702 295 : let min_hole_coverage_size = 3; // TODO: something more flexible?
3703 295 :
3704 295 : // min-heap (reserve space for one more element added before eviction)
3705 295 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
3706 295 : let mut prev: Option<Key> = None;
3707 295 :
3708 295 : let mut all_keys = Vec::new();
3709 :
3710 4104 : for l in deltas_to_compact.iter() {
3711 4104 : all_keys.extend(l.load_keys(ctx).await?);
3712 : }
3713 :
3714 : // FIXME: should spawn_blocking the rest of this function
3715 :
3716 : // The current stdlib sorting implementation is designed in a way where it is
3717 : // particularly fast where the slice is made up of sorted sub-ranges.
3718 155312530 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3719 295 :
3720 295 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
3721 :
3722 17300095 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
3723 17300095 : if let Some(prev_key) = prev {
3724 : // just first fast filter
3725 17299800 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
3726 201573 : let key_range = prev_key..next_key;
3727 201573 : // Measuring hole by just subtraction of i128 representation of key range boundaries
3728 201573 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
3729 201573 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
3730 201573 : // That is why it is better to measure size of hole as number of covering image layers.
3731 201573 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
3732 201573 : if coverage_size >= min_hole_coverage_size {
3733 235 : heap.push(Hole {
3734 235 : key_range,
3735 235 : coverage_size,
3736 235 : });
3737 235 : if heap.len() > max_holes {
3738 132 : heap.pop(); // remove smallest hole
3739 132 : }
3740 201338 : }
3741 17098227 : }
3742 295 : }
3743 17300095 : prev = Some(next_key.next());
3744 : }
3745 295 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
3746 295 : drop_rlock(guard);
3747 295 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
3748 295 : let mut holes = heap.into_vec();
3749 295 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
3750 295 : let mut next_hole = 0; // index of next hole in holes vector
3751 295 :
3752 295 : // This iterator walks through all key-value pairs from all the layers
3753 295 : // we're compacting, in key, LSN order.
3754 295 : let all_values_iter = all_keys.iter();
3755 295 :
3756 295 : // This iterator walks through all keys and is needed to calculate size used by each key
3757 295 : let mut all_keys_iter = all_keys
3758 295 : .iter()
3759 16960274 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
3760 16959979 : .coalesce(|mut prev, cur| {
3761 16959979 : // Coalesce keys that belong to the same key pair.
3762 16959979 : // This ensures that compaction doesn't put them
3763 16959979 : // into different layer files.
3764 16959979 : // Still limit this by the target file size,
3765 16959979 : // so that we keep the size of the files in
3766 16959979 : // check.
3767 16959979 : if prev.0 == cur.0 && prev.2 < target_file_size {
3768 14778956 : prev.2 += cur.2;
3769 14778956 : Ok(prev)
3770 : } else {
3771 2181023 : Err((prev, cur))
3772 : }
3773 16959979 : });
3774 295 :
3775 295 : // Merge the contents of all the input delta layers into a new set
3776 295 : // of delta layers, based on the current partitioning.
3777 295 : //
3778 295 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
3779 295 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
3780 295 : // would be too large. In that case, we also split on the LSN dimension.
3781 295 : //
3782 295 : // LSN
3783 295 : // ^
3784 295 : // |
3785 295 : // | +-----------+ +--+--+--+--+
3786 295 : // | | | | | | | |
3787 295 : // | +-----------+ | | | | |
3788 295 : // | | | | | | | |
3789 295 : // | +-----------+ ==> | | | | |
3790 295 : // | | | | | | | |
3791 295 : // | +-----------+ | | | | |
3792 295 : // | | | | | | | |
3793 295 : // | +-----------+ +--+--+--+--+
3794 295 : // |
3795 295 : // +--------------> key
3796 295 : //
3797 295 : //
3798 295 : // If one key (X) has a lot of page versions:
3799 295 : //
3800 295 : // LSN
3801 295 : // ^
3802 295 : // | (X)
3803 295 : // | +-----------+ +--+--+--+--+
3804 295 : // | | | | | | | |
3805 295 : // | +-----------+ | | +--+ |
3806 295 : // | | | | | | | |
3807 295 : // | +-----------+ ==> | | | | |
3808 295 : // | | | | | +--+ |
3809 295 : // | +-----------+ | | | | |
3810 295 : // | | | | | | | |
3811 295 : // | +-----------+ +--+--+--+--+
3812 295 : // |
3813 295 : // +--------------> key
3814 295 : // TODO: this actually divides the layers into fixed-size chunks, not
3815 295 : // based on the partitioning.
3816 295 : //
3817 295 : // TODO: we should also opportunistically materialize and
3818 295 : // garbage collect what we can.
3819 295 : let mut new_layers = Vec::new();
3820 295 : let mut prev_key: Option<Key> = None;
3821 295 : let mut writer: Option<DeltaLayerWriter> = None;
3822 295 : let mut key_values_total_size = 0u64;
3823 295 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
3824 295 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
3825 :
3826 : for &DeltaEntry {
3827 16960269 : key, lsn, ref val, ..
3828 16960562 : } in all_values_iter
3829 : {
3830 16960269 : let value = val.load(ctx).await?;
3831 16960268 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
3832 16960268 : // We need to check key boundaries once we reach next key or end of layer with the same key
3833 16960268 : if !same_key || lsn == dup_end_lsn {
3834 2181314 : let mut next_key_size = 0u64;
3835 2181314 : let is_dup_layer = dup_end_lsn.is_valid();
3836 2181314 : dup_start_lsn = Lsn::INVALID;
3837 2181314 : if !same_key {
3838 2181277 : dup_end_lsn = Lsn::INVALID;
3839 2181277 : }
3840 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
3841 2181316 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
3842 2181316 : next_key_size = next_size;
3843 2181316 : if key != next_key {
3844 2180984 : if dup_end_lsn.is_valid() {
3845 12 : // We are writting segment with duplicates:
3846 12 : // place all remaining values of this key in separate segment
3847 12 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
3848 12 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
3849 2180972 : }
3850 2180984 : break;
3851 332 : }
3852 332 : key_values_total_size += next_size;
3853 332 : // Check if it is time to split segment: if total keys size is larger than target file size.
3854 332 : // We need to avoid generation of empty segments if next_size > target_file_size.
3855 332 : if key_values_total_size > target_file_size && lsn != next_lsn {
3856 : // Split key between multiple layers: such layer can contain only single key
3857 37 : dup_start_lsn = if dup_end_lsn.is_valid() {
3858 25 : dup_end_lsn // new segment with duplicates starts where old one stops
3859 : } else {
3860 12 : lsn // start with the first LSN for this key
3861 : };
3862 37 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
3863 37 : break;
3864 295 : }
3865 : }
3866 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
3867 2181314 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
3868 0 : dup_start_lsn = dup_end_lsn;
3869 0 : dup_end_lsn = lsn_range.end;
3870 2181314 : }
3871 2181314 : if writer.is_some() {
3872 2181019 : let written_size = writer.as_mut().unwrap().size();
3873 2181019 : let contains_hole =
3874 2181019 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
3875 : // check if key cause layer overflow or contains hole...
3876 2181019 : if is_dup_layer
3877 2180970 : || dup_end_lsn.is_valid()
3878 2180960 : || written_size + key_values_total_size > target_file_size
3879 2170682 : || contains_hole
3880 : {
3881 : // ... if so, flush previous layer and prepare to write new one
3882 10438 : new_layers.push(
3883 10438 : writer
3884 10438 : .take()
3885 10438 : .unwrap()
3886 10438 : .finish(prev_key.unwrap().next(), self)
3887 1284 : .await?,
3888 : );
3889 10438 : writer = None;
3890 10438 :
3891 10438 : if contains_hole {
3892 103 : // skip hole
3893 103 : next_hole += 1;
3894 10335 : }
3895 2170581 : }
3896 295 : }
3897 : // Remember size of key value because at next iteration we will access next item
3898 2181314 : key_values_total_size = next_key_size;
3899 14778954 : }
3900 16960268 : if writer.is_none() {
3901 10733 : // Create writer if not initiaized yet
3902 10733 : writer = Some(
3903 : DeltaLayerWriter::new(
3904 10733 : self.conf,
3905 10733 : self.timeline_id,
3906 10733 : self.tenant_shard_id,
3907 10733 : key,
3908 10733 : if dup_end_lsn.is_valid() {
3909 : // this is a layer containing slice of values of the same key
3910 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
3911 49 : dup_start_lsn..dup_end_lsn
3912 : } else {
3913 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3914 10684 : lsn_range.clone()
3915 : },
3916 : )
3917 10 : .await?,
3918 : );
3919 16949535 : }
3920 :
3921 16960268 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3922 0 : Err(CompactionError::Other(anyhow::anyhow!(
3923 0 : "failpoint delta-layer-writer-fail-before-finish"
3924 0 : )))
3925 16960268 : });
3926 :
3927 16960268 : if !self.shard_identity.is_key_disposable(&key) {
3928 16960268 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
3929 : } else {
3930 0 : debug!(
3931 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3932 0 : key,
3933 0 : self.shard_identity.get_shard_number(&key)
3934 0 : );
3935 : }
3936 :
3937 16960267 : if !new_layers.is_empty() {
3938 13666937 : fail_point!("after-timeline-compacted-first-L1");
3939 13666937 : }
3940 :
3941 16960267 : prev_key = Some(key);
3942 : }
3943 293 : if let Some(writer) = writer {
3944 293 : new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
3945 0 : }
3946 :
3947 : // Sync layers
3948 293 : if !new_layers.is_empty() {
3949 : // Print a warning if the created layer is larger than double the target size
3950 : // Add two pages for potential overhead. This should in theory be already
3951 : // accounted for in the target calculation, but for very small targets,
3952 : // we still might easily hit the limit otherwise.
3953 293 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
3954 10726 : for layer in new_layers.iter() {
3955 10726 : if layer.layer_desc().file_size > warn_limit {
3956 0 : warn!(
3957 0 : %layer,
3958 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
3959 0 : );
3960 10726 : }
3961 : }
3962 :
3963 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3964 293 : let layer_paths: Vec<Utf8PathBuf> = new_layers
3965 293 : .iter()
3966 10726 : .map(|l| l.local_path().to_owned())
3967 293 : .collect();
3968 293 :
3969 293 : // Fsync all the layer files and directory using multiple threads to
3970 293 : // minimize latency.
3971 293 : par_fsync::par_fsync_async(&layer_paths)
3972 516 : .await
3973 293 : .context("fsync all new layers")?;
3974 :
3975 293 : let timeline_dir = self
3976 293 : .conf
3977 293 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
3978 293 :
3979 293 : par_fsync::par_fsync_async(&[timeline_dir])
3980 321 : .await
3981 293 : .context("fsync of timeline dir")?;
3982 0 : }
3983 :
3984 293 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
3985 293 : stats.new_deltas_count = Some(new_layers.len());
3986 10726 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
3987 293 :
3988 293 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
3989 293 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
3990 : {
3991 293 : Ok(stats_json) => {
3992 293 : info!(
3993 293 : stats_json = stats_json.as_str(),
3994 293 : "compact_level0_phase1 stats available"
3995 293 : )
3996 : }
3997 0 : Err(e) => {
3998 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
3999 : }
4000 : }
4001 :
4002 293 : Ok(CompactLevel0Phase1Result {
4003 293 : new_layers,
4004 293 : deltas_to_compact: deltas_to_compact
4005 293 : .into_iter()
4006 4077 : .map(|x| x.drop_eviction_guard())
4007 293 : .collect::<Vec<_>>(),
4008 293 : })
4009 1619 : }
4010 :
4011 : ///
4012 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
4013 : /// as Level 1 files.
4014 : ///
4015 1621 : async fn compact_level0(
4016 1621 : self: &Arc<Self>,
4017 1621 : target_file_size: u64,
4018 1621 : ctx: &RequestContext,
4019 1621 : ) -> Result<(), CompactionError> {
4020 : let CompactLevel0Phase1Result {
4021 1619 : new_layers,
4022 1619 : deltas_to_compact,
4023 : } = {
4024 1621 : let phase1_span = info_span!("compact_level0_phase1");
4025 1621 : let ctx = ctx.attached_child();
4026 1621 : let mut stats = CompactLevel0Phase1StatsBuilder {
4027 1621 : version: Some(2),
4028 1621 : tenant_id: Some(self.tenant_shard_id),
4029 1621 : timeline_id: Some(self.timeline_id),
4030 1621 : ..Default::default()
4031 1621 : };
4032 1621 :
4033 1621 : let begin = tokio::time::Instant::now();
4034 1621 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
4035 1621 : let now = tokio::time::Instant::now();
4036 1621 : stats.read_lock_acquisition_micros =
4037 1621 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
4038 1621 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
4039 1621 : .instrument(phase1_span)
4040 314557 : .await?
4041 : };
4042 :
4043 1619 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
4044 : // nothing to do
4045 1324 : return Ok(());
4046 295 : }
4047 :
4048 295 : let mut guard = self.layers.write().await;
4049 :
4050 295 : let mut duplicated_layers = HashSet::new();
4051 295 :
4052 295 : let mut insert_layers = Vec::with_capacity(new_layers.len());
4053 :
4054 11049 : for l in &new_layers {
4055 10754 : if guard.contains(l.as_ref()) {
4056 : // expected in tests
4057 28 : tracing::error!(layer=%l, "duplicated L1 layer");
4058 :
4059 : // good ways to cause a duplicate: we repeatedly error after taking the writelock
4060 : // `guard` on self.layers. as of writing this, there are no error returns except
4061 : // for compact_level0_phase1 creating an L0, which does not happen in practice
4062 : // because we have not implemented L0 => L0 compaction.
4063 28 : duplicated_layers.insert(l.layer_desc().key());
4064 10726 : } else if LayerMap::is_l0(l.layer_desc()) {
4065 0 : return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
4066 10726 : } else {
4067 10726 : insert_layers.push(l.clone());
4068 10726 : }
4069 : }
4070 :
4071 295 : let remove_layers = {
4072 295 : let mut deltas_to_compact = deltas_to_compact;
4073 295 : // only remove those inputs which were not outputs
4074 4105 : deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
4075 295 : deltas_to_compact
4076 295 : };
4077 295 :
4078 295 : // deletion will happen later, the layer file manager calls garbage_collect_on_drop
4079 295 : guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
4080 :
4081 295 : if let Some(remote_client) = self.remote_client.as_ref() {
4082 295 : remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
4083 0 : }
4084 :
4085 295 : drop_wlock(guard);
4086 295 :
4087 295 : Ok(())
4088 1619 : }
4089 :
4090 : /// Update information about which layer files need to be retained on
4091 : /// garbage collection. This is separate from actually performing the GC,
4092 : /// and is updated more frequently, so that compaction can remove obsolete
4093 : /// page versions more aggressively.
4094 : ///
4095 : /// TODO: that's wishful thinking, compaction doesn't actually do that
4096 : /// currently.
4097 : ///
4098 : /// The caller specifies how much history is needed with the 3 arguments:
4099 : ///
4100 : /// retain_lsns: keep a version of each page at these LSNs
4101 : /// cutoff_horizon: also keep everything newer than this LSN
4102 : /// pitr: the time duration required to keep data for PITR
4103 : ///
4104 : /// The 'retain_lsns' list is currently used to prevent removing files that
4105 : /// are needed by child timelines. In the future, the user might be able to
4106 : /// name additional points in time to retain. The caller is responsible for
4107 : /// collecting that information.
4108 : ///
4109 : /// The 'cutoff_horizon' point is used to retain recent versions that might still be
4110 : /// needed by read-only nodes. (As of this writing, the caller just passes
4111 : /// the latest LSN subtracted by a constant, and doesn't do anything smart
4112 : /// to figure out what read-only nodes might actually need.)
4113 : ///
4114 : /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
4115 : /// whether a record is needed for PITR.
4116 : ///
4117 : /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
4118 : /// field, so that the three values passed as argument are stored
4119 : /// atomically. But the caller is responsible for ensuring that no new
4120 : /// branches are created that would need to be included in 'retain_lsns',
4121 : /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
4122 : /// that.
4123 : ///
4124 1678 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
4125 : pub(super) async fn update_gc_info(
4126 : &self,
4127 : retain_lsns: Vec<Lsn>,
4128 : cutoff_horizon: Lsn,
4129 : pitr: Duration,
4130 : cancel: &CancellationToken,
4131 : ctx: &RequestContext,
4132 : ) -> anyhow::Result<()> {
4133 : // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
4134 : //
4135 : // Some unit tests depend on garbage-collection working even when
4136 : // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
4137 : // work, so avoid calling it altogether if time-based retention is not
4138 : // configured. It would be pointless anyway.
4139 : let pitr_cutoff = if pitr != Duration::ZERO {
4140 : let now = SystemTime::now();
4141 : if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
4142 : let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
4143 :
4144 : match self
4145 : .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
4146 : .await?
4147 : {
4148 : LsnForTimestamp::Present(lsn) => lsn,
4149 : LsnForTimestamp::Future(lsn) => {
4150 : // The timestamp is in the future. That sounds impossible,
4151 : // but what it really means is that there hasn't been
4152 : // any commits since the cutoff timestamp.
4153 0 : debug!("future({})", lsn);
4154 : cutoff_horizon
4155 : }
4156 : LsnForTimestamp::Past(lsn) => {
4157 0 : debug!("past({})", lsn);
4158 : // conservative, safe default is to remove nothing, when we
4159 : // have no commit timestamp data available
4160 : *self.get_latest_gc_cutoff_lsn()
4161 : }
4162 : LsnForTimestamp::NoData(lsn) => {
4163 0 : debug!("nodata({})", lsn);
4164 : // conservative, safe default is to remove nothing, when we
4165 : // have no commit timestamp data available
4166 : *self.get_latest_gc_cutoff_lsn()
4167 : }
4168 : }
4169 : } else {
4170 : // If we don't have enough data to convert to LSN,
4171 : // play safe and don't remove any layers.
4172 : *self.get_latest_gc_cutoff_lsn()
4173 : }
4174 : } else {
4175 : // No time-based retention was configured. Set time-based cutoff to
4176 : // same as LSN based.
4177 : cutoff_horizon
4178 : };
4179 :
4180 : // Grab the lock and update the values
4181 : *self.gc_info.write().unwrap() = GcInfo {
4182 : retain_lsns,
4183 : horizon_cutoff: cutoff_horizon,
4184 : pitr_cutoff,
4185 : };
4186 :
4187 : Ok(())
4188 : }
4189 :
4190 : /// Garbage collect layer files on a timeline that are no longer needed.
4191 : ///
4192 : /// Currently, we don't make any attempt at removing unneeded page versions
4193 : /// within a layer file. We can only remove the whole file if it's fully
4194 : /// obsolete.
4195 776 : pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
4196 776 : // this is most likely the background tasks, but it might be the spawned task from
4197 776 : // immediate_gc
4198 776 : let cancel = crate::task_mgr::shutdown_token();
4199 776 : let _g = tokio::select! {
4200 776 : guard = self.gc_lock.lock() => guard,
4201 : _ = self.cancel.cancelled() => return Ok(GcResult::default()),
4202 : _ = cancel.cancelled() => return Ok(GcResult::default()),
4203 : };
4204 776 : let timer = self.metrics.garbage_collect_histo.start_timer();
4205 :
4206 0 : fail_point!("before-timeline-gc");
4207 :
4208 : // Is the timeline being deleted?
4209 776 : if self.is_stopping() {
4210 0 : anyhow::bail!("timeline is Stopping");
4211 776 : }
4212 776 :
4213 776 : let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
4214 776 : let gc_info = self.gc_info.read().unwrap();
4215 776 :
4216 776 : let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
4217 776 : let pitr_cutoff = gc_info.pitr_cutoff;
4218 776 : let retain_lsns = gc_info.retain_lsns.clone();
4219 776 : (horizon_cutoff, pitr_cutoff, retain_lsns)
4220 776 : };
4221 776 :
4222 776 : let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
4223 :
4224 776 : let res = self
4225 776 : .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
4226 776 : .instrument(
4227 776 : info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
4228 : )
4229 207 : .await?;
4230 :
4231 : // only record successes
4232 776 : timer.stop_and_record();
4233 776 :
4234 776 : Ok(res)
4235 776 : }
4236 :
4237 776 : async fn gc_timeline(
4238 776 : &self,
4239 776 : horizon_cutoff: Lsn,
4240 776 : pitr_cutoff: Lsn,
4241 776 : retain_lsns: Vec<Lsn>,
4242 776 : new_gc_cutoff: Lsn,
4243 776 : ) -> anyhow::Result<GcResult> {
4244 776 : let now = SystemTime::now();
4245 776 : let mut result: GcResult = GcResult::default();
4246 776 :
4247 776 : // Nothing to GC. Return early.
4248 776 : let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
4249 776 : if latest_gc_cutoff >= new_gc_cutoff {
4250 95 : info!(
4251 95 : "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
4252 95 : );
4253 95 : return Ok(result);
4254 681 : }
4255 :
4256 : // We need to ensure that no one tries to read page versions or create
4257 : // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
4258 : // for details. This will block until the old value is no longer in use.
4259 : //
4260 : // The GC cutoff should only ever move forwards.
4261 681 : let waitlist = {
4262 681 : let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
4263 681 : ensure!(
4264 681 : *write_guard <= new_gc_cutoff,
4265 0 : "Cannot move GC cutoff LSN backwards (was {}, new {})",
4266 0 : *write_guard,
4267 : new_gc_cutoff
4268 : );
4269 681 : write_guard.store_and_unlock(new_gc_cutoff)
4270 681 : };
4271 681 : waitlist.wait().await;
4272 :
4273 681 : info!("GC starting");
4274 :
4275 0 : debug!("retain_lsns: {:?}", retain_lsns);
4276 :
4277 681 : let mut layers_to_remove = Vec::new();
4278 681 : let mut wanted_image_layers = KeySpaceRandomAccum::default();
4279 :
4280 : // Scan all layers in the timeline (remote or on-disk).
4281 : //
4282 : // Garbage collect the layer if all conditions are satisfied:
4283 : // 1. it is older than cutoff LSN;
4284 : // 2. it is older than PITR interval;
4285 : // 3. it doesn't need to be retained for 'retain_lsns';
4286 : // 4. newer on-disk image layers cover the layer's whole key range
4287 : //
4288 : // TODO holding a write lock is too agressive and avoidable
4289 681 : let mut guard = self.layers.write().await;
4290 681 : let layers = guard.layer_map();
4291 16763 : 'outer: for l in layers.iter_historic_layers() {
4292 16763 : result.layers_total += 1;
4293 16763 :
4294 16763 : // 1. Is it newer than GC horizon cutoff point?
4295 16763 : if l.get_lsn_range().end > horizon_cutoff {
4296 0 : debug!(
4297 0 : "keeping {} because it's newer than horizon_cutoff {}",
4298 0 : l.filename(),
4299 0 : horizon_cutoff,
4300 0 : );
4301 2056 : result.layers_needed_by_cutoff += 1;
4302 2056 : continue 'outer;
4303 14707 : }
4304 14707 :
4305 14707 : // 2. It is newer than PiTR cutoff point?
4306 14707 : if l.get_lsn_range().end > pitr_cutoff {
4307 0 : debug!(
4308 0 : "keeping {} because it's newer than pitr_cutoff {}",
4309 0 : l.filename(),
4310 0 : pitr_cutoff,
4311 0 : );
4312 59 : result.layers_needed_by_pitr += 1;
4313 59 : continue 'outer;
4314 14648 : }
4315 :
4316 : // 3. Is it needed by a child branch?
4317 : // NOTE With that we would keep data that
4318 : // might be referenced by child branches forever.
4319 : // We can track this in child timeline GC and delete parent layers when
4320 : // they are no longer needed. This might be complicated with long inheritance chains.
4321 : //
4322 : // TODO Vec is not a great choice for `retain_lsns`
4323 14669 : for retain_lsn in &retain_lsns {
4324 : // start_lsn is inclusive
4325 608 : if &l.get_lsn_range().start <= retain_lsn {
4326 0 : debug!(
4327 0 : "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
4328 0 : l.filename(),
4329 0 : retain_lsn,
4330 0 : l.is_incremental(),
4331 0 : );
4332 587 : result.layers_needed_by_branches += 1;
4333 587 : continue 'outer;
4334 21 : }
4335 : }
4336 :
4337 : // 4. Is there a later on-disk layer for this relation?
4338 : //
4339 : // The end-LSN is exclusive, while disk_consistent_lsn is
4340 : // inclusive. For example, if disk_consistent_lsn is 100, it is
4341 : // OK for a delta layer to have end LSN 101, but if the end LSN
4342 : // is 102, then it might not have been fully flushed to disk
4343 : // before crash.
4344 : //
4345 : // For example, imagine that the following layers exist:
4346 : //
4347 : // 1000 - image (A)
4348 : // 1000-2000 - delta (B)
4349 : // 2000 - image (C)
4350 : // 2000-3000 - delta (D)
4351 : // 3000 - image (E)
4352 : //
4353 : // If GC horizon is at 2500, we can remove layers A and B, but
4354 : // we cannot remove C, even though it's older than 2500, because
4355 : // the delta layer 2000-3000 depends on it.
4356 14061 : if !layers
4357 14061 : .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
4358 : {
4359 0 : debug!("keeping {} because it is the latest layer", l.filename());
4360 : // Collect delta key ranges that need image layers to allow garbage
4361 : // collecting the layers.
4362 : // It is not so obvious whether we need to propagate information only about
4363 : // delta layers. Image layers can form "stairs" preventing old image from been deleted.
4364 : // But image layers are in any case less sparse than delta layers. Also we need some
4365 : // protection from replacing recent image layers with new one after each GC iteration.
4366 12981 : if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
4367 0 : wanted_image_layers.add_range(l.get_key_range());
4368 12981 : }
4369 12981 : result.layers_not_updated += 1;
4370 12981 : continue 'outer;
4371 1080 : }
4372 :
4373 : // We didn't find any reason to keep this file, so remove it.
4374 0 : debug!(
4375 0 : "garbage collecting {} is_dropped: xx is_incremental: {}",
4376 0 : l.filename(),
4377 0 : l.is_incremental(),
4378 0 : );
4379 1080 : layers_to_remove.push(l);
4380 : }
4381 681 : self.wanted_image_layers
4382 681 : .lock()
4383 681 : .unwrap()
4384 681 : .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
4385 681 :
4386 681 : if !layers_to_remove.is_empty() {
4387 : // Persist the new GC cutoff value in the metadata file, before
4388 : // we actually remove anything.
4389 : //
4390 : // This does not in fact have any effect as we no longer consider local metadata unless
4391 : // running without remote storage.
4392 : //
4393 : // This unconditionally schedules also an index_part.json update, even though, we will
4394 : // be doing one a bit later with the unlinked gc'd layers.
4395 : //
4396 : // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
4397 20 : self.update_metadata_file(self.disk_consistent_lsn.load(), None)
4398 20 : .await?;
4399 :
4400 20 : let gc_layers = layers_to_remove
4401 20 : .iter()
4402 1080 : .map(|x| guard.get_from_desc(x))
4403 20 : .collect::<Vec<Layer>>();
4404 20 :
4405 20 : result.layers_removed = gc_layers.len() as u64;
4406 :
4407 20 : if let Some(remote_client) = self.remote_client.as_ref() {
4408 20 : remote_client.schedule_gc_update(&gc_layers)?;
4409 0 : }
4410 :
4411 20 : guard.finish_gc_timeline(&gc_layers);
4412 20 :
4413 20 : #[cfg(feature = "testing")]
4414 20 : {
4415 20 : result.doomed_layers = gc_layers;
4416 20 : }
4417 661 : }
4418 :
4419 681 : info!(
4420 681 : "GC completed removing {} layers, cutoff {}",
4421 681 : result.layers_removed, new_gc_cutoff
4422 681 : );
4423 :
4424 681 : result.elapsed = now.elapsed()?;
4425 681 : Ok(result)
4426 776 : }
4427 :
4428 : /// Reconstruct a value, using the given base image and WAL records in 'data'.
4429 7320969 : async fn reconstruct_value(
4430 7320969 : &self,
4431 7320969 : key: Key,
4432 7320969 : request_lsn: Lsn,
4433 7320969 : mut data: ValueReconstructState,
4434 7320978 : ) -> Result<Bytes, PageReconstructError> {
4435 7320978 : // Perform WAL redo if needed
4436 7320978 : data.records.reverse();
4437 7320978 :
4438 7320978 : // If we have a page image, and no WAL, we're all set
4439 7320978 : if data.records.is_empty() {
4440 5001176 : if let Some((img_lsn, img)) = &data.img {
4441 0 : trace!(
4442 0 : "found page image for key {} at {}, no WAL redo required, req LSN {}",
4443 0 : key,
4444 0 : img_lsn,
4445 0 : request_lsn,
4446 0 : );
4447 5001176 : Ok(img.clone())
4448 : } else {
4449 0 : Err(PageReconstructError::from(anyhow!(
4450 0 : "base image for {key} at {request_lsn} not found"
4451 0 : )))
4452 : }
4453 : } else {
4454 : // We need to do WAL redo.
4455 : //
4456 : // If we don't have a base image, then the oldest WAL record better initialize
4457 : // the page
4458 2319802 : if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
4459 0 : Err(PageReconstructError::from(anyhow!(
4460 0 : "Base image for {} at {} not found, but got {} WAL records",
4461 0 : key,
4462 0 : request_lsn,
4463 0 : data.records.len()
4464 0 : )))
4465 : } else {
4466 2319802 : if data.img.is_some() {
4467 0 : trace!(
4468 0 : "found {} WAL records and a base image for {} at {}, performing WAL redo",
4469 0 : data.records.len(),
4470 0 : key,
4471 0 : request_lsn
4472 0 : );
4473 : } else {
4474 0 : trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
4475 : };
4476 :
4477 2319802 : let last_rec_lsn = data.records.last().unwrap().0;
4478 :
4479 2319802 : let img = match self
4480 2319802 : .walredo_mgr
4481 2319802 : .as_ref()
4482 2319802 : .context("timeline has no walredo manager")
4483 2319802 : .map_err(PageReconstructError::WalRedo)?
4484 2319802 : .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
4485 0 : .await
4486 2319801 : .context("reconstruct a page image")
4487 : {
4488 2319801 : Ok(img) => img,
4489 0 : Err(e) => return Err(PageReconstructError::WalRedo(e)),
4490 : };
4491 :
4492 2319801 : if img.len() == page_cache::PAGE_SZ {
4493 2316191 : let cache = page_cache::get();
4494 2316191 : if let Err(e) = cache
4495 2316191 : .memorize_materialized_page(
4496 2316191 : self.tenant_shard_id,
4497 2316191 : self.timeline_id,
4498 2316191 : key,
4499 2316191 : last_rec_lsn,
4500 2316191 : &img,
4501 2316191 : )
4502 8804 : .await
4503 2316191 : .context("Materialized page memoization failed")
4504 : {
4505 0 : return Err(PageReconstructError::from(e));
4506 2316191 : }
4507 3610 : }
4508 :
4509 2319801 : Ok(img)
4510 : }
4511 : }
4512 7320977 : }
4513 :
4514 2 : pub(crate) async fn spawn_download_all_remote_layers(
4515 2 : self: Arc<Self>,
4516 2 : request: DownloadRemoteLayersTaskSpawnRequest,
4517 2 : ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
4518 2 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4519 2 :
4520 2 : // this is not really needed anymore; it has tests which really check the return value from
4521 2 : // http api. it would be better not to maintain this anymore.
4522 2 :
4523 2 : let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
4524 2 : if let Some(st) = &*status_guard {
4525 1 : match &st.state {
4526 : DownloadRemoteLayersTaskState::Running => {
4527 0 : return Err(st.clone());
4528 : }
4529 : DownloadRemoteLayersTaskState::ShutDown
4530 1 : | DownloadRemoteLayersTaskState::Completed => {
4531 1 : *status_guard = None;
4532 1 : }
4533 : }
4534 1 : }
4535 :
4536 2 : let self_clone = Arc::clone(&self);
4537 2 : let task_id = task_mgr::spawn(
4538 2 : task_mgr::BACKGROUND_RUNTIME.handle(),
4539 2 : task_mgr::TaskKind::DownloadAllRemoteLayers,
4540 2 : Some(self.tenant_shard_id),
4541 2 : Some(self.timeline_id),
4542 2 : "download all remote layers task",
4543 : false,
4544 2 : async move {
4545 10 : self_clone.download_all_remote_layers(request).await;
4546 2 : let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
4547 2 : match &mut *status_guard {
4548 : None => {
4549 0 : warn!("tasks status is supposed to be Some(), since we are running");
4550 : }
4551 2 : Some(st) => {
4552 2 : let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
4553 2 : if st.task_id != exp_task_id {
4554 0 : warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
4555 2 : } else {
4556 2 : st.state = DownloadRemoteLayersTaskState::Completed;
4557 2 : }
4558 : }
4559 : };
4560 2 : Ok(())
4561 2 : }
4562 2 : .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
4563 : );
4564 :
4565 2 : let initial_info = DownloadRemoteLayersTaskInfo {
4566 2 : task_id: format!("{task_id}"),
4567 2 : state: DownloadRemoteLayersTaskState::Running,
4568 2 : total_layer_count: 0,
4569 2 : successful_download_count: 0,
4570 2 : failed_download_count: 0,
4571 2 : };
4572 2 : *status_guard = Some(initial_info.clone());
4573 2 :
4574 2 : Ok(initial_info)
4575 2 : }
4576 :
4577 2 : async fn download_all_remote_layers(
4578 2 : self: &Arc<Self>,
4579 2 : request: DownloadRemoteLayersTaskSpawnRequest,
4580 2 : ) {
4581 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4582 :
4583 2 : let remaining = {
4584 2 : let guard = self.layers.read().await;
4585 2 : guard
4586 2 : .layer_map()
4587 2 : .iter_historic_layers()
4588 10 : .map(|desc| guard.get_from_desc(&desc))
4589 2 : .collect::<Vec<_>>()
4590 2 : };
4591 2 : let total_layer_count = remaining.len();
4592 2 :
4593 2 : macro_rules! lock_status {
4594 2 : ($st:ident) => {
4595 2 : let mut st = self.download_all_remote_layers_task_info.write().unwrap();
4596 2 : let st = st
4597 2 : .as_mut()
4598 2 : .expect("this function is only called after the task has been spawned");
4599 2 : assert_eq!(
4600 2 : st.task_id,
4601 2 : format!(
4602 2 : "{}",
4603 2 : task_mgr::current_task_id().expect("we run inside a task_mgr task")
4604 2 : )
4605 2 : );
4606 2 : let $st = st;
4607 2 : };
4608 2 : }
4609 2 :
4610 2 : {
4611 2 : lock_status!(st);
4612 2 : st.total_layer_count = total_layer_count as u64;
4613 2 : }
4614 2 :
4615 2 : let mut remaining = remaining.into_iter();
4616 2 : let mut have_remaining = true;
4617 2 : let mut js = tokio::task::JoinSet::new();
4618 2 :
4619 2 : let cancel = task_mgr::shutdown_token();
4620 2 :
4621 2 : let limit = request.max_concurrent_downloads;
4622 :
4623 : loop {
4624 12 : while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
4625 12 : let Some(next) = remaining.next() else {
4626 2 : have_remaining = false;
4627 2 : break;
4628 : };
4629 :
4630 10 : let span = tracing::info_span!("download", layer = %next);
4631 :
4632 10 : js.spawn(
4633 10 : async move {
4634 26 : let res = next.download().await;
4635 10 : (next, res)
4636 10 : }
4637 10 : .instrument(span),
4638 10 : );
4639 : }
4640 :
4641 12 : while let Some(res) = js.join_next().await {
4642 10 : match res {
4643 : Ok((_, Ok(_))) => {
4644 5 : lock_status!(st);
4645 5 : st.successful_download_count += 1;
4646 : }
4647 5 : Ok((layer, Err(e))) => {
4648 5 : tracing::error!(%layer, "download failed: {e:#}");
4649 5 : lock_status!(st);
4650 5 : st.failed_download_count += 1;
4651 : }
4652 0 : Err(je) if je.is_cancelled() => unreachable!("not used here"),
4653 0 : Err(je) if je.is_panic() => {
4654 0 : lock_status!(st);
4655 0 : st.failed_download_count += 1;
4656 : }
4657 0 : Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
4658 : }
4659 : }
4660 :
4661 2 : if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
4662 2 : break;
4663 0 : }
4664 : }
4665 :
4666 : {
4667 2 : lock_status!(st);
4668 2 : st.state = DownloadRemoteLayersTaskState::Completed;
4669 2 : }
4670 2 : }
4671 :
4672 55 : pub(crate) fn get_download_all_remote_layers_task_info(
4673 55 : &self,
4674 55 : ) -> Option<DownloadRemoteLayersTaskInfo> {
4675 55 : self.download_all_remote_layers_task_info
4676 55 : .read()
4677 55 : .unwrap()
4678 55 : .clone()
4679 55 : }
4680 : }
4681 :
4682 : impl Timeline {
4683 : /// Returns non-remote layers for eviction.
4684 35 : pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
4685 35 : let guard = self.layers.read().await;
4686 35 : let mut max_layer_size: Option<u64> = None;
4687 :
4688 35 : let resident_layers = guard
4689 35 : .resident_layers()
4690 524 : .map(|layer| {
4691 524 : let file_size = layer.layer_desc().file_size;
4692 524 : max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
4693 524 :
4694 524 : let last_activity_ts = layer.access_stats().latest_activity_or_now();
4695 524 :
4696 524 : EvictionCandidate {
4697 524 : layer: layer.into(),
4698 524 : last_activity_ts,
4699 524 : relative_last_activity: finite_f32::FiniteF32::ZERO,
4700 524 : }
4701 524 : })
4702 35 : .collect()
4703 0 : .await;
4704 :
4705 35 : DiskUsageEvictionInfo {
4706 35 : max_layer_size,
4707 35 : resident_layers,
4708 35 : }
4709 35 : }
4710 :
4711 22735 : pub(crate) fn get_shard_index(&self) -> ShardIndex {
4712 22735 : ShardIndex {
4713 22735 : shard_number: self.tenant_shard_id.shard_number,
4714 22735 : shard_count: self.tenant_shard_id.shard_count,
4715 22735 : }
4716 22735 : }
4717 : }
4718 :
4719 : type TraversalPathItem = (
4720 : ValueReconstructResult,
4721 : Lsn,
4722 : Box<dyn Send + FnOnce() -> TraversalId>,
4723 : );
4724 :
4725 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
4726 : /// to an error, as anyhow context information.
4727 1275 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
4728 1275 : // We want the original 'msg' to be the outermost context. The outermost context
4729 1275 : // is the most high-level information, which also gets propagated to the client.
4730 1275 : let mut msg_iter = path
4731 1275 : .into_iter()
4732 1775 : .map(|(r, c, l)| {
4733 1775 : format!(
4734 1775 : "layer traversal: result {:?}, cont_lsn {}, layer: {}",
4735 1775 : r,
4736 1775 : c,
4737 1775 : l(),
4738 1775 : )
4739 1775 : })
4740 1275 : .chain(std::iter::once(msg));
4741 1275 : // Construct initial message from the first traversed layer
4742 1275 : let err = anyhow!(msg_iter.next().unwrap());
4743 1275 :
4744 1275 : // Append all subsequent traversals, and the error message 'msg', as contexts.
4745 1775 : let msg = msg_iter.fold(err, |err, msg| err.context(msg));
4746 1275 : PageReconstructError::from(msg)
4747 1275 : }
4748 :
4749 : /// Various functions to mutate the timeline.
4750 : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
4751 : // This is probably considered a bad practice in Rust and should be fixed eventually,
4752 : // but will cause large code changes.
4753 : pub(crate) struct TimelineWriter<'a> {
4754 : tl: &'a Timeline,
4755 : _write_guard: tokio::sync::MutexGuard<'a, ()>,
4756 : }
4757 :
4758 : impl Deref for TimelineWriter<'_> {
4759 : type Target = Timeline;
4760 :
4761 730583 : fn deref(&self) -> &Self::Target {
4762 730583 : self.tl
4763 730583 : }
4764 : }
4765 :
4766 : impl<'a> TimelineWriter<'a> {
4767 : /// Put a new page version that can be constructed from a WAL record
4768 : ///
4769 : /// This will implicitly extend the relation, if the page is beyond the
4770 : /// current end-of-file.
4771 1214102 : pub(crate) async fn put(
4772 1214102 : &self,
4773 1214102 : key: Key,
4774 1214102 : lsn: Lsn,
4775 1214102 : value: &Value,
4776 1214102 : ctx: &RequestContext,
4777 1214102 : ) -> anyhow::Result<()> {
4778 1214102 : self.tl.put_value(key, lsn, value, ctx).await
4779 1214102 : }
4780 :
4781 1674304 : pub(crate) async fn put_batch(
4782 1674304 : &self,
4783 1674304 : batch: &HashMap<Key, Vec<(Lsn, Value)>>,
4784 1674304 : ctx: &RequestContext,
4785 1674304 : ) -> anyhow::Result<()> {
4786 1674304 : self.tl.put_values(batch, ctx).await
4787 1674303 : }
4788 :
4789 19261 : pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
4790 19261 : self.tl.put_tombstones(batch).await
4791 19261 : }
4792 :
4793 : /// Track the end of the latest digested WAL record.
4794 : /// Remember the (end of) last valid WAL record remembered in the timeline.
4795 : ///
4796 : /// Call this after you have finished writing all the WAL up to 'lsn'.
4797 : ///
4798 : /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
4799 : /// the 'lsn' or anything older. The previous last record LSN is stored alongside
4800 : /// the latest and can be read.
4801 76004085 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
4802 76004085 : self.tl.finish_write(new_lsn);
4803 76004085 : }
4804 :
4805 640015 : pub(crate) fn update_current_logical_size(&self, delta: i64) {
4806 640015 : self.tl.update_current_logical_size(delta)
4807 640015 : }
4808 : }
4809 :
4810 : // We need TimelineWriter to be send in upcoming conversion of
4811 : // Timeline::layers to tokio::sync::RwLock.
4812 2 : #[test]
4813 2 : fn is_send() {
4814 2 : fn _assert_send<T: Send>() {}
4815 2 : _assert_send::<TimelineWriter<'_>>();
4816 2 : }
4817 :
4818 : /// Add a suffix to a layer file's name: .{num}.old
4819 : /// Uses the first available num (starts at 0)
4820 1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
4821 1 : let filename = path
4822 1 : .file_name()
4823 1 : .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
4824 1 : let mut new_path = path.to_owned();
4825 :
4826 1 : for i in 0u32.. {
4827 1 : new_path.set_file_name(format!("{filename}.{i}.old"));
4828 1 : if !new_path.exists() {
4829 1 : std::fs::rename(path, &new_path)
4830 1 : .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
4831 1 : return Ok(());
4832 0 : }
4833 : }
4834 :
4835 0 : bail!("couldn't find an unused backup number for {:?}", path)
4836 1 : }
4837 :
4838 : #[cfg(test)]
4839 : mod tests {
4840 : use utils::{id::TimelineId, lsn::Lsn};
4841 :
4842 : use crate::tenant::{
4843 : harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
4844 : };
4845 :
4846 2 : #[tokio::test]
4847 2 : async fn two_layer_eviction_attempts_at_the_same_time() {
4848 2 : let harness =
4849 2 : TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
4850 2 :
4851 2 : let ctx = any_context();
4852 2 : let tenant = harness.try_load(&ctx).await.unwrap();
4853 2 : let timeline = tenant
4854 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
4855 7 : .await
4856 2 : .unwrap();
4857 2 :
4858 2 : let layer = find_some_layer(&timeline).await;
4859 2 : let layer = layer
4860 2 : .keep_resident()
4861 2 : .await
4862 2 : .expect("no download => no downloading errors")
4863 2 : .expect("should had been resident")
4864 2 : .drop_eviction_guard();
4865 2 :
4866 2 : let first = async { layer.evict_and_wait().await };
4867 2 : let second = async { layer.evict_and_wait().await };
4868 2 :
4869 3 : let (first, second) = tokio::join!(first, second);
4870 2 :
4871 2 : let res = layer.keep_resident().await;
4872 2 : assert!(matches!(res, Ok(None)), "{res:?}");
4873 2 :
4874 2 : match (first, second) {
4875 2 : (Ok(()), Ok(())) => {
4876 0 : // because there are no more timeline locks being taken on eviction path, we can
4877 0 : // witness all three outcomes here.
4878 0 : }
4879 2 : (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
4880 2 : // if one completes before the other, this is fine just as well.
4881 2 : }
4882 2 : other => unreachable!("unexpected {:?}", other),
4883 2 : }
4884 2 : }
4885 :
4886 2 : fn any_context() -> crate::context::RequestContext {
4887 2 : use crate::context::*;
4888 2 : use crate::task_mgr::*;
4889 2 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
4890 2 : }
4891 :
4892 2 : async fn find_some_layer(timeline: &Timeline) -> Layer {
4893 2 : let layers = timeline.layers.read().await;
4894 2 : let desc = layers
4895 2 : .layer_map()
4896 2 : .iter_historic_layers()
4897 2 : .next()
4898 2 : .expect("must find one layer to evict");
4899 2 :
4900 2 : layers.get_from_desc(&desc)
4901 2 : }
4902 : }
|