Line data Source code
1 : pub mod delete;
2 : mod eviction_task;
3 : mod init;
4 : pub mod layer_manager;
5 : pub(crate) mod logical_size;
6 : pub mod span;
7 : pub mod uninit;
8 : mod walreceiver;
9 :
10 : use anyhow::{anyhow, bail, ensure, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use enumset::EnumSet;
14 : use fail::fail_point;
15 : use futures::stream::StreamExt;
16 : use itertools::Itertools;
17 : use pageserver_api::{
18 : keyspace::{key_range_size, KeySpaceAccum},
19 : models::{
20 : DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
21 : LayerMapInfo, TimelineState,
22 : },
23 : reltag::BlockNumber,
24 : shard::{ShardIdentity, TenantShardId},
25 : };
26 : use rand::Rng;
27 : use serde_with::serde_as;
28 : use storage_broker::BrokerClientChannel;
29 : use tokio::{
30 : runtime::Handle,
31 : sync::{oneshot, watch},
32 : };
33 : use tokio_util::sync::CancellationToken;
34 : use tracing::*;
35 : use utils::sync::gate::Gate;
36 :
37 : use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
38 : use std::ops::{Deref, Range};
39 : use std::pin::pin;
40 : use std::sync::atomic::Ordering as AtomicOrdering;
41 : use std::sync::{Arc, Mutex, RwLock, Weak};
42 : use std::time::{Duration, Instant, SystemTime};
43 : use std::{
44 : cmp::{max, min, Ordering},
45 : ops::ControlFlow,
46 : };
47 :
48 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
49 : use crate::tenant::{
50 : layer_map::{LayerMap, SearchResult},
51 : metadata::{save_metadata, TimelineMetadata},
52 : par_fsync,
53 : };
54 : use crate::{
55 : context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
56 : disk_usage_eviction_task::DiskUsageEvictionInfo,
57 : };
58 : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
59 : use crate::{
60 : disk_usage_eviction_task::finite_f32,
61 : tenant::storage_layer::{
62 : AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
63 : LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
64 : ValueReconstructState,
65 : },
66 : };
67 : use crate::{
68 : disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
69 : };
70 : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
71 :
72 : use crate::config::PageServerConf;
73 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
74 : use crate::metrics::{
75 : TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
76 : };
77 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
78 : use crate::tenant::config::TenantConfOpt;
79 : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
80 : use pageserver_api::reltag::RelTag;
81 : use pageserver_api::shard::ShardIndex;
82 :
83 : use postgres_connection::PgConnectionConfig;
84 : use postgres_ffi::to_pg_timestamp;
85 : use utils::{
86 : completion,
87 : generation::Generation,
88 : id::TimelineId,
89 : lsn::{AtomicLsn, Lsn, RecordLsn},
90 : seqwait::SeqWait,
91 : simple_rcu::{Rcu, RcuReadGuard},
92 : };
93 :
94 : use crate::page_cache;
95 : use crate::repository::GcResult;
96 : use crate::repository::{Key, Value};
97 : use crate::task_mgr;
98 : use crate::task_mgr::TaskKind;
99 : use crate::ZERO_PAGE;
100 :
101 : use self::delete::DeleteTimelineFlow;
102 : pub(super) use self::eviction_task::EvictionTaskTenantState;
103 : use self::eviction_task::EvictionTaskTimelineState;
104 : use self::layer_manager::LayerManager;
105 : use self::logical_size::LogicalSize;
106 : use self::walreceiver::{WalReceiver, WalReceiverConf};
107 :
108 : use super::config::TenantConf;
109 : use super::remote_timeline_client::index::IndexPart;
110 : use super::remote_timeline_client::RemoteTimelineClient;
111 : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
112 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
113 :
114 45 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
115 : pub(super) enum FlushLoopState {
116 : NotStarted,
117 : Running {
118 : #[cfg(test)]
119 : expect_initdb_optimization: bool,
120 : #[cfg(test)]
121 : initdb_optimization_count: usize,
122 : },
123 : Exited,
124 : }
125 :
126 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
127 0 : #[derive(Debug, Clone, PartialEq, Eq)]
128 : pub(crate) struct Hole {
129 : key_range: Range<Key>,
130 : coverage_size: usize,
131 : }
132 :
133 : impl Ord for Hole {
134 274 : fn cmp(&self, other: &Self) -> Ordering {
135 274 : other.coverage_size.cmp(&self.coverage_size) // inverse order
136 274 : }
137 : }
138 :
139 : impl PartialOrd for Hole {
140 274 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
141 274 : Some(self.cmp(other))
142 274 : }
143 : }
144 :
145 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
146 : /// Can be removed after all refactors are done.
147 295 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
148 295 : drop(rlock)
149 295 : }
150 :
151 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
152 : /// Can be removed after all refactors are done.
153 2016 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
154 2016 : drop(rlock)
155 2016 : }
156 :
157 : /// The outward-facing resources required to build a Timeline
158 : pub struct TimelineResources {
159 : pub remote_client: Option<RemoteTimelineClient>,
160 : pub deletion_queue_client: DeletionQueueClient,
161 : }
162 :
163 : pub struct Timeline {
164 : conf: &'static PageServerConf,
165 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
166 :
167 : myself: Weak<Self>,
168 :
169 : pub(crate) tenant_shard_id: TenantShardId,
170 : pub timeline_id: TimelineId,
171 :
172 : /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
173 : /// Never changes for the lifetime of this [`Timeline`] object.
174 : ///
175 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
176 : /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
177 : pub(crate) generation: Generation,
178 :
179 : /// The detailed sharding information from our parent Tenant. This enables us to map keys
180 : /// to shards, and is constant through the lifetime of this Timeline.
181 : shard_identity: ShardIdentity,
182 :
183 : pub pg_version: u32,
184 :
185 : /// The tuple has two elements.
186 : /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
187 : /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
188 : ///
189 : /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
190 : /// We describe these rectangles through the `PersistentLayerDesc` struct.
191 : ///
192 : /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
193 : /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
194 : /// `PersistentLayerDesc`'s.
195 : ///
196 : /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
197 : /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
198 : /// runtime, e.g., during page reconstruction.
199 : ///
200 : /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
201 : /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
202 : pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
203 :
204 : /// Set of key ranges which should be covered by image layers to
205 : /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
206 : /// It is used by compaction task when it checks if new image layer should be created.
207 : /// Newly created image layer doesn't help to remove the delta layer, until the
208 : /// newly created image layer falls off the PITR horizon. So on next GC cycle,
209 : /// gc_timeline may still want the new image layer to be created. To avoid redundant
210 : /// image layers creation we should check if image layer exists but beyond PITR horizon.
211 : /// This is why we need remember GC cutoff LSN.
212 : ///
213 : wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
214 :
215 : last_freeze_at: AtomicLsn,
216 : // Atomic would be more appropriate here.
217 : last_freeze_ts: RwLock<Instant>,
218 :
219 : // WAL redo manager. `None` only for broken tenants.
220 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
221 :
222 : /// Remote storage client.
223 : /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
224 : pub remote_client: Option<Arc<RemoteTimelineClient>>,
225 :
226 : // What page versions do we hold in the repository? If we get a
227 : // request > last_record_lsn, we need to wait until we receive all
228 : // the WAL up to the request. The SeqWait provides functions for
229 : // that. TODO: If we get a request for an old LSN, such that the
230 : // versions have already been garbage collected away, we should
231 : // throw an error, but we don't track that currently.
232 : //
233 : // last_record_lsn.load().last points to the end of last processed WAL record.
234 : //
235 : // We also remember the starting point of the previous record in
236 : // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
237 : // first WAL record when the node is started up. But here, we just
238 : // keep track of it.
239 : last_record_lsn: SeqWait<RecordLsn, Lsn>,
240 :
241 : // All WAL records have been processed and stored durably on files on
242 : // local disk, up to this LSN. On crash and restart, we need to re-process
243 : // the WAL starting from this point.
244 : //
245 : // Some later WAL records might have been processed and also flushed to disk
246 : // already, so don't be surprised to see some, but there's no guarantee on
247 : // them yet.
248 : disk_consistent_lsn: AtomicLsn,
249 :
250 : // Parent timeline that this timeline was branched from, and the LSN
251 : // of the branch point.
252 : ancestor_timeline: Option<Arc<Timeline>>,
253 : ancestor_lsn: Lsn,
254 :
255 : pub(super) metrics: TimelineMetrics,
256 :
257 : // `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
258 : // in `crate::page_service` writes these metrics.
259 : pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
260 :
261 : /// Ensures layers aren't frozen by checkpointer between
262 : /// [`Timeline::get_layer_for_write`] and layer reads.
263 : /// Locked automatically by [`TimelineWriter`] and checkpointer.
264 : /// Must always be acquired before the layer map/individual layer lock
265 : /// to avoid deadlock.
266 : write_lock: tokio::sync::Mutex<()>,
267 :
268 : /// Used to avoid multiple `flush_loop` tasks running
269 : pub(super) flush_loop_state: Mutex<FlushLoopState>,
270 :
271 : /// layer_flush_start_tx can be used to wake up the layer-flushing task.
272 : /// The value is a counter, incremented every time a new flush cycle is requested.
273 : /// The flush cycle counter is sent back on the layer_flush_done channel when
274 : /// the flush finishes. You can use that to wait for the flush to finish.
275 : layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
276 : /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
277 : layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
278 :
279 : // Needed to ensure that we can't create a branch at a point that was already garbage collected
280 : pub latest_gc_cutoff_lsn: Rcu<Lsn>,
281 :
282 : // List of child timelines and their branch points. This is needed to avoid
283 : // garbage collecting data that is still needed by the child timelines.
284 : pub gc_info: std::sync::RwLock<GcInfo>,
285 :
286 : // It may change across major versions so for simplicity
287 : // keep it after running initdb for a timeline.
288 : // It is needed in checks when we want to error on some operations
289 : // when they are requested for pre-initdb lsn.
290 : // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
291 : // though let's keep them both for better error visibility.
292 : pub initdb_lsn: Lsn,
293 :
294 : /// When did we last calculate the partitioning?
295 : partitioning: Mutex<(KeyPartitioning, Lsn)>,
296 :
297 : /// Configuration: how often should the partitioning be recalculated.
298 : repartition_threshold: u64,
299 :
300 : /// Current logical size of the "datadir", at the last LSN.
301 : current_logical_size: LogicalSize,
302 :
303 : /// Information about the last processed message by the WAL receiver,
304 : /// or None if WAL receiver has not received anything for this timeline
305 : /// yet.
306 : pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
307 : pub walreceiver: Mutex<Option<WalReceiver>>,
308 :
309 : /// Relation size cache
310 : pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
311 :
312 : download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
313 :
314 : state: watch::Sender<TimelineState>,
315 :
316 : /// Prevent two tasks from deleting the timeline at the same time. If held, the
317 : /// timeline is being deleted. If 'true', the timeline has already been deleted.
318 : pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
319 :
320 : eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
321 :
322 : /// Load or creation time information about the disk_consistent_lsn and when the loading
323 : /// happened. Used for consumption metrics.
324 : pub(crate) loaded_at: (Lsn, SystemTime),
325 :
326 : /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
327 : pub(crate) gate: Gate,
328 :
329 : /// Cancellation token scoped to this timeline: anything doing long-running work relating
330 : /// to the timeline should drop out when this token fires.
331 : pub(crate) cancel: CancellationToken,
332 :
333 : /// Make sure we only have one running compaction at a time in tests.
334 : ///
335 : /// Must only be taken in two places:
336 : /// - [`Timeline::compact`] (this file)
337 : /// - [`delete::delete_local_layer_files`]
338 : ///
339 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
340 : compaction_lock: tokio::sync::Mutex<()>,
341 :
342 : /// Make sure we only have one running gc at a time.
343 : ///
344 : /// Must only be taken in two places:
345 : /// - [`Timeline::gc`] (this file)
346 : /// - [`delete::delete_local_layer_files`]
347 : ///
348 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
349 : gc_lock: tokio::sync::Mutex<()>,
350 : }
351 :
352 : pub struct WalReceiverInfo {
353 : pub wal_source_connconf: PgConnectionConfig,
354 : pub last_received_msg_lsn: Lsn,
355 : pub last_received_msg_ts: u128,
356 : }
357 :
358 : ///
359 : /// Information about how much history needs to be retained, needed by
360 : /// Garbage Collection.
361 : ///
362 : pub struct GcInfo {
363 : /// Specific LSNs that are needed.
364 : ///
365 : /// Currently, this includes all points where child branches have
366 : /// been forked off from. In the future, could also include
367 : /// explicit user-defined snapshot points.
368 : pub retain_lsns: Vec<Lsn>,
369 :
370 : /// In addition to 'retain_lsns', keep everything newer than this
371 : /// point.
372 : ///
373 : /// This is calculated by subtracting 'gc_horizon' setting from
374 : /// last-record LSN
375 : ///
376 : /// FIXME: is this inclusive or exclusive?
377 : pub horizon_cutoff: Lsn,
378 :
379 : /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
380 : /// point.
381 : ///
382 : /// This is calculated by finding a number such that a record is needed for PITR
383 : /// if only if its LSN is larger than 'pitr_cutoff'.
384 : pub pitr_cutoff: Lsn,
385 : }
386 :
387 : /// An error happened in a get() operation.
388 76 : #[derive(thiserror::Error, Debug)]
389 : pub(crate) enum PageReconstructError {
390 : #[error(transparent)]
391 : Other(#[from] anyhow::Error),
392 :
393 : #[error("Ancestor LSN wait error: {0}")]
394 : AncestorLsnTimeout(#[from] WaitLsnError),
395 :
396 : #[error("timeline shutting down")]
397 : Cancelled,
398 :
399 : /// The ancestor of this is being stopped
400 : #[error("ancestor timeline {0} is being stopped")]
401 : AncestorStopping(TimelineId),
402 :
403 : /// An error happened replaying WAL records
404 : #[error(transparent)]
405 : WalRedo(anyhow::Error),
406 : }
407 :
408 : impl PageReconstructError {
409 : /// Returns true if this error indicates a tenant/timeline shutdown alike situation
410 0 : pub(crate) fn is_stopping(&self) -> bool {
411 0 : use PageReconstructError::*;
412 0 : match self {
413 0 : Other(_) => false,
414 0 : AncestorLsnTimeout(_) => false,
415 0 : Cancelled | AncestorStopping(_) => true,
416 0 : WalRedo(_) => false,
417 : }
418 0 : }
419 : }
420 :
421 0 : #[derive(thiserror::Error, Debug)]
422 : enum CreateImageLayersError {
423 : #[error("timeline shutting down")]
424 : Cancelled,
425 :
426 : #[error(transparent)]
427 : GetVectoredError(GetVectoredError),
428 :
429 : #[error(transparent)]
430 : PageReconstructError(PageReconstructError),
431 :
432 : #[error(transparent)]
433 : Other(#[from] anyhow::Error),
434 : }
435 :
436 0 : #[derive(thiserror::Error, Debug)]
437 : enum FlushLayerError {
438 : /// Timeline cancellation token was cancelled
439 : #[error("timeline shutting down")]
440 : Cancelled,
441 :
442 : #[error(transparent)]
443 : CreateImageLayersError(CreateImageLayersError),
444 :
445 : #[error(transparent)]
446 : Other(#[from] anyhow::Error),
447 : }
448 :
449 0 : #[derive(thiserror::Error, Debug)]
450 : pub(crate) enum GetVectoredError {
451 : #[error("timeline shutting down")]
452 : Cancelled,
453 :
454 : #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
455 : Oversized(u64),
456 :
457 : #[error("Requested at invalid LSN: {0}")]
458 : InvalidLsn(Lsn),
459 : }
460 :
461 0 : #[derive(thiserror::Error, Debug)]
462 : pub(crate) enum GetReadyAncestorError {
463 : #[error("ancestor timeline {0} is being stopped")]
464 : AncestorStopping(TimelineId),
465 :
466 : #[error("Ancestor LSN wait error: {0}")]
467 : AncestorLsnTimeout(#[from] WaitLsnError),
468 :
469 : #[error("Cancelled")]
470 : Cancelled,
471 :
472 : #[error(transparent)]
473 : Other(#[from] anyhow::Error),
474 : }
475 :
476 0 : #[derive(Clone, Copy)]
477 : pub enum LogicalSizeCalculationCause {
478 : Initial,
479 : ConsumptionMetricsSyntheticSize,
480 : EvictionTaskImitation,
481 : TenantSizeHandler,
482 : }
483 :
484 : pub enum GetLogicalSizePriority {
485 : User,
486 : Background,
487 : }
488 :
489 850 : #[derive(enumset::EnumSetType)]
490 : pub(crate) enum CompactFlags {
491 : ForceRepartition,
492 : }
493 :
494 : impl std::fmt::Debug for Timeline {
495 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
496 0 : write!(f, "Timeline<{}>", self.timeline_id)
497 0 : }
498 : }
499 :
500 48 : #[derive(thiserror::Error, Debug)]
501 : pub(crate) enum WaitLsnError {
502 : // Called on a timeline which is shutting down
503 : #[error("Shutdown")]
504 : Shutdown,
505 :
506 : // Called on an timeline not in active state or shutting down
507 : #[error("Bad state (not active)")]
508 : BadState,
509 :
510 : // Timeout expired while waiting for LSN to catch up with goal.
511 : #[error("{0}")]
512 : Timeout(String),
513 : }
514 :
515 : // The impls below achieve cancellation mapping for errors.
516 : // Perhaps there's a way of achieving this with less cruft.
517 :
518 : impl From<CreateImageLayersError> for CompactionError {
519 0 : fn from(e: CreateImageLayersError) -> Self {
520 0 : match e {
521 0 : CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
522 0 : _ => CompactionError::Other(e.into()),
523 : }
524 0 : }
525 : }
526 :
527 : impl From<CreateImageLayersError> for FlushLayerError {
528 0 : fn from(e: CreateImageLayersError) -> Self {
529 0 : match e {
530 0 : CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
531 0 : any => FlushLayerError::CreateImageLayersError(any),
532 : }
533 0 : }
534 : }
535 :
536 : impl From<PageReconstructError> for CreateImageLayersError {
537 0 : fn from(e: PageReconstructError) -> Self {
538 0 : match e {
539 0 : PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
540 0 : _ => CreateImageLayersError::PageReconstructError(e),
541 : }
542 0 : }
543 : }
544 :
545 : impl From<GetVectoredError> for CreateImageLayersError {
546 0 : fn from(e: GetVectoredError) -> Self {
547 0 : match e {
548 0 : GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
549 0 : _ => CreateImageLayersError::GetVectoredError(e),
550 : }
551 0 : }
552 : }
553 :
554 : impl From<GetReadyAncestorError> for PageReconstructError {
555 4 : fn from(e: GetReadyAncestorError) -> Self {
556 4 : use GetReadyAncestorError::*;
557 4 : match e {
558 2 : AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
559 0 : AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
560 0 : Cancelled => PageReconstructError::Cancelled,
561 2 : Other(other) => PageReconstructError::Other(other),
562 : }
563 4 : }
564 : }
565 :
566 : /// Public interface functions
567 : impl Timeline {
568 : /// Get the LSN where this branch was created
569 3669 : pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
570 3669 : self.ancestor_lsn
571 3669 : }
572 :
573 : /// Get the ancestor's timeline id
574 5252 : pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
575 5252 : self.ancestor_timeline
576 5252 : .as_ref()
577 5252 : .map(|ancestor| ancestor.timeline_id)
578 5252 : }
579 :
580 : /// Lock and get timeline's GC cutoff
581 4514450 : pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
582 4514450 : self.latest_gc_cutoff_lsn.read()
583 4514450 : }
584 :
585 : /// Look up given page version.
586 : ///
587 : /// If a remote layer file is needed, it is downloaded as part of this
588 : /// call.
589 : ///
590 : /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
591 : /// abstraction above this needs to store suitable metadata to track what
592 : /// data exists with what keys, in separate metadata entries. If a
593 : /// non-existent key is requested, we may incorrectly return a value from
594 : /// an ancestor branch, for example, or waste a lot of cycles chasing the
595 : /// non-existing key.
596 : ///
597 : /// # Cancel-Safety
598 : ///
599 : /// This method is cancellation-safe.
600 7427636 : pub(crate) async fn get(
601 7427636 : &self,
602 7427636 : key: Key,
603 7427636 : lsn: Lsn,
604 7427636 : ctx: &RequestContext,
605 7427661 : ) -> Result<Bytes, PageReconstructError> {
606 7427661 : if !lsn.is_valid() {
607 1 : return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
608 7427660 : }
609 :
610 : // This check is debug-only because of the cost of hashing, and because it's a double-check: we
611 : // already checked the key against the shard_identity when looking up the Timeline from
612 : // page_service.
613 7427660 : debug_assert!(!self.shard_identity.is_key_disposable(&key));
614 :
615 : // XXX: structured stats collection for layer eviction here.
616 0 : trace!(
617 0 : "get page request for {}@{} from task kind {:?}",
618 0 : key,
619 0 : lsn,
620 0 : ctx.task_kind()
621 0 : );
622 :
623 : // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
624 : // The cached image can be returned directly if there is no WAL between the cached image
625 : // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
626 : // for redo.
627 7427660 : let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
628 1797170 : Some((cached_lsn, cached_img)) => {
629 1797170 : match cached_lsn.cmp(&lsn) {
630 1720526 : Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
631 : Ordering::Equal => {
632 76644 : MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
633 76644 : return Ok(cached_img); // exact LSN match, return the image
634 : }
635 : Ordering::Greater => {
636 0 : unreachable!("the returned lsn should never be after the requested lsn")
637 : }
638 : }
639 1720526 : Some((cached_lsn, cached_img))
640 : }
641 5630490 : None => None,
642 : };
643 :
644 7351016 : let mut reconstruct_state = ValueReconstructState {
645 7351016 : records: Vec::new(),
646 7351016 : img: cached_page_img,
647 7351016 : };
648 7351016 :
649 7351016 : let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
650 7351016 : let path = self
651 7351016 : .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
652 2225518 : .await?;
653 7349680 : timer.stop_and_record();
654 7349680 :
655 7349680 : let start = Instant::now();
656 7349680 : let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
657 7349680 : let elapsed = start.elapsed();
658 7349680 : crate::metrics::RECONSTRUCT_TIME
659 7349680 : .for_result(&res)
660 7349680 : .observe(elapsed.as_secs_f64());
661 7349680 :
662 7349680 : if cfg!(feature = "testing") && res.is_err() {
663 : // it can only be walredo issue
664 : use std::fmt::Write;
665 :
666 0 : let mut msg = String::new();
667 0 :
668 0 : path.into_iter().for_each(|(res, cont_lsn, layer)| {
669 0 : writeln!(
670 0 : msg,
671 0 : "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
672 0 : layer(),
673 0 : )
674 0 : .expect("string grows")
675 0 : });
676 :
677 : // this is to rule out or provide evidence that we could in some cases read a duplicate
678 : // walrecord
679 0 : tracing::info!("walredo failed, path:\n{msg}");
680 7349680 : }
681 :
682 7349680 : res
683 7427653 : }
684 :
685 : pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
686 :
687 : /// Look up multiple page versions at a given LSN
688 : ///
689 : /// This naive implementation will be replaced with a more efficient one
690 : /// which actually vectorizes the read path.
691 78952 : pub(crate) async fn get_vectored(
692 78952 : &self,
693 78952 : key_ranges: &[Range<Key>],
694 78952 : lsn: Lsn,
695 78952 : ctx: &RequestContext,
696 78952 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
697 78952 : if !lsn.is_valid() {
698 0 : return Err(GetVectoredError::InvalidLsn(lsn));
699 78952 : }
700 78952 :
701 78952 : let key_count = key_ranges
702 78952 : .iter()
703 80151 : .map(|range| key_range_size(range) as u64)
704 78952 : .sum();
705 78952 : if key_count > Timeline::MAX_GET_VECTORED_KEYS {
706 0 : return Err(GetVectoredError::Oversized(key_count));
707 78952 : }
708 78952 :
709 78952 : let _timer = crate::metrics::GET_VECTORED_LATENCY
710 78952 : .for_task_kind(ctx.task_kind())
711 78952 : .map(|t| t.start_timer());
712 78952 :
713 78952 : let mut values = BTreeMap::new();
714 159103 : for range in key_ranges {
715 80151 : let mut key = range.start;
716 312439 : while key != range.end {
717 232288 : assert!(!self.shard_identity.is_key_disposable(&key));
718 :
719 232288 : let block = self.get(key, lsn, ctx).await;
720 :
721 : if matches!(
722 0 : block,
723 : Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
724 : ) {
725 0 : return Err(GetVectoredError::Cancelled);
726 232288 : }
727 232288 :
728 232288 : values.insert(key, block);
729 232288 : key = key.next();
730 : }
731 : }
732 :
733 78952 : Ok(values)
734 78952 : }
735 :
736 : /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
737 9570130 : pub(crate) fn get_last_record_lsn(&self) -> Lsn {
738 9570130 : self.last_record_lsn.load().last
739 9570130 : }
740 :
741 3090 : pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
742 3090 : self.last_record_lsn.load().prev
743 3090 : }
744 :
745 : /// Atomically get both last and prev.
746 1080 : pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
747 1080 : self.last_record_lsn.load()
748 1080 : }
749 :
750 798339 : pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
751 798339 : self.disk_consistent_lsn.load()
752 798339 : }
753 :
754 : /// remote_consistent_lsn from the perspective of the tenant's current generation,
755 : /// not validated with control plane yet.
756 : /// See [`Self::get_remote_consistent_lsn_visible`].
757 3090 : pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
758 3090 : if let Some(remote_client) = &self.remote_client {
759 3090 : remote_client.remote_consistent_lsn_projected()
760 : } else {
761 0 : None
762 : }
763 3090 : }
764 :
765 : /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
766 : /// i.e. a value of remote_consistent_lsn_projected which has undergone
767 : /// generation validation in the deletion queue.
768 795952 : pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
769 795952 : if let Some(remote_client) = &self.remote_client {
770 795952 : remote_client.remote_consistent_lsn_visible()
771 : } else {
772 0 : None
773 : }
774 795952 : }
775 :
776 : /// The sum of the file size of all historic layers in the layer map.
777 : /// This method makes no distinction between local and remote layers.
778 : /// Hence, the result **does not represent local filesystem usage**.
779 3570 : pub(crate) async fn layer_size_sum(&self) -> u64 {
780 3570 : let guard = self.layers.read().await;
781 3570 : let layer_map = guard.layer_map();
782 3570 : let mut size = 0;
783 327026 : for l in layer_map.iter_historic_layers() {
784 327026 : size += l.file_size();
785 327026 : }
786 3570 : size
787 3570 : }
788 :
789 12 : pub(crate) fn resident_physical_size(&self) -> u64 {
790 12 : self.metrics.resident_physical_size_get()
791 12 : }
792 :
793 : ///
794 : /// Wait until WAL has been received and processed up to this LSN.
795 : ///
796 : /// You should call this before any of the other get_* or list_* functions. Calling
797 : /// those functions with an LSN that has been processed yet is an error.
798 : ///
799 1527253 : pub(crate) async fn wait_lsn(
800 1527253 : &self,
801 1527253 : lsn: Lsn,
802 1527253 : _ctx: &RequestContext, /* Prepare for use by cancellation */
803 1527255 : ) -> Result<(), WaitLsnError> {
804 1527255 : if self.cancel.is_cancelled() {
805 0 : return Err(WaitLsnError::Shutdown);
806 1527255 : } else if !self.is_active() {
807 0 : return Err(WaitLsnError::BadState);
808 1527255 : }
809 :
810 : // This should never be called from the WAL receiver, because that could lead
811 : // to a deadlock.
812 : debug_assert!(
813 1527255 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
814 0 : "wait_lsn cannot be called in WAL receiver"
815 : );
816 : debug_assert!(
817 1527255 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
818 0 : "wait_lsn cannot be called in WAL receiver"
819 : );
820 : debug_assert!(
821 1527255 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
822 0 : "wait_lsn cannot be called in WAL receiver"
823 : );
824 :
825 1527255 : let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
826 1527255 :
827 1527255 : match self
828 1527255 : .last_record_lsn
829 1527255 : .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
830 110510 : .await
831 : {
832 1527231 : Ok(()) => Ok(()),
833 24 : Err(e) => {
834 24 : use utils::seqwait::SeqWaitError::*;
835 24 : match e {
836 0 : Shutdown => Err(WaitLsnError::Shutdown),
837 : Timeout => {
838 : // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
839 24 : drop(_timer);
840 24 : let walreceiver_status = self.walreceiver_status();
841 24 : Err(WaitLsnError::Timeout(format!(
842 24 : "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
843 24 : lsn,
844 24 : self.get_last_record_lsn(),
845 24 : self.get_disk_consistent_lsn(),
846 24 : walreceiver_status,
847 24 : )))
848 : }
849 : }
850 : }
851 : }
852 1527255 : }
853 :
854 3114 : pub(crate) fn walreceiver_status(&self) -> String {
855 3114 : match &*self.walreceiver.lock().unwrap() {
856 107 : None => "stopping or stopped".to_string(),
857 3007 : Some(walreceiver) => match walreceiver.status() {
858 1865 : Some(status) => status.to_human_readable_string(),
859 1142 : None => "Not active".to_string(),
860 : },
861 : }
862 3114 : }
863 :
864 : /// Check that it is valid to request operations with that lsn.
865 678 : pub(crate) fn check_lsn_is_in_scope(
866 678 : &self,
867 678 : lsn: Lsn,
868 678 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
869 678 : ) -> anyhow::Result<()> {
870 678 : ensure!(
871 678 : lsn >= **latest_gc_cutoff_lsn,
872 16 : "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
873 16 : lsn,
874 16 : **latest_gc_cutoff_lsn,
875 : );
876 662 : Ok(())
877 678 : }
878 :
879 : /// Flush to disk all data that was written with the put_* functions
880 3904 : #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
881 : pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
882 : self.freeze_inmem_layer(false).await;
883 : self.flush_frozen_layers_and_wait().await
884 : }
885 :
886 : /// Outermost timeline compaction operation; downloads needed layers.
887 1644 : pub(crate) async fn compact(
888 1644 : self: &Arc<Self>,
889 1644 : cancel: &CancellationToken,
890 1644 : flags: EnumSet<CompactFlags>,
891 1644 : ctx: &RequestContext,
892 1644 : ) -> Result<(), CompactionError> {
893 1644 : // most likely the cancellation token is from background task, but in tests it could be the
894 1644 : // request task as well.
895 1644 :
896 1644 : let prepare = async move {
897 1644 : let guard = self.compaction_lock.lock().await;
898 :
899 1644 : let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
900 1644 : BackgroundLoopKind::Compaction,
901 1644 : ctx,
902 1644 : )
903 1 : .await;
904 :
905 1644 : (guard, permit)
906 1644 : };
907 :
908 : // this wait probably never needs any "long time spent" logging, because we already nag if
909 : // compaction task goes over it's period (20s) which is quite often in production.
910 1644 : let (_guard, _permit) = tokio::select! {
911 1644 : tuple = prepare => { tuple },
912 : _ = self.cancel.cancelled() => return Ok(()),
913 : _ = cancel.cancelled() => return Ok(()),
914 : };
915 :
916 1644 : let last_record_lsn = self.get_last_record_lsn();
917 1644 :
918 1644 : // Last record Lsn could be zero in case the timeline was just created
919 1644 : if !last_record_lsn.is_valid() {
920 0 : warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
921 0 : return Ok(());
922 1644 : }
923 1644 :
924 1644 : // High level strategy for compaction / image creation:
925 1644 : //
926 1644 : // 1. First, calculate the desired "partitioning" of the
927 1644 : // currently in-use key space. The goal is to partition the
928 1644 : // key space into roughly fixed-size chunks, but also take into
929 1644 : // account any existing image layers, and try to align the
930 1644 : // chunk boundaries with the existing image layers to avoid
931 1644 : // too much churn. Also try to align chunk boundaries with
932 1644 : // relation boundaries. In principle, we don't know about
933 1644 : // relation boundaries here, we just deal with key-value
934 1644 : // pairs, and the code in pgdatadir_mapping.rs knows how to
935 1644 : // map relations into key-value pairs. But in practice we know
936 1644 : // that 'field6' is the block number, and the fields 1-5
937 1644 : // identify a relation. This is just an optimization,
938 1644 : // though.
939 1644 : //
940 1644 : // 2. Once we know the partitioning, for each partition,
941 1644 : // decide if it's time to create a new image layer. The
942 1644 : // criteria is: there has been too much "churn" since the last
943 1644 : // image layer? The "churn" is fuzzy concept, it's a
944 1644 : // combination of too many delta files, or too much WAL in
945 1644 : // total in the delta file. Or perhaps: if creating an image
946 1644 : // file would allow to delete some older files.
947 1644 : //
948 1644 : // 3. After that, we compact all level0 delta files if there
949 1644 : // are too many of them. While compacting, we also garbage
950 1644 : // collect any page versions that are no longer needed because
951 1644 : // of the new image layers we created in step 2.
952 1644 : //
953 1644 : // TODO: This high level strategy hasn't been implemented yet.
954 1644 : // Below are functions compact_level0() and create_image_layers()
955 1644 : // but they are a bit ad hoc and don't quite work like it's explained
956 1644 : // above. Rewrite it.
957 1644 :
958 1644 : // Is the timeline being deleted?
959 1644 : if self.is_stopping() {
960 0 : trace!("Dropping out of compaction on timeline shutdown");
961 0 : return Err(CompactionError::ShuttingDown);
962 1644 : }
963 1644 :
964 1644 : let target_file_size = self.get_checkpoint_distance();
965 1644 :
966 1644 : // Define partitioning schema if needed
967 1644 :
968 1644 : // FIXME: the match should only cover repartitioning, not the next steps
969 1644 : match self
970 1644 : .repartition(
971 1644 : self.get_last_record_lsn(),
972 1644 : self.get_compaction_target_size(),
973 1644 : flags,
974 1644 : ctx,
975 1644 : )
976 171920 : .await
977 : {
978 1641 : Ok((partitioning, lsn)) => {
979 1641 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
980 1641 : let image_ctx = RequestContextBuilder::extend(ctx)
981 1641 : .access_stats_behavior(AccessStatsBehavior::Skip)
982 1641 : .build();
983 1641 :
984 1641 : // 2. Compact
985 1641 : let timer = self.metrics.compact_time_histo.start_timer();
986 326269 : self.compact_level0(target_file_size, ctx).await?;
987 1640 : timer.stop_and_record();
988 :
989 : // 3. Create new image layers for partitions that have been modified
990 : // "enough".
991 1640 : let layers = self
992 1640 : .create_image_layers(&partitioning, lsn, false, &image_ctx)
993 56359 : .await
994 1640 : .map_err(anyhow::Error::from)?;
995 1640 : if let Some(remote_client) = &self.remote_client {
996 8032 : for layer in layers {
997 6392 : remote_client.schedule_layer_file_upload(layer)?;
998 : }
999 0 : }
1000 :
1001 1640 : if let Some(remote_client) = &self.remote_client {
1002 : // should any new image layer been created, not uploading index_part will
1003 : // result in a mismatch between remote_physical_size and layermap calculated
1004 : // size, which will fail some tests, but should not be an issue otherwise.
1005 1640 : remote_client.schedule_index_upload_for_file_changes()?;
1006 0 : }
1007 : }
1008 2 : Err(err) => {
1009 2 : // no partitioning? This is normal, if the timeline was just created
1010 2 : // as an empty timeline. Also in unit tests, when we use the timeline
1011 2 : // as a simple key-value store, ignoring the datadir layout. Log the
1012 2 : // error but continue.
1013 2 : //
1014 2 : // Suppress error when it's due to cancellation
1015 2 : if !self.cancel.is_cancelled() {
1016 1 : error!("could not compact, repartitioning keyspace failed: {err:?}");
1017 1 : }
1018 : }
1019 : };
1020 :
1021 1642 : Ok(())
1022 1642 : }
1023 :
1024 : /// Mutate the timeline with a [`TimelineWriter`].
1025 3350865 : pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
1026 3350865 : TimelineWriter {
1027 3350865 : tl: self,
1028 3350865 : _write_guard: self.write_lock.lock().await,
1029 : }
1030 3350865 : }
1031 :
1032 : /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
1033 : /// the in-memory layer, and initiate flushing it if so.
1034 : ///
1035 : /// Also flush after a period of time without new data -- it helps
1036 : /// safekeepers to regard pageserver as caught up and suspend activity.
1037 1397166 : pub(crate) async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
1038 1397166 : let last_lsn = self.get_last_record_lsn();
1039 1396420 : let open_layer_size = {
1040 1397166 : let guard = self.layers.read().await;
1041 1397166 : let layers = guard.layer_map();
1042 1397166 : let Some(open_layer) = layers.open_layer.as_ref() else {
1043 746 : return Ok(());
1044 : };
1045 1396420 : open_layer.size().await?
1046 : };
1047 1396420 : let last_freeze_at = self.last_freeze_at.load();
1048 1396420 : let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
1049 1396420 : let distance = last_lsn.widening_sub(last_freeze_at);
1050 1396420 : // Checkpointing the open layer can be triggered by layer size or LSN range.
1051 1396420 : // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
1052 1396420 : // we want to stay below that with a big margin. The LSN distance determines how
1053 1396420 : // much WAL the safekeepers need to store.
1054 1396420 : if distance >= self.get_checkpoint_distance().into()
1055 1395842 : || open_layer_size > self.get_checkpoint_distance()
1056 1392900 : || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
1057 : {
1058 3520 : info!(
1059 3520 : "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
1060 3520 : distance,
1061 3520 : open_layer_size,
1062 3520 : last_freeze_ts.elapsed()
1063 3520 : );
1064 :
1065 3520 : self.freeze_inmem_layer(true).await;
1066 3520 : self.last_freeze_at.store(last_lsn);
1067 3520 : *(self.last_freeze_ts.write().unwrap()) = Instant::now();
1068 3520 :
1069 3520 : // Wake up the layer flusher
1070 3520 : self.flush_frozen_layers();
1071 1392900 : }
1072 1396420 : Ok(())
1073 1397166 : }
1074 :
1075 1257 : pub(crate) fn activate(
1076 1257 : self: &Arc<Self>,
1077 1257 : broker_client: BrokerClientChannel,
1078 1257 : background_jobs_can_start: Option<&completion::Barrier>,
1079 1257 : ctx: &RequestContext,
1080 1257 : ) {
1081 1257 : if self.tenant_shard_id.is_zero() {
1082 1206 : // Logical size is only maintained accurately on shard zero.
1083 1206 : self.spawn_initial_logical_size_computation_task(ctx);
1084 1206 : }
1085 1257 : self.launch_wal_receiver(ctx, broker_client);
1086 1257 : self.set_state(TimelineState::Active);
1087 1257 : self.launch_eviction_task(background_jobs_can_start);
1088 1257 : }
1089 :
1090 : /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
1091 : /// also to remote storage. This method can easily take multiple seconds for a busy timeline.
1092 : ///
1093 : /// While we are flushing, we continue to accept read I/O.
1094 261 : pub(crate) async fn flush_and_shutdown(&self) {
1095 261 : debug_assert_current_span_has_tenant_and_timeline_id();
1096 :
1097 : // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
1098 : // trying to flush
1099 0 : tracing::debug!("Waiting for WalReceiverManager...");
1100 261 : task_mgr::shutdown_tasks(
1101 261 : Some(TaskKind::WalReceiverManager),
1102 261 : Some(self.tenant_shard_id),
1103 261 : Some(self.timeline_id),
1104 261 : )
1105 161 : .await;
1106 :
1107 : // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
1108 261 : self.last_record_lsn.shutdown();
1109 261 :
1110 261 : // now all writers to InMemory layer are gone, do the final flush if requested
1111 261 : match self.freeze_and_flush().await {
1112 : Ok(_) => {
1113 : // drain the upload queue
1114 216 : if let Some(client) = self.remote_client.as_ref() {
1115 : // if we did not wait for completion here, it might be our shutdown process
1116 : // didn't wait for remote uploads to complete at all, as new tasks can forever
1117 : // be spawned.
1118 : //
1119 : // what is problematic is the shutting down of RemoteTimelineClient, because
1120 : // obviously it does not make sense to stop while we wait for it, but what
1121 : // about corner cases like s3 suddenly hanging up?
1122 216 : if let Err(e) = client.shutdown().await {
1123 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1124 : // we have some extra WAL replay to do next time the timeline starts.
1125 0 : warn!("failed to flush to remote storage: {e:#}");
1126 216 : }
1127 0 : }
1128 : }
1129 45 : Err(e) => {
1130 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1131 : // we have some extra WAL replay to do next time the timeline starts.
1132 45 : warn!("failed to freeze and flush: {e:#}");
1133 : }
1134 : }
1135 :
1136 435 : self.shutdown().await;
1137 261 : }
1138 :
1139 : /// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
1140 : /// the graceful [`Timeline::flush_and_shutdown`] function.
1141 592 : pub(crate) async fn shutdown(&self) {
1142 592 : debug_assert_current_span_has_tenant_and_timeline_id();
1143 :
1144 : // Signal any subscribers to our cancellation token to drop out
1145 0 : tracing::debug!("Cancelling CancellationToken");
1146 592 : self.cancel.cancel();
1147 592 :
1148 592 : // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
1149 592 : // while doing so.
1150 592 : self.last_record_lsn.shutdown();
1151 592 :
1152 592 : // Shut down the layer flush task before the remote client, as one depends on the other
1153 592 : task_mgr::shutdown_tasks(
1154 592 : Some(TaskKind::LayerFlushTask),
1155 592 : Some(self.tenant_shard_id),
1156 592 : Some(self.timeline_id),
1157 592 : )
1158 504 : .await;
1159 :
1160 : // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
1161 : // case our caller wants to use that for a deletion
1162 592 : if let Some(remote_client) = self.remote_client.as_ref() {
1163 592 : match remote_client.stop() {
1164 592 : Ok(()) => {}
1165 0 : Err(StopError::QueueUninitialized) => {
1166 0 : // Shutting down during initialization is legal
1167 0 : }
1168 : }
1169 0 : }
1170 :
1171 0 : tracing::debug!("Waiting for tasks...");
1172 :
1173 619 : task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
1174 :
1175 : // Finally wait until any gate-holders are complete
1176 592 : self.gate.close().await;
1177 592 : }
1178 :
1179 2279 : pub(crate) fn set_state(&self, new_state: TimelineState) {
1180 2279 : match (self.current_state(), new_state) {
1181 2279 : (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
1182 125 : info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
1183 : }
1184 0 : (st, TimelineState::Loading) => {
1185 0 : error!("ignoring transition from {st:?} into Loading state");
1186 : }
1187 7 : (TimelineState::Broken { .. }, new_state) => {
1188 7 : error!("Ignoring state update {new_state:?} for broken timeline");
1189 : }
1190 : (TimelineState::Stopping, TimelineState::Active) => {
1191 0 : error!("Not activating a Stopping timeline");
1192 : }
1193 2147 : (_, new_state) => {
1194 2147 : self.state.send_replace(new_state);
1195 2147 : }
1196 : }
1197 2279 : }
1198 :
1199 19 : pub(crate) fn set_broken(&self, reason: String) {
1200 19 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
1201 19 : let broken_state = TimelineState::Broken {
1202 19 : reason,
1203 19 : backtrace: backtrace_str,
1204 19 : };
1205 19 : self.set_state(broken_state);
1206 19 :
1207 19 : // Although the Broken state is not equivalent to shutdown() (shutdown will be called
1208 19 : // later when this tenant is detach or the process shuts down), firing the cancellation token
1209 19 : // here avoids the need for other tasks to watch for the Broken state explicitly.
1210 19 : self.cancel.cancel();
1211 19 : }
1212 :
1213 2277272 : pub(crate) fn current_state(&self) -> TimelineState {
1214 2277272 : self.state.borrow().clone()
1215 2277272 : }
1216 :
1217 944 : pub(crate) fn is_broken(&self) -> bool {
1218 944 : matches!(&*self.state.borrow(), TimelineState::Broken { .. })
1219 944 : }
1220 :
1221 1542206 : pub(crate) fn is_active(&self) -> bool {
1222 1542206 : self.current_state() == TimelineState::Active
1223 1542206 : }
1224 :
1225 2854 : pub(crate) fn is_stopping(&self) -> bool {
1226 2854 : self.current_state() == TimelineState::Stopping
1227 2854 : }
1228 :
1229 1256 : pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
1230 1256 : self.state.subscribe()
1231 1256 : }
1232 :
1233 1299917 : pub(crate) async fn wait_to_become_active(
1234 1299917 : &self,
1235 1299917 : _ctx: &RequestContext, // Prepare for use by cancellation
1236 1299919 : ) -> Result<(), TimelineState> {
1237 1299919 : let mut receiver = self.state.subscribe();
1238 1299942 : loop {
1239 1299942 : let current_state = receiver.borrow().clone();
1240 1299942 : match current_state {
1241 : TimelineState::Loading => {
1242 23 : receiver
1243 23 : .changed()
1244 23 : .await
1245 23 : .expect("holding a reference to self");
1246 : }
1247 : TimelineState::Active { .. } => {
1248 1299914 : return Ok(());
1249 : }
1250 : TimelineState::Broken { .. } | TimelineState::Stopping => {
1251 : // There's no chance the timeline can transition back into ::Active
1252 5 : return Err(current_state);
1253 : }
1254 : }
1255 : }
1256 1299919 : }
1257 :
1258 116 : pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
1259 116 : let guard = self.layers.read().await;
1260 116 : let layer_map = guard.layer_map();
1261 116 : let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
1262 116 : if let Some(open_layer) = &layer_map.open_layer {
1263 27 : in_memory_layers.push(open_layer.info());
1264 89 : }
1265 116 : for frozen_layer in &layer_map.frozen_layers {
1266 0 : in_memory_layers.push(frozen_layer.info());
1267 0 : }
1268 :
1269 116 : let mut historic_layers = Vec::new();
1270 3151 : for historic_layer in layer_map.iter_historic_layers() {
1271 3151 : let historic_layer = guard.get_from_desc(&historic_layer);
1272 3151 : historic_layers.push(historic_layer.info(reset));
1273 3151 : }
1274 :
1275 116 : LayerMapInfo {
1276 116 : in_memory_layers,
1277 116 : historic_layers,
1278 116 : }
1279 116 : }
1280 :
1281 4 : #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
1282 : pub(crate) async fn download_layer(
1283 : &self,
1284 : layer_file_name: &str,
1285 : ) -> anyhow::Result<Option<bool>> {
1286 : let Some(layer) = self.find_layer(layer_file_name).await else {
1287 : return Ok(None);
1288 : };
1289 :
1290 : if self.remote_client.is_none() {
1291 : return Ok(Some(false));
1292 : }
1293 :
1294 : layer.download().await?;
1295 :
1296 : Ok(Some(true))
1297 : }
1298 :
1299 : /// Evict just one layer.
1300 : ///
1301 : /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
1302 2242 : pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1303 2242 : let _gate = self
1304 2242 : .gate
1305 2242 : .enter()
1306 2242 : .map_err(|_| anyhow::anyhow!("Shutting down"))?;
1307 :
1308 2242 : let Some(local_layer) = self.find_layer(layer_file_name).await else {
1309 0 : return Ok(None);
1310 : };
1311 :
1312 2242 : match local_layer.evict_and_wait().await {
1313 2242 : Ok(()) => Ok(Some(true)),
1314 0 : Err(EvictionError::NotFound) => Ok(Some(false)),
1315 0 : Err(EvictionError::Downloaded) => Ok(Some(false)),
1316 : }
1317 2242 : }
1318 : }
1319 :
1320 : /// Number of times we will compute partition within a checkpoint distance.
1321 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
1322 :
1323 : // Private functions
1324 : impl Timeline {
1325 607 : pub(crate) fn get_lazy_slru_download(&self) -> bool {
1326 607 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1327 607 : tenant_conf
1328 607 : .lazy_slru_download
1329 607 : .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
1330 607 : }
1331 :
1332 2795496 : fn get_checkpoint_distance(&self) -> u64 {
1333 2795496 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1334 2795496 : tenant_conf
1335 2795496 : .checkpoint_distance
1336 2795496 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
1337 2795496 : }
1338 :
1339 1392900 : fn get_checkpoint_timeout(&self) -> Duration {
1340 1392900 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1341 1392900 : tenant_conf
1342 1392900 : .checkpoint_timeout
1343 1392900 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
1344 1392900 : }
1345 :
1346 1724 : fn get_compaction_target_size(&self) -> u64 {
1347 1724 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1348 1724 : tenant_conf
1349 1724 : .compaction_target_size
1350 1724 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
1351 1724 : }
1352 :
1353 1641 : fn get_compaction_threshold(&self) -> usize {
1354 1641 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1355 1641 : tenant_conf
1356 1641 : .compaction_threshold
1357 1641 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
1358 1641 : }
1359 :
1360 36566 : fn get_image_creation_threshold(&self) -> usize {
1361 36566 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1362 36566 : tenant_conf
1363 36566 : .image_creation_threshold
1364 36566 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
1365 36566 : }
1366 :
1367 2599 : fn get_eviction_policy(&self) -> EvictionPolicy {
1368 2599 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1369 2599 : tenant_conf
1370 2599 : .eviction_policy
1371 2599 : .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
1372 2599 : }
1373 :
1374 1651 : fn get_evictions_low_residence_duration_metric_threshold(
1375 1651 : tenant_conf: &TenantConfOpt,
1376 1651 : default_tenant_conf: &TenantConf,
1377 1651 : ) -> Duration {
1378 1651 : tenant_conf
1379 1651 : .evictions_low_residence_duration_metric_threshold
1380 1651 : .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
1381 1651 : }
1382 :
1383 13896 : fn get_gc_feedback(&self) -> bool {
1384 13896 : let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
1385 13896 : tenant_conf
1386 13896 : .gc_feedback
1387 13896 : .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
1388 13896 : }
1389 :
1390 61 : pub(super) fn tenant_conf_updated(&self) {
1391 61 : // NB: Most tenant conf options are read by background loops, so,
1392 61 : // changes will automatically be picked up.
1393 61 :
1394 61 : // The threshold is embedded in the metric. So, we need to update it.
1395 61 : {
1396 61 : let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
1397 61 : &self.tenant_conf.read().unwrap().tenant_conf,
1398 61 : &self.conf.default_tenant_conf,
1399 61 : );
1400 61 :
1401 61 : let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
1402 61 : let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
1403 61 :
1404 61 : let timeline_id_str = self.timeline_id.to_string();
1405 61 : self.metrics
1406 61 : .evictions_with_low_residence_duration
1407 61 : .write()
1408 61 : .unwrap()
1409 61 : .change_threshold(
1410 61 : &tenant_id_str,
1411 61 : &shard_id_str,
1412 61 : &timeline_id_str,
1413 61 : new_threshold,
1414 61 : );
1415 61 : }
1416 61 : }
1417 :
1418 : /// Open a Timeline handle.
1419 : ///
1420 : /// Loads the metadata for the timeline into memory, but not the layer map.
1421 : #[allow(clippy::too_many_arguments)]
1422 1590 : pub(super) fn new(
1423 1590 : conf: &'static PageServerConf,
1424 1590 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
1425 1590 : metadata: &TimelineMetadata,
1426 1590 : ancestor: Option<Arc<Timeline>>,
1427 1590 : timeline_id: TimelineId,
1428 1590 : tenant_shard_id: TenantShardId,
1429 1590 : generation: Generation,
1430 1590 : shard_identity: ShardIdentity,
1431 1590 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
1432 1590 : resources: TimelineResources,
1433 1590 : pg_version: u32,
1434 1590 : state: TimelineState,
1435 1590 : cancel: CancellationToken,
1436 1590 : ) -> Arc<Self> {
1437 1590 : let disk_consistent_lsn = metadata.disk_consistent_lsn();
1438 1590 : let (state, _) = watch::channel(state);
1439 1590 :
1440 1590 : let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
1441 1590 : let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
1442 1590 :
1443 1590 : let tenant_conf_guard = tenant_conf.read().unwrap();
1444 1590 :
1445 1590 : let evictions_low_residence_duration_metric_threshold =
1446 1590 : Self::get_evictions_low_residence_duration_metric_threshold(
1447 1590 : &tenant_conf_guard.tenant_conf,
1448 1590 : &conf.default_tenant_conf,
1449 1590 : );
1450 1590 : drop(tenant_conf_guard);
1451 1590 :
1452 1590 : Arc::new_cyclic(|myself| {
1453 1590 : let mut result = Timeline {
1454 1590 : conf,
1455 1590 : tenant_conf,
1456 1590 : myself: myself.clone(),
1457 1590 : timeline_id,
1458 1590 : tenant_shard_id,
1459 1590 : generation,
1460 1590 : shard_identity,
1461 1590 : pg_version,
1462 1590 : layers: Default::default(),
1463 1590 : wanted_image_layers: Mutex::new(None),
1464 1590 :
1465 1590 : walredo_mgr,
1466 1590 : walreceiver: Mutex::new(None),
1467 1590 :
1468 1590 : remote_client: resources.remote_client.map(Arc::new),
1469 1590 :
1470 1590 : // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
1471 1590 : last_record_lsn: SeqWait::new(RecordLsn {
1472 1590 : last: disk_consistent_lsn,
1473 1590 : prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
1474 1590 : }),
1475 1590 : disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
1476 1590 :
1477 1590 : last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
1478 1590 : last_freeze_ts: RwLock::new(Instant::now()),
1479 1590 :
1480 1590 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
1481 1590 :
1482 1590 : ancestor_timeline: ancestor,
1483 1590 : ancestor_lsn: metadata.ancestor_lsn(),
1484 1590 :
1485 1590 : metrics: TimelineMetrics::new(
1486 1590 : &tenant_shard_id,
1487 1590 : &timeline_id,
1488 1590 : crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
1489 1590 : "mtime",
1490 1590 : evictions_low_residence_duration_metric_threshold,
1491 1590 : ),
1492 1590 : ),
1493 1590 :
1494 1590 : query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
1495 1590 : &tenant_shard_id,
1496 1590 : &timeline_id,
1497 1590 : ),
1498 1590 :
1499 1590 : flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
1500 1590 :
1501 1590 : layer_flush_start_tx,
1502 1590 : layer_flush_done_tx,
1503 1590 :
1504 1590 : write_lock: tokio::sync::Mutex::new(()),
1505 1590 :
1506 1590 : gc_info: std::sync::RwLock::new(GcInfo {
1507 1590 : retain_lsns: Vec::new(),
1508 1590 : horizon_cutoff: Lsn(0),
1509 1590 : pitr_cutoff: Lsn(0),
1510 1590 : }),
1511 1590 :
1512 1590 : latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
1513 1590 : initdb_lsn: metadata.initdb_lsn(),
1514 1590 :
1515 1590 : current_logical_size: if disk_consistent_lsn.is_valid() {
1516 : // we're creating timeline data with some layer files existing locally,
1517 : // need to recalculate timeline's logical size based on data in the layers.
1518 909 : LogicalSize::deferred_initial(disk_consistent_lsn)
1519 : } else {
1520 : // we're creating timeline data without any layers existing locally,
1521 : // initial logical size is 0.
1522 681 : LogicalSize::empty_initial()
1523 : },
1524 1590 : partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
1525 1590 : repartition_threshold: 0,
1526 1590 :
1527 1590 : last_received_wal: Mutex::new(None),
1528 1590 : rel_size_cache: RwLock::new(HashMap::new()),
1529 1590 :
1530 1590 : download_all_remote_layers_task_info: RwLock::new(None),
1531 1590 :
1532 1590 : state,
1533 1590 :
1534 1590 : eviction_task_timeline_state: tokio::sync::Mutex::new(
1535 1590 : EvictionTaskTimelineState::default(),
1536 1590 : ),
1537 1590 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
1538 1590 :
1539 1590 : cancel,
1540 1590 : gate: Gate::default(),
1541 1590 :
1542 1590 : compaction_lock: tokio::sync::Mutex::default(),
1543 1590 : gc_lock: tokio::sync::Mutex::default(),
1544 1590 : };
1545 1590 : result.repartition_threshold =
1546 1590 : result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
1547 1590 : result
1548 1590 : .metrics
1549 1590 : .last_record_gauge
1550 1590 : .set(disk_consistent_lsn.0 as i64);
1551 1590 : result
1552 1590 : })
1553 1590 : }
1554 :
1555 2243 : pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
1556 2243 : let Ok(guard) = self.gate.enter() else {
1557 0 : info!("cannot start flush loop when the timeline gate has already been closed");
1558 0 : return;
1559 : };
1560 2243 : let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
1561 2243 : match *flush_loop_state {
1562 1573 : FlushLoopState::NotStarted => (),
1563 : FlushLoopState::Running { .. } => {
1564 670 : info!(
1565 670 : "skipping attempt to start flush_loop twice {}/{}",
1566 670 : self.tenant_shard_id, self.timeline_id
1567 670 : );
1568 670 : return;
1569 : }
1570 : FlushLoopState::Exited => {
1571 0 : warn!(
1572 0 : "ignoring attempt to restart exited flush_loop {}/{}",
1573 0 : self.tenant_shard_id, self.timeline_id
1574 0 : );
1575 0 : return;
1576 : }
1577 : }
1578 :
1579 1573 : let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
1580 1573 : let self_clone = Arc::clone(self);
1581 1573 :
1582 1573 : debug!("spawning flush loop");
1583 1573 : *flush_loop_state = FlushLoopState::Running {
1584 1573 : #[cfg(test)]
1585 1573 : expect_initdb_optimization: false,
1586 1573 : #[cfg(test)]
1587 1573 : initdb_optimization_count: 0,
1588 1573 : };
1589 1573 : task_mgr::spawn(
1590 1573 : task_mgr::BACKGROUND_RUNTIME.handle(),
1591 1573 : task_mgr::TaskKind::LayerFlushTask,
1592 1573 : Some(self.tenant_shard_id),
1593 1573 : Some(self.timeline_id),
1594 1573 : "layer flush task",
1595 : false,
1596 1573 : async move {
1597 1573 : let _guard = guard;
1598 1573 : let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
1599 27794 : self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
1600 591 : let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
1601 591 : assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
1602 591 : *flush_loop_state = FlushLoopState::Exited;
1603 591 : Ok(())
1604 591 : }
1605 1573 : .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
1606 : );
1607 2243 : }
1608 :
1609 : /// Creates and starts the wal receiver.
1610 : ///
1611 : /// This function is expected to be called at most once per Timeline's lifecycle
1612 : /// when the timeline is activated.
1613 1257 : fn launch_wal_receiver(
1614 1257 : self: &Arc<Self>,
1615 1257 : ctx: &RequestContext,
1616 1257 : broker_client: BrokerClientChannel,
1617 1257 : ) {
1618 1257 : info!(
1619 1257 : "launching WAL receiver for timeline {} of tenant {}",
1620 1257 : self.timeline_id, self.tenant_shard_id
1621 1257 : );
1622 :
1623 1257 : let tenant_conf_guard = self.tenant_conf.read().unwrap();
1624 1257 : let wal_connect_timeout = tenant_conf_guard
1625 1257 : .tenant_conf
1626 1257 : .walreceiver_connect_timeout
1627 1257 : .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
1628 1257 : let lagging_wal_timeout = tenant_conf_guard
1629 1257 : .tenant_conf
1630 1257 : .lagging_wal_timeout
1631 1257 : .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
1632 1257 : let max_lsn_wal_lag = tenant_conf_guard
1633 1257 : .tenant_conf
1634 1257 : .max_lsn_wal_lag
1635 1257 : .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
1636 1257 : drop(tenant_conf_guard);
1637 1257 :
1638 1257 : let mut guard = self.walreceiver.lock().unwrap();
1639 1257 : assert!(
1640 1257 : guard.is_none(),
1641 0 : "multiple launches / re-launches of WAL receiver are not supported"
1642 : );
1643 1257 : *guard = Some(WalReceiver::start(
1644 1257 : Arc::clone(self),
1645 1257 : WalReceiverConf {
1646 1257 : wal_connect_timeout,
1647 1257 : lagging_wal_timeout,
1648 1257 : max_lsn_wal_lag,
1649 1257 : auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
1650 1257 : availability_zone: self.conf.availability_zone.clone(),
1651 1257 : ingest_batch_size: self.conf.ingest_batch_size,
1652 1257 : },
1653 1257 : broker_client,
1654 1257 : ctx,
1655 1257 : ));
1656 1257 : }
1657 :
1658 : /// Initialize with an empty layer map. Used when creating a new timeline.
1659 1145 : pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
1660 1145 : let mut layers = self.layers.try_write().expect(
1661 1145 : "in the context where we call this function, no other task has access to the object",
1662 1145 : );
1663 1145 : layers.initialize_empty(Lsn(start_lsn.0));
1664 1145 : }
1665 :
1666 : /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
1667 : /// files.
1668 433 : pub(super) async fn load_layer_map(
1669 433 : &self,
1670 433 : disk_consistent_lsn: Lsn,
1671 433 : index_part: Option<IndexPart>,
1672 433 : ) -> anyhow::Result<()> {
1673 : use init::{Decision::*, Discovered, DismissedLayer};
1674 : use LayerFileName::*;
1675 :
1676 433 : let mut guard = self.layers.write().await;
1677 :
1678 433 : let timer = self.metrics.load_layer_map_histo.start_timer();
1679 433 :
1680 433 : // Scan timeline directory and create ImageFileName and DeltaFilename
1681 433 : // structs representing all files on disk
1682 433 : let timeline_path = self
1683 433 : .conf
1684 433 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
1685 433 : let conf = self.conf;
1686 433 : let span = tracing::Span::current();
1687 433 :
1688 433 : // Copy to move into the task we're about to spawn
1689 433 : let generation = self.generation;
1690 433 : let shard = self.get_shard_index();
1691 433 : let this = self.myself.upgrade().expect("&self method holds the arc");
1692 :
1693 433 : let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
1694 433 : move || {
1695 433 : let _g = span.entered();
1696 433 : let discovered = init::scan_timeline_dir(&timeline_path)?;
1697 433 : let mut discovered_layers = Vec::with_capacity(discovered.len());
1698 433 : let mut unrecognized_files = Vec::new();
1699 433 :
1700 433 : let mut path = timeline_path;
1701 :
1702 13739 : for discovered in discovered {
1703 13306 : let (name, kind) = match discovered {
1704 12821 : Discovered::Layer(file_name, file_size) => {
1705 12821 : discovered_layers.push((file_name, file_size));
1706 12821 : continue;
1707 : }
1708 : Discovered::Metadata | Discovered::IgnoredBackup => {
1709 433 : continue;
1710 : }
1711 0 : Discovered::Unknown(file_name) => {
1712 0 : // we will later error if there are any
1713 0 : unrecognized_files.push(file_name);
1714 0 : continue;
1715 : }
1716 51 : Discovered::Ephemeral(name) => (name, "old ephemeral file"),
1717 1 : Discovered::Temporary(name) => (name, "temporary timeline file"),
1718 0 : Discovered::TemporaryDownload(name) => (name, "temporary download"),
1719 : };
1720 52 : path.push(Utf8Path::new(&name));
1721 52 : init::cleanup(&path, kind)?;
1722 52 : path.pop();
1723 : }
1724 :
1725 433 : if !unrecognized_files.is_empty() {
1726 : // assume that if there are any there are many many.
1727 0 : let n = unrecognized_files.len();
1728 0 : let first = &unrecognized_files[..n.min(10)];
1729 0 : anyhow::bail!(
1730 0 : "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
1731 0 : );
1732 433 : }
1733 433 :
1734 433 : let decided = init::reconcile(
1735 433 : discovered_layers,
1736 433 : index_part.as_ref(),
1737 433 : disk_consistent_lsn,
1738 433 : generation,
1739 433 : shard,
1740 433 : );
1741 433 :
1742 433 : let mut loaded_layers = Vec::new();
1743 433 : let mut needs_cleanup = Vec::new();
1744 433 : let mut total_physical_size = 0;
1745 :
1746 54013 : for (name, decision) in decided {
1747 53580 : let decision = match decision {
1748 11910 : Ok(UseRemote { local, remote }) => {
1749 11910 : // Remote is authoritative, but we may still choose to retain
1750 11910 : // the local file if the contents appear to match
1751 11910 : if local.file_size() == remote.file_size() {
1752 : // Use the local file, but take the remote metadata so that we pick up
1753 : // the correct generation.
1754 11909 : UseLocal(remote)
1755 : } else {
1756 1 : path.push(name.file_name());
1757 1 : init::cleanup_local_file_for_remote(&path, &local, &remote)?;
1758 1 : path.pop();
1759 1 : UseRemote { local, remote }
1760 : }
1761 : }
1762 41277 : Ok(decision) => decision,
1763 1 : Err(DismissedLayer::Future { local }) => {
1764 1 : if local.is_some() {
1765 0 : path.push(name.file_name());
1766 0 : init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
1767 0 : path.pop();
1768 1 : }
1769 1 : needs_cleanup.push(name);
1770 1 : continue;
1771 : }
1772 392 : Err(DismissedLayer::LocalOnly(local)) => {
1773 392 : path.push(name.file_name());
1774 392 : init::cleanup_local_only_file(&path, &name, &local)?;
1775 392 : path.pop();
1776 392 : // this file never existed remotely, we will have to do rework
1777 392 : continue;
1778 : }
1779 : };
1780 :
1781 53187 : match &name {
1782 23047 : Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
1783 30140 : Image(i) => assert!(i.lsn <= disk_consistent_lsn),
1784 : }
1785 :
1786 53187 : tracing::debug!(layer=%name, ?decision, "applied");
1787 :
1788 53187 : let layer = match decision {
1789 12428 : UseLocal(m) => {
1790 12428 : total_physical_size += m.file_size();
1791 12428 : Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
1792 : }
1793 40758 : Evicted(remote) | UseRemote { remote, .. } => {
1794 40759 : Layer::for_evicted(conf, &this, name, remote)
1795 : }
1796 : };
1797 :
1798 53187 : loaded_layers.push(layer);
1799 : }
1800 433 : Ok((loaded_layers, needs_cleanup, total_physical_size))
1801 433 : }
1802 433 : })
1803 433 : .await
1804 433 : .map_err(anyhow::Error::new)
1805 433 : .and_then(|x| x)?;
1806 :
1807 433 : let num_layers = loaded_layers.len();
1808 433 :
1809 433 : guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
1810 :
1811 433 : if let Some(rtc) = self.remote_client.as_ref() {
1812 433 : rtc.schedule_layer_file_deletion(&needs_cleanup)?;
1813 433 : rtc.schedule_index_upload_for_file_changes()?;
1814 : // This barrier orders above DELETEs before any later operations.
1815 : // This is critical because code executing after the barrier might
1816 : // create again objects with the same key that we just scheduled for deletion.
1817 : // For example, if we just scheduled deletion of an image layer "from the future",
1818 : // later compaction might run again and re-create the same image layer.
1819 : // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
1820 : // "same" here means same key range and LSN.
1821 : //
1822 : // Without a barrier between above DELETEs and the re-creation's PUTs,
1823 : // the upload queue may execute the PUT first, then the DELETE.
1824 : // In our example, we will end up with an IndexPart referencing a non-existent object.
1825 : //
1826 : // 1. a future image layer is created and uploaded
1827 : // 2. ps restart
1828 : // 3. the future layer from (1) is deleted during load layer map
1829 : // 4. image layer is re-created and uploaded
1830 : // 5. deletion queue would like to delete (1) but actually deletes (4)
1831 : // 6. delete by name works as expected, but it now deletes the wrong (later) version
1832 : //
1833 : // See https://github.com/neondatabase/neon/issues/5878
1834 : //
1835 : // NB: generation numbers naturally protect against this because they disambiguate
1836 : // (1) and (4)
1837 433 : rtc.schedule_barrier()?;
1838 : // Tenant::create_timeline will wait for these uploads to happen before returning, or
1839 : // on retry.
1840 0 : }
1841 :
1842 433 : info!(
1843 433 : "loaded layer map with {} layers at {}, total physical size: {}",
1844 433 : num_layers, disk_consistent_lsn, total_physical_size
1845 433 : );
1846 :
1847 433 : timer.stop_and_record();
1848 433 : Ok(())
1849 433 : }
1850 :
1851 : /// Retrieve current logical size of the timeline.
1852 : ///
1853 : /// The size could be lagging behind the actual number, in case
1854 : /// the initial size calculation has not been run (gets triggered on the first size access).
1855 : ///
1856 : /// return size and boolean flag that shows if the size is exact
1857 705403 : pub(crate) fn get_current_logical_size(
1858 705403 : self: &Arc<Self>,
1859 705403 : priority: GetLogicalSizePriority,
1860 705403 : ctx: &RequestContext,
1861 705403 : ) -> logical_size::CurrentLogicalSize {
1862 705403 : if !self.tenant_shard_id.is_zero() {
1863 : // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
1864 : // when HTTP API is serving a GET for timeline zero, return zero
1865 183 : return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
1866 705220 : }
1867 705220 :
1868 705220 : let current_size = self.current_logical_size.current_size();
1869 705220 : debug!("Current size: {current_size:?}");
1870 :
1871 705220 : match (current_size.accuracy(), priority) {
1872 704506 : (logical_size::Accuracy::Exact, _) => (), // nothing to do
1873 135 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
1874 135 : // background task will eventually deliver an exact value, we're in no rush
1875 135 : }
1876 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
1877 : // background task is not ready, but user is asking for it now;
1878 : // => make the background task skip the line
1879 : // (The alternative would be to calculate the size here, but,
1880 : // it can actually take a long time if the user has a lot of rels.
1881 : // And we'll inevitable need it again; So, let the background task do the work.)
1882 579 : match self
1883 579 : .current_logical_size
1884 579 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
1885 579 : .get()
1886 : {
1887 579 : Some(cancel) => cancel.cancel(),
1888 : None => {
1889 0 : let state = self.current_state();
1890 0 : if matches!(
1891 0 : state,
1892 : TimelineState::Broken { .. } | TimelineState::Stopping
1893 0 : ) {
1894 0 :
1895 0 : // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
1896 0 : // Don't make noise.
1897 0 : } else {
1898 0 : warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
1899 : }
1900 : }
1901 : };
1902 : }
1903 : }
1904 :
1905 705220 : if let CurrentLogicalSize::Approximate(_) = ¤t_size {
1906 714 : if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
1907 303 : let first = self
1908 303 : .current_logical_size
1909 303 : .did_return_approximate_to_walreceiver
1910 303 : .compare_exchange(
1911 303 : false,
1912 303 : true,
1913 303 : AtomicOrdering::Relaxed,
1914 303 : AtomicOrdering::Relaxed,
1915 303 : )
1916 303 : .is_ok();
1917 303 : if first {
1918 59 : crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
1919 244 : }
1920 411 : }
1921 704506 : }
1922 :
1923 705220 : current_size
1924 705403 : }
1925 :
1926 1206 : fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
1927 1206 : let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
1928 : // nothing to do for freshly created timelines;
1929 570 : assert_eq!(
1930 570 : self.current_logical_size.current_size().accuracy(),
1931 570 : logical_size::Accuracy::Exact,
1932 570 : );
1933 570 : self.current_logical_size.initialized.add_permits(1);
1934 570 : return;
1935 : };
1936 :
1937 636 : let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
1938 636 : let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
1939 636 : self.current_logical_size
1940 636 : .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
1941 636 : .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
1942 636 :
1943 636 : let self_clone = Arc::clone(self);
1944 636 : let background_ctx = ctx.detached_child(
1945 636 : TaskKind::InitialLogicalSizeCalculation,
1946 636 : DownloadBehavior::Download,
1947 636 : );
1948 636 : task_mgr::spawn(
1949 636 : task_mgr::BACKGROUND_RUNTIME.handle(),
1950 636 : task_mgr::TaskKind::InitialLogicalSizeCalculation,
1951 636 : Some(self.tenant_shard_id),
1952 636 : Some(self.timeline_id),
1953 636 : "initial size calculation",
1954 : false,
1955 : // NB: don't log errors here, task_mgr will do that.
1956 636 : async move {
1957 636 : let cancel = task_mgr::shutdown_token();
1958 636 : self_clone
1959 636 : .initial_logical_size_calculation_task(
1960 636 : initial_part_end,
1961 636 : cancel_wait_for_background_loop_concurrency_limit_semaphore,
1962 636 : cancel,
1963 636 : background_ctx,
1964 636 : )
1965 76082 : .await;
1966 633 : Ok(())
1967 633 : }
1968 636 : .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
1969 : );
1970 1206 : }
1971 :
1972 636 : async fn initial_logical_size_calculation_task(
1973 636 : self: Arc<Self>,
1974 636 : initial_part_end: Lsn,
1975 636 : skip_concurrency_limiter: CancellationToken,
1976 636 : cancel: CancellationToken,
1977 636 : background_ctx: RequestContext,
1978 636 : ) {
1979 633 : scopeguard::defer! {
1980 633 : // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
1981 633 : self.current_logical_size.initialized.add_permits(1);
1982 633 : }
1983 :
1984 : enum BackgroundCalculationError {
1985 : Cancelled,
1986 : Other(anyhow::Error),
1987 : }
1988 :
1989 636 : let try_once = |attempt: usize| {
1990 636 : let background_ctx = &background_ctx;
1991 636 : let self_ref = &self;
1992 636 : let skip_concurrency_limiter = &skip_concurrency_limiter;
1993 636 : async move {
1994 636 : let cancel = task_mgr::shutdown_token();
1995 636 : let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
1996 636 : BackgroundLoopKind::InitialLogicalSizeCalculation,
1997 636 : background_ctx,
1998 636 : );
1999 :
2000 : use crate::metrics::initial_logical_size::StartCircumstances;
2001 636 : let (_maybe_permit, circumstances) = tokio::select! {
2002 200 : permit = wait_for_permit => {
2003 : (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
2004 : }
2005 : _ = self_ref.cancel.cancelled() => {
2006 : return Err(BackgroundCalculationError::Cancelled);
2007 : }
2008 : _ = cancel.cancelled() => {
2009 : return Err(BackgroundCalculationError::Cancelled);
2010 : },
2011 : () = skip_concurrency_limiter.cancelled() => {
2012 : // Some action that is part of a end user interaction requested logical size
2013 : // => break out of the rate limit
2014 : // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
2015 : // but then again what happens if they cancel; also, we should just be using
2016 : // one runtime across the entire process, so, let's leave this for now.
2017 : (None, StartCircumstances::SkippedConcurrencyLimiter)
2018 : }
2019 : };
2020 :
2021 623 : let metrics_guard = if attempt == 1 {
2022 623 : crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
2023 : } else {
2024 0 : crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
2025 : };
2026 :
2027 623 : match self_ref
2028 623 : .logical_size_calculation_task(
2029 623 : initial_part_end,
2030 623 : LogicalSizeCalculationCause::Initial,
2031 623 : background_ctx,
2032 623 : )
2033 75577 : .await
2034 : {
2035 589 : Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
2036 : Err(CalculateLogicalSizeError::Cancelled) => {
2037 29 : Err(BackgroundCalculationError::Cancelled)
2038 : }
2039 2 : Err(CalculateLogicalSizeError::Other(err)) => {
2040 1 : if let Some(PageReconstructError::AncestorStopping(_)) =
2041 2 : err.root_cause().downcast_ref()
2042 : {
2043 0 : Err(BackgroundCalculationError::Cancelled)
2044 : } else {
2045 2 : Err(BackgroundCalculationError::Other(err))
2046 : }
2047 : }
2048 : }
2049 633 : }
2050 636 : };
2051 :
2052 636 : let retrying = async {
2053 636 : let mut attempt = 0;
2054 636 : loop {
2055 636 : attempt += 1;
2056 636 :
2057 76082 : match try_once(attempt).await {
2058 589 : Ok(res) => return ControlFlow::Continue(res),
2059 42 : Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
2060 2 : Err(BackgroundCalculationError::Other(e)) => {
2061 2 : warn!(attempt, "initial size calculation failed: {e:?}");
2062 : // exponential back-off doesn't make sense at these long intervals;
2063 : // use fixed retry interval with generous jitter instead
2064 2 : let sleep_duration = Duration::from_secs(
2065 2 : u64::try_from(
2066 2 : // 1hour base
2067 2 : (60_i64 * 60_i64)
2068 2 : // 10min jitter
2069 2 : + rand::thread_rng().gen_range(-10 * 60..10 * 60),
2070 2 : )
2071 2 : .expect("10min < 1hour"),
2072 2 : );
2073 2 : tokio::time::sleep(sleep_duration).await;
2074 : }
2075 : }
2076 : }
2077 631 : };
2078 :
2079 636 : let (calculated_size, metrics_guard) = tokio::select! {
2080 631 : res = retrying => {
2081 : match res {
2082 : ControlFlow::Continue(calculated_size) => calculated_size,
2083 : ControlFlow::Break(()) => return,
2084 : }
2085 : }
2086 : _ = cancel.cancelled() => {
2087 : return;
2088 : }
2089 : };
2090 :
2091 : // we cannot query current_logical_size.current_size() to know the current
2092 : // *negative* value, only truncated to u64.
2093 589 : let added = self
2094 589 : .current_logical_size
2095 589 : .size_added_after_initial
2096 589 : .load(AtomicOrdering::Relaxed);
2097 589 :
2098 589 : let sum = calculated_size.saturating_add_signed(added);
2099 589 :
2100 589 : // set the gauge value before it can be set in `update_current_logical_size`.
2101 589 : self.metrics.current_logical_size_gauge.set(sum);
2102 589 :
2103 589 : self.current_logical_size
2104 589 : .initial_logical_size
2105 589 : .set((calculated_size, metrics_guard.calculation_result_saved()))
2106 589 : .ok()
2107 589 : .expect("only this task sets it");
2108 633 : }
2109 :
2110 35 : pub(crate) fn spawn_ondemand_logical_size_calculation(
2111 35 : self: &Arc<Self>,
2112 35 : lsn: Lsn,
2113 35 : cause: LogicalSizeCalculationCause,
2114 35 : ctx: RequestContext,
2115 35 : ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
2116 35 : let (sender, receiver) = oneshot::channel();
2117 35 : let self_clone = Arc::clone(self);
2118 35 : // XXX if our caller loses interest, i.e., ctx is cancelled,
2119 35 : // we should stop the size calculation work and return an error.
2120 35 : // That would require restructuring this function's API to
2121 35 : // return the result directly, instead of a Receiver for the result.
2122 35 : let ctx = ctx.detached_child(
2123 35 : TaskKind::OndemandLogicalSizeCalculation,
2124 35 : DownloadBehavior::Download,
2125 35 : );
2126 35 : task_mgr::spawn(
2127 35 : task_mgr::BACKGROUND_RUNTIME.handle(),
2128 35 : task_mgr::TaskKind::OndemandLogicalSizeCalculation,
2129 35 : Some(self.tenant_shard_id),
2130 35 : Some(self.timeline_id),
2131 35 : "ondemand logical size calculation",
2132 35 : false,
2133 35 : async move {
2134 35 : let res = self_clone
2135 35 : .logical_size_calculation_task(lsn, cause, &ctx)
2136 1983 : .await;
2137 34 : let _ = sender.send(res).ok();
2138 34 : Ok(()) // Receiver is responsible for handling errors
2139 35 : }
2140 35 : .in_current_span(),
2141 35 : );
2142 35 : receiver
2143 35 : }
2144 :
2145 : /// # Cancel-Safety
2146 : ///
2147 : /// This method is cancellation-safe.
2148 1316 : #[instrument(skip_all)]
2149 : async fn logical_size_calculation_task(
2150 : self: &Arc<Self>,
2151 : lsn: Lsn,
2152 : cause: LogicalSizeCalculationCause,
2153 : ctx: &RequestContext,
2154 : ) -> Result<u64, CalculateLogicalSizeError> {
2155 : crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
2156 : // We should never be calculating logical sizes on shard !=0, because these shards do not have
2157 : // accurate relation sizes, and they do not emit consumption metrics.
2158 : debug_assert!(self.tenant_shard_id.is_zero());
2159 :
2160 : let _guard = self.gate.enter();
2161 :
2162 : let self_calculation = Arc::clone(self);
2163 :
2164 658 : let mut calculation = pin!(async {
2165 658 : let ctx = ctx.attached_child();
2166 658 : self_calculation
2167 658 : .calculate_logical_size(lsn, cause, &ctx)
2168 77562 : .await
2169 654 : });
2170 :
2171 78214 : tokio::select! {
2172 653 : res = &mut calculation => { res }
2173 : _ = self.cancel.cancelled() => {
2174 0 : debug!("cancelling logical size calculation for timeline shutdown");
2175 : calculation.await
2176 : }
2177 : _ = task_mgr::shutdown_watcher() => {
2178 0 : debug!("cancelling logical size calculation for task shutdown");
2179 : calculation.await
2180 : }
2181 : }
2182 : }
2183 :
2184 : /// Calculate the logical size of the database at the latest LSN.
2185 : ///
2186 : /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
2187 : /// especially if we need to download remote layers.
2188 : ///
2189 : /// # Cancel-Safety
2190 : ///
2191 : /// This method is cancellation-safe.
2192 665 : async fn calculate_logical_size(
2193 665 : &self,
2194 665 : up_to_lsn: Lsn,
2195 665 : cause: LogicalSizeCalculationCause,
2196 665 : ctx: &RequestContext,
2197 665 : ) -> Result<u64, CalculateLogicalSizeError> {
2198 665 : info!(
2199 665 : "Calculating logical size for timeline {} at {}",
2200 665 : self.timeline_id, up_to_lsn
2201 665 : );
2202 : // These failpoints are used by python tests to ensure that we don't delete
2203 : // the timeline while the logical size computation is ongoing.
2204 : // The first failpoint is used to make this function pause.
2205 : // Then the python test initiates timeline delete operation in a thread.
2206 : // It waits for a few seconds, then arms the second failpoint and disables
2207 : // the first failpoint. The second failpoint prints an error if the timeline
2208 : // delete code has deleted the on-disk state while we're still running here.
2209 : // It shouldn't do that. If it does it anyway, the error will be caught
2210 : // by the test suite, highlighting the problem.
2211 0 : fail::fail_point!("timeline-calculate-logical-size-pause");
2212 665 : fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
2213 2 : if !self
2214 2 : .conf
2215 2 : .metadata_path(&self.tenant_shard_id, &self.timeline_id)
2216 2 : .exists()
2217 : {
2218 0 : error!("timeline-calculate-logical-size-pre metadata file does not exist")
2219 2 : }
2220 : // need to return something
2221 2 : Ok(0)
2222 665 : });
2223 : // See if we've already done the work for initial size calculation.
2224 : // This is a short-cut for timelines that are mostly unused.
2225 663 : if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
2226 6 : return Ok(size);
2227 657 : }
2228 657 : let storage_time_metrics = match cause {
2229 : LogicalSizeCalculationCause::Initial
2230 : | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
2231 645 : | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
2232 : LogicalSizeCalculationCause::EvictionTaskImitation => {
2233 12 : &self.metrics.imitate_logical_size_histo
2234 : }
2235 : };
2236 657 : let timer = storage_time_metrics.start_timer();
2237 657 : let logical_size = self
2238 657 : .get_current_logical_size_non_incremental(up_to_lsn, ctx)
2239 78021 : .await?;
2240 0 : debug!("calculated logical size: {logical_size}");
2241 622 : timer.stop_and_record();
2242 622 : Ok(logical_size)
2243 661 : }
2244 :
2245 : /// Update current logical size, adding `delta' to the old value.
2246 638850 : fn update_current_logical_size(&self, delta: i64) {
2247 638850 : let logical_size = &self.current_logical_size;
2248 638850 : logical_size.increment_size(delta);
2249 638850 :
2250 638850 : // Also set the value in the prometheus gauge. Note that
2251 638850 : // there is a race condition here: if this is is called by two
2252 638850 : // threads concurrently, the prometheus gauge might be set to
2253 638850 : // one value while current_logical_size is set to the
2254 638850 : // other.
2255 638850 : match logical_size.current_size() {
2256 638751 : CurrentLogicalSize::Exact(ref new_current_size) => self
2257 638751 : .metrics
2258 638751 : .current_logical_size_gauge
2259 638751 : .set(new_current_size.into()),
2260 99 : CurrentLogicalSize::Approximate(_) => {
2261 99 : // don't update the gauge yet, this allows us not to update the gauge back and
2262 99 : // forth between the initial size calculation task.
2263 99 : }
2264 : }
2265 638850 : }
2266 :
2267 2244 : async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
2268 2244 : let guard = self.layers.read().await;
2269 784845 : for historic_layer in guard.layer_map().iter_historic_layers() {
2270 784845 : let historic_layer_name = historic_layer.filename().file_name();
2271 784845 : if layer_file_name == historic_layer_name {
2272 2244 : return Some(guard.get_from_desc(&historic_layer));
2273 782601 : }
2274 : }
2275 :
2276 0 : None
2277 2244 : }
2278 :
2279 : /// The timeline heatmap is a hint to secondary locations from the primary location,
2280 : /// indicating which layers are currently on-disk on the primary.
2281 : ///
2282 : /// None is returned if the Timeline is in a state where uploading a heatmap
2283 : /// doesn't make sense, such as shutting down or initializing. The caller
2284 : /// should treat this as a cue to simply skip doing any heatmap uploading
2285 : /// for this timeline.
2286 12 : pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
2287 : // no point in heatmaps without remote client
2288 12 : let _remote_client = self.remote_client.as_ref()?;
2289 :
2290 12 : if !self.is_active() {
2291 0 : return None;
2292 12 : }
2293 :
2294 12 : let guard = self.layers.read().await;
2295 :
2296 2515 : let resident = guard.resident_layers().map(|layer| {
2297 2515 : let last_activity_ts = layer.access_stats().latest_activity_or_now();
2298 2515 :
2299 2515 : HeatMapLayer::new(
2300 2515 : layer.layer_desc().filename(),
2301 2515 : layer.metadata().into(),
2302 2515 : last_activity_ts,
2303 2515 : )
2304 2515 : });
2305 :
2306 12 : let layers = resident.collect().await;
2307 :
2308 12 : Some(HeatMapTimeline::new(self.timeline_id, layers))
2309 12 : }
2310 : }
2311 :
2312 : type TraversalId = String;
2313 :
2314 : trait TraversalLayerExt {
2315 : fn traversal_id(&self) -> TraversalId;
2316 : }
2317 :
2318 : impl TraversalLayerExt for Layer {
2319 1452 : fn traversal_id(&self) -> TraversalId {
2320 1452 : self.local_path().to_string()
2321 1452 : }
2322 : }
2323 :
2324 : impl TraversalLayerExt for Arc<InMemoryLayer> {
2325 332 : fn traversal_id(&self) -> TraversalId {
2326 332 : format!("timeline {} in-memory {self}", self.get_timeline_id())
2327 332 : }
2328 : }
2329 :
2330 : impl Timeline {
2331 : ///
2332 : /// Get a handle to a Layer for reading.
2333 : ///
2334 : /// The returned Layer might be from an ancestor timeline, if the
2335 : /// segment hasn't been updated on this timeline yet.
2336 : ///
2337 : /// This function takes the current timeline's locked LayerMap as an argument,
2338 : /// so callers can avoid potential race conditions.
2339 : ///
2340 : /// # Cancel-Safety
2341 : ///
2342 : /// This method is cancellation-safe.
2343 7350991 : async fn get_reconstruct_data(
2344 7350991 : &self,
2345 7350991 : key: Key,
2346 7350991 : request_lsn: Lsn,
2347 7350991 : reconstruct_state: &mut ValueReconstructState,
2348 7350991 : ctx: &RequestContext,
2349 7351016 : ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
2350 7351016 : // Start from the current timeline.
2351 7351016 : let mut timeline_owned;
2352 7351016 : let mut timeline = self;
2353 7351016 :
2354 7351016 : let mut read_count = scopeguard::guard(0, |cnt| {
2355 7351010 : crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
2356 7351016 : });
2357 7351016 :
2358 7351016 : // For debugging purposes, collect the path of layers that we traversed
2359 7351016 : // through. It's included in the error message if we fail to find the key.
2360 7351016 : let mut traversal_path = Vec::<TraversalPathItem>::new();
2361 :
2362 7351016 : let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
2363 1720526 : *cached_lsn
2364 : } else {
2365 5630490 : Lsn(0)
2366 : };
2367 :
2368 : // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
2369 : // to check that each iteration make some progress, to break infinite
2370 : // looping if something goes wrong.
2371 7351016 : let mut prev_lsn = Lsn(u64::MAX);
2372 7351016 :
2373 7351016 : let mut result = ValueReconstructResult::Continue;
2374 7351016 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2375 :
2376 31616645 : 'outer: loop {
2377 31616645 : if self.cancel.is_cancelled() {
2378 34 : return Err(PageReconstructError::Cancelled);
2379 31616611 : }
2380 31616611 :
2381 31616611 : // The function should have updated 'state'
2382 31616611 : //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
2383 31616611 : match result {
2384 5661473 : ValueReconstructResult::Complete => return Ok(traversal_path),
2385 : ValueReconstructResult::Continue => {
2386 : // If we reached an earlier cached page image, we're done.
2387 25955133 : if cont_lsn == cached_lsn + 1 {
2388 1688207 : MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
2389 1688207 : return Ok(traversal_path);
2390 24266926 : }
2391 24266926 : if prev_lsn <= cont_lsn {
2392 : // Didn't make any progress in last iteration. Error out to avoid
2393 : // getting stuck in the loop.
2394 1264 : return Err(layer_traversal_error(format!(
2395 1264 : "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
2396 1264 : key,
2397 1264 : Lsn(cont_lsn.0 - 1),
2398 1264 : request_lsn,
2399 1264 : timeline.ancestor_lsn
2400 1264 : ), traversal_path));
2401 24265662 : }
2402 24265662 : prev_lsn = cont_lsn;
2403 : }
2404 : ValueReconstructResult::Missing => {
2405 : return Err(layer_traversal_error(
2406 5 : if cfg!(test) {
2407 4 : format!(
2408 4 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
2409 4 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
2410 4 : )
2411 : } else {
2412 1 : format!(
2413 1 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
2414 1 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
2415 1 : )
2416 : },
2417 5 : traversal_path,
2418 : ));
2419 : }
2420 : }
2421 :
2422 : // Recurse into ancestor if needed
2423 24265662 : if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
2424 0 : trace!(
2425 0 : "going into ancestor {}, cont_lsn is {}",
2426 0 : timeline.ancestor_lsn,
2427 0 : cont_lsn
2428 0 : );
2429 :
2430 1298662 : timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
2431 1298658 : timeline = &*timeline_owned;
2432 1298658 : prev_lsn = Lsn(u64::MAX);
2433 1298658 : continue 'outer;
2434 22967000 : }
2435 :
2436 22967000 : let guard = timeline.layers.read().await;
2437 22967000 : let layers = guard.layer_map();
2438 :
2439 : // Check the open and frozen in-memory layers first, in order from newest
2440 : // to oldest.
2441 22967000 : if let Some(open_layer) = &layers.open_layer {
2442 17747685 : let start_lsn = open_layer.get_lsn_range().start;
2443 17747685 : if cont_lsn > start_lsn {
2444 : //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
2445 : // Get all the data needed to reconstruct the page version from this layer.
2446 : // But if we have an older cached page image, no need to go past that.
2447 5734821 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2448 5734821 : result = match open_layer
2449 5734821 : .get_value_reconstruct_data(
2450 5734821 : key,
2451 5734821 : lsn_floor..cont_lsn,
2452 5734821 : reconstruct_state,
2453 5734821 : ctx,
2454 5734821 : )
2455 669074 : .await
2456 : {
2457 5734821 : Ok(result) => result,
2458 0 : Err(e) => return Err(PageReconstructError::from(e)),
2459 : };
2460 5734821 : cont_lsn = lsn_floor;
2461 5734821 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2462 5734821 : traversal_path.push((
2463 5734821 : result,
2464 5734821 : cont_lsn,
2465 5734821 : Box::new({
2466 5734821 : let open_layer = Arc::clone(open_layer);
2467 5734821 : move || open_layer.traversal_id()
2468 5734821 : }),
2469 5734821 : ));
2470 5734821 : continue 'outer;
2471 12012864 : }
2472 5219315 : }
2473 17232179 : for frozen_layer in layers.frozen_layers.iter().rev() {
2474 888782 : let start_lsn = frozen_layer.get_lsn_range().start;
2475 888782 : if cont_lsn > start_lsn {
2476 : //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
2477 277852 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2478 277852 : result = match frozen_layer
2479 277852 : .get_value_reconstruct_data(
2480 277852 : key,
2481 277852 : lsn_floor..cont_lsn,
2482 277852 : reconstruct_state,
2483 277852 : ctx,
2484 277852 : )
2485 19683 : .await
2486 : {
2487 277852 : Ok(result) => result,
2488 0 : Err(e) => return Err(PageReconstructError::from(e)),
2489 : };
2490 277852 : cont_lsn = lsn_floor;
2491 277852 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2492 277852 : traversal_path.push((
2493 277852 : result,
2494 277852 : cont_lsn,
2495 277852 : Box::new({
2496 277852 : let frozen_layer = Arc::clone(frozen_layer);
2497 277852 : move || frozen_layer.traversal_id()
2498 277852 : }),
2499 277852 : ));
2500 277852 : continue 'outer;
2501 610930 : }
2502 : }
2503 :
2504 16954327 : if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
2505 16818687 : let layer = guard.get_from_desc(&layer);
2506 16818687 : // Get all the data needed to reconstruct the page version from this layer.
2507 16818687 : // But if we have an older cached page image, no need to go past that.
2508 16818687 : let lsn_floor = max(cached_lsn + 1, lsn_floor);
2509 16818687 : result = match layer
2510 16818687 : .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
2511 1002781 : .await
2512 : {
2513 16818658 : Ok(result) => result,
2514 21 : Err(e) => return Err(PageReconstructError::from(e)),
2515 : };
2516 16818658 : cont_lsn = lsn_floor;
2517 16818658 : *read_count += 1;
2518 16818658 : traversal_path.push((
2519 16818658 : result,
2520 16818658 : cont_lsn,
2521 16818658 : Box::new({
2522 16818658 : let layer = layer.to_owned();
2523 16818658 : move || layer.traversal_id()
2524 16818658 : }),
2525 16818658 : ));
2526 16818658 : continue 'outer;
2527 135640 : } else if timeline.ancestor_timeline.is_some() {
2528 : // Nothing on this timeline. Traverse to parent
2529 135639 : result = ValueReconstructResult::Continue;
2530 135639 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2531 135639 : continue 'outer;
2532 : } else {
2533 : // Nothing found
2534 1 : result = ValueReconstructResult::Missing;
2535 1 : continue 'outer;
2536 : }
2537 : }
2538 7351008 : }
2539 :
2540 : /// # Cancel-safety
2541 : ///
2542 : /// This method is cancellation-safe.
2543 7427636 : async fn lookup_cached_page(
2544 7427636 : &self,
2545 7427636 : key: &Key,
2546 7427636 : lsn: Lsn,
2547 7427636 : ctx: &RequestContext,
2548 7427660 : ) -> Option<(Lsn, Bytes)> {
2549 7427660 : let cache = page_cache::get();
2550 :
2551 : // FIXME: It's pointless to check the cache for things that are not 8kB pages.
2552 : // We should look at the key to determine if it's a cacheable object
2553 7427660 : let (lsn, read_guard) = cache
2554 7427660 : .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
2555 5630490 : .await?;
2556 1797170 : let img = Bytes::from(read_guard.to_vec());
2557 1797170 : Some((lsn, img))
2558 7427660 : }
2559 :
2560 1298660 : async fn get_ready_ancestor_timeline(
2561 1298660 : &self,
2562 1298660 : ctx: &RequestContext,
2563 1298662 : ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
2564 1298662 : let ancestor = match self.get_ancestor_timeline() {
2565 1298662 : Ok(timeline) => timeline,
2566 0 : Err(e) => return Err(GetReadyAncestorError::from(e)),
2567 : };
2568 :
2569 : // It's possible that the ancestor timeline isn't active yet, or
2570 : // is active but hasn't yet caught up to the branch point. Wait
2571 : // for it.
2572 : //
2573 : // This cannot happen while the pageserver is running normally,
2574 : // because you cannot create a branch from a point that isn't
2575 : // present in the pageserver yet. However, we don't wait for the
2576 : // branch point to be uploaded to cloud storage before creating
2577 : // a branch. I.e., the branch LSN need not be remote consistent
2578 : // for the branching operation to succeed.
2579 : //
2580 : // Hence, if we try to load a tenant in such a state where
2581 : // 1. the existence of the branch was persisted (in IndexPart and/or locally)
2582 : // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
2583 : // then we will need to wait for the ancestor timeline to
2584 : // re-stream WAL up to branch_lsn before we access it.
2585 : //
2586 : // How can a tenant get in such a state?
2587 : // - ungraceful pageserver process exit
2588 : // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
2589 : //
2590 : // NB: this could be avoided by requiring
2591 : // branch_lsn >= remote_consistent_lsn
2592 : // during branch creation.
2593 1298662 : match ancestor.wait_to_become_active(ctx).await {
2594 1298658 : Ok(()) => {}
2595 : Err(TimelineState::Stopping) => {
2596 2 : return Err(GetReadyAncestorError::AncestorStopping(
2597 2 : ancestor.timeline_id,
2598 2 : ));
2599 : }
2600 2 : Err(state) => {
2601 2 : return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
2602 2 : "Timeline {} will not become active. Current state: {:?}",
2603 2 : ancestor.timeline_id,
2604 2 : &state,
2605 2 : )));
2606 : }
2607 : }
2608 1298658 : ancestor
2609 1298658 : .wait_lsn(self.ancestor_lsn, ctx)
2610 6 : .await
2611 1298658 : .map_err(|e| match e {
2612 0 : e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
2613 0 : WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
2614 0 : e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
2615 1298658 : })?;
2616 :
2617 1298658 : Ok(ancestor)
2618 1298662 : }
2619 :
2620 1298660 : fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
2621 1298660 : let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
2622 0 : format!(
2623 0 : "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
2624 0 : self.timeline_id,
2625 0 : self.get_ancestor_timeline_id(),
2626 0 : )
2627 1298660 : })?;
2628 1298660 : Ok(Arc::clone(ancestor))
2629 1298660 : }
2630 :
2631 1136326 : pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
2632 1136326 : &self.shard_identity
2633 1136326 : }
2634 :
2635 : ///
2636 : /// Get a handle to the latest layer for appending.
2637 : ///
2638 2946592 : async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
2639 2946592 : let mut guard = self.layers.write().await;
2640 2946592 : let layer = guard
2641 2946592 : .get_layer_for_write(
2642 2946592 : lsn,
2643 2946592 : self.get_last_record_lsn(),
2644 2946592 : self.conf,
2645 2946592 : self.timeline_id,
2646 2946592 : self.tenant_shard_id,
2647 2946592 : )
2648 269 : .await?;
2649 2946592 : Ok(layer)
2650 2946592 : }
2651 :
2652 1214102 : async fn put_value(
2653 1214102 : &self,
2654 1214102 : key: Key,
2655 1214102 : lsn: Lsn,
2656 1214102 : val: &Value,
2657 1214102 : ctx: &RequestContext,
2658 1214102 : ) -> anyhow::Result<()> {
2659 : //info!("PUT: key {} at {}", key, lsn);
2660 1214102 : let layer = self.get_layer_for_write(lsn).await?;
2661 1214102 : layer.put_value(key, lsn, val, ctx).await?;
2662 1214102 : Ok(())
2663 1214102 : }
2664 :
2665 1713238 : async fn put_values(
2666 1713238 : &self,
2667 1713238 : values: &HashMap<Key, Vec<(Lsn, Value)>>,
2668 1713238 : ctx: &RequestContext,
2669 1713239 : ) -> anyhow::Result<()> {
2670 : // Pick the first LSN in the batch to get the layer to write to.
2671 1713239 : for lsns in values.values() {
2672 1713239 : if let Some((lsn, _)) = lsns.first() {
2673 1713239 : let layer = self.get_layer_for_write(*lsn).await?;
2674 1713239 : layer.put_values(values, ctx).await?;
2675 1713238 : break;
2676 0 : }
2677 : }
2678 1713238 : Ok(())
2679 1713238 : }
2680 :
2681 19251 : async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
2682 19251 : if let Some((_, lsn)) = tombstones.first() {
2683 19251 : let layer = self.get_layer_for_write(*lsn).await?;
2684 19251 : layer.put_tombstones(tombstones).await?;
2685 0 : }
2686 19251 : Ok(())
2687 19251 : }
2688 :
2689 76398101 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
2690 76398101 : assert!(new_lsn.is_aligned());
2691 :
2692 76398101 : self.metrics.last_record_gauge.set(new_lsn.0 as i64);
2693 76398101 : self.last_record_lsn.advance(new_lsn);
2694 76398101 : }
2695 :
2696 5472 : async fn freeze_inmem_layer(&self, write_lock_held: bool) {
2697 : // Freeze the current open in-memory layer. It will be written to disk on next
2698 : // iteration.
2699 5472 : let _write_guard = if write_lock_held {
2700 3520 : None
2701 : } else {
2702 1952 : Some(self.write_lock.lock().await)
2703 : };
2704 5472 : let mut guard = self.layers.write().await;
2705 5472 : guard
2706 5472 : .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
2707 10 : .await;
2708 5472 : }
2709 :
2710 : /// Layer flusher task's main loop.
2711 1573 : async fn flush_loop(
2712 1573 : self: &Arc<Self>,
2713 1573 : mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
2714 1573 : ctx: &RequestContext,
2715 1573 : ) {
2716 1573 : info!("started flush loop");
2717 : loop {
2718 6676 : tokio::select! {
2719 12001 : _ = self.cancel.cancelled() => {
2720 12001 : info!("shutting down layer flush task");
2721 12001 : break;
2722 12001 : },
2723 12001 : _ = task_mgr::shutdown_watcher() => {
2724 12001 : info!("shutting down layer flush task");
2725 12001 : break;
2726 12001 : },
2727 12001 : _ = layer_flush_start_rx.changed() => {}
2728 12001 : }
2729 :
2730 0 : trace!("waking up");
2731 5109 : let timer = self.metrics.flush_time_histo.start_timer();
2732 5109 : let flush_counter = *layer_flush_start_rx.borrow();
2733 10340 : let result = loop {
2734 10340 : if self.cancel.is_cancelled() {
2735 0 : info!("dropping out of flush loop for timeline shutdown");
2736 : // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
2737 : // anyone waiting on that will respect self.cancel as well: they will stop
2738 : // waiting at the same time we as drop out of this loop.
2739 0 : return;
2740 10340 : }
2741 :
2742 10340 : let layer_to_flush = {
2743 10340 : let guard = self.layers.read().await;
2744 10340 : guard.layer_map().frozen_layers.front().cloned()
2745 : // drop 'layers' lock to allow concurrent reads and writes
2746 : };
2747 10340 : let Some(layer_to_flush) = layer_to_flush else {
2748 5103 : break Ok(());
2749 : };
2750 22239 : match self.flush_frozen_layer(layer_to_flush, ctx).await {
2751 5231 : Ok(()) => {}
2752 : Err(FlushLayerError::Cancelled) => {
2753 1 : info!("dropping out of flush loop for timeline shutdown");
2754 1 : return;
2755 : }
2756 0 : err @ Err(
2757 : FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
2758 : ) => {
2759 0 : error!("could not flush frozen layer: {err:?}");
2760 0 : break err;
2761 : }
2762 : }
2763 : };
2764 : // Notify any listeners that we're done
2765 5103 : let _ = self
2766 5103 : .layer_flush_done_tx
2767 5103 : .send_replace((flush_counter, result));
2768 5103 :
2769 5103 : timer.stop_and_record();
2770 : }
2771 591 : }
2772 :
2773 1952 : async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
2774 1952 : let mut rx = self.layer_flush_done_tx.subscribe();
2775 1952 :
2776 1952 : // Increment the flush cycle counter and wake up the flush task.
2777 1952 : // Remember the new value, so that when we listen for the flush
2778 1952 : // to finish, we know when the flush that we initiated has
2779 1952 : // finished, instead of some other flush that was started earlier.
2780 1952 : let mut my_flush_request = 0;
2781 1952 :
2782 1952 : let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
2783 1952 : if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
2784 45 : anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
2785 1907 : }
2786 1907 :
2787 1907 : self.layer_flush_start_tx.send_modify(|counter| {
2788 1907 : my_flush_request = *counter + 1;
2789 1907 : *counter = my_flush_request;
2790 1907 : });
2791 :
2792 3813 : loop {
2793 3813 : {
2794 3813 : let (last_result_counter, last_result) = &*rx.borrow();
2795 3813 : if *last_result_counter >= my_flush_request {
2796 1906 : if let Err(_err) = last_result {
2797 : // We already logged the original error in
2798 : // flush_loop. We cannot propagate it to the caller
2799 : // here, because it might not be Cloneable
2800 0 : anyhow::bail!(
2801 0 : "Could not flush frozen layer. Request id: {}",
2802 0 : my_flush_request
2803 0 : );
2804 : } else {
2805 1906 : return Ok(());
2806 : }
2807 1907 : }
2808 : }
2809 0 : trace!("waiting for flush to complete");
2810 1907 : tokio::select! {
2811 1906 : rx_e = rx.changed() => {
2812 : rx_e?;
2813 : },
2814 : // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
2815 : // the notification from [`flush_loop`] that it completed.
2816 : _ = self.cancel.cancelled() => {
2817 1 : tracing::info!("Cancelled layer flush due on timeline shutdown");
2818 : return Ok(())
2819 : }
2820 : };
2821 0 : trace!("done")
2822 : }
2823 1952 : }
2824 :
2825 3520 : fn flush_frozen_layers(&self) {
2826 3520 : self.layer_flush_start_tx.send_modify(|val| *val += 1);
2827 3520 : }
2828 :
2829 : /// Flush one frozen in-memory layer to disk, as a new delta layer.
2830 10474 : #[instrument(skip_all, fields(layer=%frozen_layer))]
2831 : async fn flush_frozen_layer(
2832 : self: &Arc<Self>,
2833 : frozen_layer: Arc<InMemoryLayer>,
2834 : ctx: &RequestContext,
2835 : ) -> Result<(), FlushLayerError> {
2836 : debug_assert_current_span_has_tenant_and_timeline_id();
2837 : // As a special case, when we have just imported an image into the repository,
2838 : // instead of writing out a L0 delta layer, we directly write out image layer
2839 : // files instead. This is possible as long as *all* the data imported into the
2840 : // repository have the same LSN.
2841 : let lsn_range = frozen_layer.get_lsn_range();
2842 : let (layers_to_upload, delta_layer_to_add) =
2843 : if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
2844 : #[cfg(test)]
2845 : match &mut *self.flush_loop_state.lock().unwrap() {
2846 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2847 : panic!("flush loop not running")
2848 : }
2849 : FlushLoopState::Running {
2850 : initdb_optimization_count,
2851 : ..
2852 : } => {
2853 : *initdb_optimization_count += 1;
2854 : }
2855 : }
2856 : // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
2857 : // require downloading anything during initial import.
2858 : let (partitioning, _lsn) = self
2859 : .repartition(
2860 : self.initdb_lsn,
2861 : self.get_compaction_target_size(),
2862 : EnumSet::empty(),
2863 : ctx,
2864 : )
2865 : .await?;
2866 :
2867 : if self.cancel.is_cancelled() {
2868 : return Err(FlushLayerError::Cancelled);
2869 : }
2870 :
2871 : // For image layers, we add them immediately into the layer map.
2872 : (
2873 : self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
2874 : .await?,
2875 : None,
2876 : )
2877 : } else {
2878 : #[cfg(test)]
2879 : match &mut *self.flush_loop_state.lock().unwrap() {
2880 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2881 : panic!("flush loop not running")
2882 : }
2883 : FlushLoopState::Running {
2884 : expect_initdb_optimization,
2885 : ..
2886 : } => {
2887 : assert!(!*expect_initdb_optimization, "expected initdb optimization");
2888 : }
2889 : }
2890 : // Normal case, write out a L0 delta layer file.
2891 : // `create_delta_layer` will not modify the layer map.
2892 : // We will remove frozen layer and add delta layer in one atomic operation later.
2893 : let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
2894 : (
2895 : // FIXME: even though we have a single image and single delta layer assumption
2896 : // we push them to vec
2897 : vec![layer.clone()],
2898 : Some(layer),
2899 : )
2900 : };
2901 :
2902 5236 : pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
2903 :
2904 : if self.cancel.is_cancelled() {
2905 : return Err(FlushLayerError::Cancelled);
2906 : }
2907 :
2908 : let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
2909 : let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
2910 :
2911 : // The new on-disk layers are now in the layer map. We can remove the
2912 : // in-memory layer from the map now. The flushed layer is stored in
2913 : // the mapping in `create_delta_layer`.
2914 : let metadata = {
2915 : let mut guard = self.layers.write().await;
2916 :
2917 : if self.cancel.is_cancelled() {
2918 : return Err(FlushLayerError::Cancelled);
2919 : }
2920 :
2921 : guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
2922 :
2923 : if disk_consistent_lsn != old_disk_consistent_lsn {
2924 : assert!(disk_consistent_lsn > old_disk_consistent_lsn);
2925 : self.disk_consistent_lsn.store(disk_consistent_lsn);
2926 :
2927 : // Schedule remote uploads that will reflect our new disk_consistent_lsn
2928 : Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
2929 : } else {
2930 : None
2931 : }
2932 : // release lock on 'layers'
2933 : };
2934 :
2935 : // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
2936 : // a compaction can delete the file and then it won't be available for uploads any more.
2937 : // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
2938 : // race situation.
2939 : // See https://github.com/neondatabase/neon/issues/4526
2940 5235 : pausable_failpoint!("flush-frozen-pausable");
2941 :
2942 : // This failpoint is used by another test case `test_pageserver_recovery`.
2943 0 : fail_point!("flush-frozen-exit");
2944 :
2945 : // Update the metadata file, with new 'disk_consistent_lsn'
2946 : //
2947 : // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
2948 : // *all* the layers, to avoid fsyncing the file multiple times.
2949 :
2950 : // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
2951 : if let Some(metadata) = metadata {
2952 : save_metadata(
2953 : self.conf,
2954 : &self.tenant_shard_id,
2955 : &self.timeline_id,
2956 : &metadata,
2957 : )
2958 : .await
2959 : .context("save_metadata")?;
2960 : }
2961 : Ok(())
2962 : }
2963 :
2964 : /// Update metadata file
2965 5254 : fn schedule_uploads(
2966 5254 : &self,
2967 5254 : disk_consistent_lsn: Lsn,
2968 5254 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
2969 5254 : ) -> anyhow::Result<TimelineMetadata> {
2970 5254 : // We can only save a valid 'prev_record_lsn' value on disk if we
2971 5254 : // flushed *all* in-memory changes to disk. We only track
2972 5254 : // 'prev_record_lsn' in memory for the latest processed record, so we
2973 5254 : // don't remember what the correct value that corresponds to some old
2974 5254 : // LSN is. But if we flush everything, then the value corresponding
2975 5254 : // current 'last_record_lsn' is correct and we can store it on disk.
2976 5254 : let RecordLsn {
2977 5254 : last: last_record_lsn,
2978 5254 : prev: prev_record_lsn,
2979 5254 : } = self.last_record_lsn.load();
2980 5254 : let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
2981 1616 : Some(prev_record_lsn)
2982 : } else {
2983 3638 : None
2984 : };
2985 :
2986 5254 : let ancestor_timeline_id = self
2987 5254 : .ancestor_timeline
2988 5254 : .as_ref()
2989 5254 : .map(|ancestor| ancestor.timeline_id);
2990 5254 :
2991 5254 : let metadata = TimelineMetadata::new(
2992 5254 : disk_consistent_lsn,
2993 5254 : ondisk_prev_record_lsn,
2994 5254 : ancestor_timeline_id,
2995 5254 : self.ancestor_lsn,
2996 5254 : *self.latest_gc_cutoff_lsn.read(),
2997 5254 : self.initdb_lsn,
2998 5254 : self.pg_version,
2999 5254 : );
3000 5254 :
3001 5254 : fail_point!("checkpoint-before-saving-metadata", |x| bail!(
3002 0 : "{}",
3003 0 : x.unwrap()
3004 5254 : ));
3005 :
3006 5254 : if let Some(remote_client) = &self.remote_client {
3007 10489 : for layer in layers_to_upload {
3008 5235 : remote_client.schedule_layer_file_upload(layer)?;
3009 : }
3010 5254 : remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
3011 0 : }
3012 :
3013 5254 : Ok(metadata)
3014 5254 : }
3015 :
3016 19 : async fn update_metadata_file(
3017 19 : &self,
3018 19 : disk_consistent_lsn: Lsn,
3019 19 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
3020 19 : ) -> anyhow::Result<()> {
3021 19 : let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
3022 :
3023 19 : save_metadata(
3024 19 : self.conf,
3025 19 : &self.tenant_shard_id,
3026 19 : &self.timeline_id,
3027 19 : &metadata,
3028 19 : )
3029 0 : .await
3030 19 : .context("save_metadata")?;
3031 :
3032 19 : Ok(())
3033 19 : }
3034 :
3035 2 : pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
3036 2 : if let Some(remote_client) = &self.remote_client {
3037 2 : remote_client
3038 2 : .preserve_initdb_archive(
3039 2 : &self.tenant_shard_id.tenant_id,
3040 2 : &self.timeline_id,
3041 2 : &self.cancel,
3042 2 : )
3043 2 : .await?;
3044 : } else {
3045 0 : bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
3046 : }
3047 2 : Ok(())
3048 2 : }
3049 :
3050 : // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
3051 : // in layer map immediately. The caller is responsible to put it into the layer map.
3052 5157 : async fn create_delta_layer(
3053 5157 : self: &Arc<Self>,
3054 5157 : frozen_layer: &Arc<InMemoryLayer>,
3055 5157 : ctx: &RequestContext,
3056 5157 : ) -> anyhow::Result<ResidentLayer> {
3057 5157 : let span = tracing::info_span!("blocking");
3058 5157 : let new_delta: ResidentLayer = tokio::task::spawn_blocking({
3059 5157 : let self_clone = Arc::clone(self);
3060 5157 : let frozen_layer = Arc::clone(frozen_layer);
3061 5157 : let ctx = ctx.attached_child();
3062 5157 : move || {
3063 5157 : // Write it out
3064 5157 : // Keep this inside `spawn_blocking` and `Handle::current`
3065 5157 : // as long as the write path is still sync and the read impl
3066 5157 : // is still not fully async. Otherwise executor threads would
3067 5157 : // be blocked.
3068 5157 : let _g = span.entered();
3069 5157 : let new_delta =
3070 5157 : Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
3071 5157 : let new_delta_path = new_delta.local_path().to_owned();
3072 5157 :
3073 5157 : // Sync it to disk.
3074 5157 : //
3075 5157 : // We must also fsync the timeline dir to ensure the directory entries for
3076 5157 : // new layer files are durable.
3077 5157 : //
3078 5157 : // NB: timeline dir must be synced _after_ the file contents are durable.
3079 5157 : // So, two separate fsyncs are required, they mustn't be batched.
3080 5157 : //
3081 5157 : // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
3082 5157 : // files to flush, the fsync overhead can be reduces as follows:
3083 5157 : // 1. write them all to temporary file names
3084 5157 : // 2. fsync them
3085 5157 : // 3. rename to the final name
3086 5157 : // 4. fsync the parent directory.
3087 5157 : // Note that (1),(2),(3) today happen inside write_to_disk().
3088 5157 : //
3089 5157 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3090 5157 : par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
3091 5157 : par_fsync::par_fsync(&[self_clone
3092 5157 : .conf
3093 5157 : .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
3094 5157 : .context("fsync of timeline dir")?;
3095 :
3096 5156 : anyhow::Ok(new_delta)
3097 5157 : }
3098 5157 : })
3099 5156 : .await
3100 5156 : .context("spawn_blocking")
3101 5156 : .and_then(|x| x)?;
3102 :
3103 5156 : Ok(new_delta)
3104 5156 : }
3105 :
3106 1724 : async fn repartition(
3107 1724 : &self,
3108 1724 : lsn: Lsn,
3109 1724 : partition_size: u64,
3110 1724 : flags: EnumSet<CompactFlags>,
3111 1724 : ctx: &RequestContext,
3112 1724 : ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
3113 1724 : {
3114 1724 : let partitioning_guard = self.partitioning.lock().unwrap();
3115 1724 : let distance = lsn.0 - partitioning_guard.1 .0;
3116 1724 : if partitioning_guard.1 != Lsn(0)
3117 1005 : && distance <= self.repartition_threshold
3118 842 : && !flags.contains(CompactFlags::ForceRepartition)
3119 : {
3120 0 : debug!(
3121 0 : distance,
3122 0 : threshold = self.repartition_threshold,
3123 0 : "no repartitioning needed"
3124 0 : );
3125 840 : return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
3126 884 : }
3127 : }
3128 172295 : let keyspace = self.collect_keyspace(lsn, ctx).await?;
3129 881 : let partitioning = keyspace.partition(partition_size);
3130 881 :
3131 881 : let mut partitioning_guard = self.partitioning.lock().unwrap();
3132 881 : if lsn > partitioning_guard.1 {
3133 881 : *partitioning_guard = (partitioning, lsn);
3134 881 : } else {
3135 0 : warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
3136 : }
3137 881 : Ok((partitioning_guard.0.clone(), partitioning_guard.1))
3138 1723 : }
3139 :
3140 : // Is it time to create a new image layer for the given partition?
3141 36566 : async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
3142 36566 : let threshold = self.get_image_creation_threshold();
3143 :
3144 36566 : let guard = self.layers.read().await;
3145 36566 : let layers = guard.layer_map();
3146 36566 :
3147 36566 : let mut max_deltas = 0;
3148 36566 : {
3149 36566 : let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
3150 36566 : if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
3151 4218 : let img_range =
3152 4218 : partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
3153 4218 : if wanted.overlaps(&img_range) {
3154 : //
3155 : // gc_timeline only pays attention to image layers that are older than the GC cutoff,
3156 : // but create_image_layers creates image layers at last-record-lsn.
3157 : // So it's possible that gc_timeline wants a new image layer to be created for a key range,
3158 : // but the range is already covered by image layers at more recent LSNs. Before we
3159 : // create a new image layer, check if the range is already covered at more recent LSNs.
3160 0 : if !layers
3161 0 : .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
3162 : {
3163 0 : debug!(
3164 0 : "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
3165 0 : img_range.start, img_range.end, cutoff_lsn, lsn
3166 0 : );
3167 0 : return true;
3168 0 : }
3169 4218 : }
3170 32348 : }
3171 : }
3172 :
3173 2258653 : for part_range in &partition.ranges {
3174 2228479 : let image_coverage = layers.image_coverage(part_range, lsn);
3175 4288563 : for (img_range, last_img) in image_coverage {
3176 2066476 : let img_lsn = if let Some(last_img) = last_img {
3177 363037 : last_img.get_lsn_range().end
3178 : } else {
3179 1703439 : Lsn(0)
3180 : };
3181 : // Let's consider an example:
3182 : //
3183 : // delta layer with LSN range 71-81
3184 : // delta layer with LSN range 81-91
3185 : // delta layer with LSN range 91-101
3186 : // image layer at LSN 100
3187 : //
3188 : // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
3189 : // there's no need to create a new one. We check this case explicitly, to avoid passing
3190 : // a bogus range to count_deltas below, with start > end. It's even possible that there
3191 : // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
3192 : // after we read last_record_lsn, which is passed here in the 'lsn' argument.
3193 2066476 : if img_lsn < lsn {
3194 2025978 : let num_deltas =
3195 2025978 : layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
3196 2025978 :
3197 2025978 : max_deltas = max_deltas.max(num_deltas);
3198 2025978 : if num_deltas >= threshold {
3199 0 : debug!(
3200 0 : "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
3201 0 : img_range.start, img_range.end, num_deltas, img_lsn, lsn
3202 0 : );
3203 6392 : return true;
3204 2019586 : }
3205 40498 : }
3206 : }
3207 : }
3208 :
3209 0 : debug!(
3210 0 : max_deltas,
3211 0 : "none of the partitioned ranges had >= {threshold} deltas"
3212 0 : );
3213 30174 : false
3214 36566 : }
3215 :
3216 3440 : #[tracing::instrument(skip_all, fields(%lsn, %force))]
3217 : async fn create_image_layers(
3218 : self: &Arc<Timeline>,
3219 : partitioning: &KeyPartitioning,
3220 : lsn: Lsn,
3221 : force: bool,
3222 : ctx: &RequestContext,
3223 : ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
3224 : let timer = self.metrics.create_images_time_histo.start_timer();
3225 : let mut image_layers = Vec::new();
3226 :
3227 : // We need to avoid holes between generated image layers.
3228 : // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
3229 : // image layer with hole between them. In this case such layer can not be utilized by GC.
3230 : //
3231 : // How such hole between partitions can appear?
3232 : // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
3233 : // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
3234 : // If there is delta layer <100000000..300000000> then it never be garbage collected because
3235 : // image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
3236 : let mut start = Key::MIN;
3237 :
3238 : for partition in partitioning.parts.iter() {
3239 : let img_range = start..partition.ranges.last().unwrap().end;
3240 : start = img_range.end;
3241 : if force || self.time_for_new_image_layer(partition, lsn).await {
3242 : let mut image_layer_writer = ImageLayerWriter::new(
3243 : self.conf,
3244 : self.timeline_id,
3245 : self.tenant_shard_id,
3246 : &img_range,
3247 : lsn,
3248 : )
3249 : .await?;
3250 :
3251 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3252 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
3253 0 : "failpoint image-layer-writer-fail-before-finish"
3254 0 : )))
3255 0 : });
3256 :
3257 : let mut key_request_accum = KeySpaceAccum::new();
3258 : for range in &partition.ranges {
3259 : let mut key = range.start;
3260 : while key < range.end {
3261 : if self.shard_identity.is_key_disposable(&key) {
3262 0 : debug!(
3263 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3264 0 : key,
3265 0 : self.shard_identity.get_shard_number(&key)
3266 0 : );
3267 : key = key.next();
3268 : continue;
3269 : }
3270 :
3271 : key_request_accum.add_key(key);
3272 : if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
3273 : || key.next() == range.end
3274 : {
3275 : let results = self
3276 : .get_vectored(
3277 : &key_request_accum.consume_keyspace().ranges,
3278 : lsn,
3279 : ctx,
3280 : )
3281 : .await?;
3282 :
3283 : for (img_key, img) in results {
3284 : let img = match img {
3285 : Ok(img) => img,
3286 : Err(err) => {
3287 : // If we fail to reconstruct a VM or FSM page, we can zero the
3288 : // page without losing any actual user data. That seems better
3289 : // than failing repeatedly and getting stuck.
3290 : //
3291 : // We had a bug at one point, where we truncated the FSM and VM
3292 : // in the pageserver, but the Postgres didn't know about that
3293 : // and continued to generate incremental WAL records for pages
3294 : // that didn't exist in the pageserver. Trying to replay those
3295 : // WAL records failed to find the previous image of the page.
3296 : // This special case allows us to recover from that situation.
3297 : // See https://github.com/neondatabase/neon/issues/2601.
3298 : //
3299 : // Unfortunately we cannot do this for the main fork, or for
3300 : // any metadata keys, keys, as that would lead to actual data
3301 : // loss.
3302 : if is_rel_fsm_block_key(img_key)
3303 : || is_rel_vm_block_key(img_key)
3304 : {
3305 0 : warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
3306 : ZERO_PAGE.clone()
3307 : } else {
3308 : return Err(
3309 : CreateImageLayersError::PageReconstructError(err),
3310 : );
3311 : }
3312 : }
3313 : };
3314 :
3315 : image_layer_writer.put_image(img_key, img).await?;
3316 : }
3317 : }
3318 :
3319 : key = key.next();
3320 : }
3321 : }
3322 : let image_layer = image_layer_writer.finish(self).await?;
3323 : image_layers.push(image_layer);
3324 : }
3325 : }
3326 : // All layers that the GC wanted us to create have now been created.
3327 : //
3328 : // It's possible that another GC cycle happened while we were compacting, and added
3329 : // something new to wanted_image_layers, and we now clear that before processing it.
3330 : // That's OK, because the next GC iteration will put it back in.
3331 : *self.wanted_image_layers.lock().unwrap() = None;
3332 :
3333 : // Sync the new layer to disk before adding it to the layer map, to make sure
3334 : // we don't garbage collect something based on the new layer, before it has
3335 : // reached the disk.
3336 : //
3337 : // We must also fsync the timeline dir to ensure the directory entries for
3338 : // new layer files are durable
3339 : //
3340 : // Compaction creates multiple image layers. It would be better to create them all
3341 : // and fsync them all in parallel.
3342 : let all_paths = image_layers
3343 : .iter()
3344 6472 : .map(|layer| layer.local_path().to_owned())
3345 : .collect::<Vec<_>>();
3346 :
3347 : par_fsync::par_fsync_async(&all_paths)
3348 : .await
3349 : .context("fsync of newly created layer files")?;
3350 :
3351 : if !all_paths.is_empty() {
3352 : par_fsync::par_fsync_async(&[self
3353 : .conf
3354 : .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
3355 : .await
3356 : .context("fsync of timeline dir")?;
3357 : }
3358 :
3359 : let mut guard = self.layers.write().await;
3360 :
3361 : // FIXME: we could add the images to be uploaded *before* returning from here, but right
3362 : // now they are being scheduled outside of write lock
3363 : guard.track_new_image_layers(&image_layers, &self.metrics);
3364 : drop_wlock(guard);
3365 : timer.stop_and_record();
3366 :
3367 : Ok(image_layers)
3368 : }
3369 :
3370 : /// Wait until the background initial logical size calculation is complete, or
3371 : /// this Timeline is shut down. Calling this function will cause the initial
3372 : /// logical size calculation to skip waiting for the background jobs barrier.
3373 256 : pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
3374 256 : if let Some(await_bg_cancel) = self
3375 256 : .current_logical_size
3376 256 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
3377 256 : .get()
3378 243 : {
3379 243 : await_bg_cancel.cancel();
3380 243 : } else {
3381 : // We should not wait if we were not able to explicitly instruct
3382 : // the logical size cancellation to skip the concurrency limit semaphore.
3383 : // TODO: this is an unexpected case. We should restructure so that it
3384 : // can't happen.
3385 13 : tracing::info!(
3386 13 : "await_initial_logical_size: can't get semaphore cancel token, skipping"
3387 13 : );
3388 : }
3389 :
3390 256 : tokio::select!(
3391 495 : _ = self.current_logical_size.initialized.acquire() => {},
3392 495 : _ = self.cancel.cancelled() => {}
3393 495 : )
3394 244 : }
3395 : }
3396 :
3397 1344 : #[derive(Default)]
3398 : struct CompactLevel0Phase1Result {
3399 : new_layers: Vec<ResidentLayer>,
3400 : deltas_to_compact: Vec<Layer>,
3401 : }
3402 :
3403 : /// Top-level failure to compact.
3404 0 : #[derive(Debug, thiserror::Error)]
3405 : pub(crate) enum CompactionError {
3406 : #[error("The timeline or pageserver is shutting down")]
3407 : ShuttingDown,
3408 : /// Compaction cannot be done right now; page reconstruction and so on.
3409 : #[error(transparent)]
3410 : Other(#[from] anyhow::Error),
3411 : }
3412 :
3413 : #[serde_as]
3414 4116 : #[derive(serde::Serialize)]
3415 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
3416 :
3417 11487 : #[derive(Default)]
3418 : enum DurationRecorder {
3419 : #[default]
3420 : NotStarted,
3421 : Recorded(RecordedDuration, tokio::time::Instant),
3422 : }
3423 :
3424 : impl DurationRecorder {
3425 3115 : fn till_now(&self) -> DurationRecorder {
3426 3115 : match self {
3427 : DurationRecorder::NotStarted => {
3428 0 : panic!("must only call on recorded measurements")
3429 : }
3430 3115 : DurationRecorder::Recorded(_, ended) => {
3431 3115 : let now = tokio::time::Instant::now();
3432 3115 : DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
3433 3115 : }
3434 3115 : }
3435 3115 : }
3436 2058 : fn into_recorded(self) -> Option<RecordedDuration> {
3437 2058 : match self {
3438 0 : DurationRecorder::NotStarted => None,
3439 2058 : DurationRecorder::Recorded(recorded, _) => Some(recorded),
3440 : }
3441 2058 : }
3442 : }
3443 :
3444 1641 : #[derive(Default)]
3445 : struct CompactLevel0Phase1StatsBuilder {
3446 : version: Option<u64>,
3447 : tenant_id: Option<TenantShardId>,
3448 : timeline_id: Option<TimelineId>,
3449 : read_lock_acquisition_micros: DurationRecorder,
3450 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
3451 : read_lock_held_key_sort_micros: DurationRecorder,
3452 : read_lock_held_prerequisites_micros: DurationRecorder,
3453 : read_lock_held_compute_holes_micros: DurationRecorder,
3454 : read_lock_drop_micros: DurationRecorder,
3455 : write_layer_files_micros: DurationRecorder,
3456 : level0_deltas_count: Option<usize>,
3457 : new_deltas_count: Option<usize>,
3458 : new_deltas_size: Option<u64>,
3459 : }
3460 :
3461 294 : #[derive(serde::Serialize)]
3462 : struct CompactLevel0Phase1Stats {
3463 : version: u64,
3464 : tenant_id: TenantShardId,
3465 : timeline_id: TimelineId,
3466 : read_lock_acquisition_micros: RecordedDuration,
3467 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
3468 : read_lock_held_key_sort_micros: RecordedDuration,
3469 : read_lock_held_prerequisites_micros: RecordedDuration,
3470 : read_lock_held_compute_holes_micros: RecordedDuration,
3471 : read_lock_drop_micros: RecordedDuration,
3472 : write_layer_files_micros: RecordedDuration,
3473 : level0_deltas_count: usize,
3474 : new_deltas_count: usize,
3475 : new_deltas_size: u64,
3476 : }
3477 :
3478 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
3479 : type Error = anyhow::Error;
3480 :
3481 294 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
3482 294 : Ok(Self {
3483 294 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
3484 294 : tenant_id: value
3485 294 : .tenant_id
3486 294 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
3487 294 : timeline_id: value
3488 294 : .timeline_id
3489 294 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
3490 294 : read_lock_acquisition_micros: value
3491 294 : .read_lock_acquisition_micros
3492 294 : .into_recorded()
3493 294 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
3494 294 : read_lock_held_spawn_blocking_startup_micros: value
3495 294 : .read_lock_held_spawn_blocking_startup_micros
3496 294 : .into_recorded()
3497 294 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
3498 294 : read_lock_held_key_sort_micros: value
3499 294 : .read_lock_held_key_sort_micros
3500 294 : .into_recorded()
3501 294 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
3502 294 : read_lock_held_prerequisites_micros: value
3503 294 : .read_lock_held_prerequisites_micros
3504 294 : .into_recorded()
3505 294 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
3506 294 : read_lock_held_compute_holes_micros: value
3507 294 : .read_lock_held_compute_holes_micros
3508 294 : .into_recorded()
3509 294 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
3510 294 : read_lock_drop_micros: value
3511 294 : .read_lock_drop_micros
3512 294 : .into_recorded()
3513 294 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
3514 294 : write_layer_files_micros: value
3515 294 : .write_layer_files_micros
3516 294 : .into_recorded()
3517 294 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
3518 294 : level0_deltas_count: value
3519 294 : .level0_deltas_count
3520 294 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
3521 294 : new_deltas_count: value
3522 294 : .new_deltas_count
3523 294 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
3524 294 : new_deltas_size: value
3525 294 : .new_deltas_size
3526 294 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
3527 : })
3528 294 : }
3529 : }
3530 :
3531 : impl Timeline {
3532 : /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
3533 1641 : async fn compact_level0_phase1(
3534 1641 : self: &Arc<Self>,
3535 1641 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
3536 1641 : mut stats: CompactLevel0Phase1StatsBuilder,
3537 1641 : target_file_size: u64,
3538 1641 : ctx: &RequestContext,
3539 1641 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
3540 1641 : stats.read_lock_held_spawn_blocking_startup_micros =
3541 1641 : stats.read_lock_acquisition_micros.till_now(); // set by caller
3542 1641 : let layers = guard.layer_map();
3543 1641 : let level0_deltas = layers.get_level0_deltas()?;
3544 1641 : let mut level0_deltas = level0_deltas
3545 1641 : .into_iter()
3546 7543 : .map(|x| guard.get_from_desc(&x))
3547 1641 : .collect_vec();
3548 1641 : stats.level0_deltas_count = Some(level0_deltas.len());
3549 1641 : // Only compact if enough layers have accumulated.
3550 1641 : let threshold = self.get_compaction_threshold();
3551 1641 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
3552 0 : debug!(
3553 0 : level0_deltas = level0_deltas.len(),
3554 0 : threshold, "too few deltas to compact"
3555 0 : );
3556 1344 : return Ok(CompactLevel0Phase1Result::default());
3557 297 : }
3558 297 :
3559 297 : // This failpoint is used together with `test_duplicate_layers` integration test.
3560 297 : // It returns the compaction result exactly the same layers as input to compaction.
3561 297 : // We want to ensure that this will not cause any problem when updating the layer map
3562 297 : // after the compaction is finished.
3563 297 : //
3564 297 : // Currently, there are two rare edge cases that will cause duplicated layers being
3565 297 : // inserted.
3566 297 : // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
3567 297 : // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
3568 297 : // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
3569 297 : // point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
3570 297 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
3571 297 : // layer replace instead of the normal remove / upload process.
3572 297 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
3573 297 : // size length. Compaction will likely create the same set of n files afterwards.
3574 297 : //
3575 297 : // This failpoint is a superset of both of the cases.
3576 297 : if cfg!(feature = "testing") {
3577 297 : let active = (|| {
3578 297 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
3579 297 : false
3580 297 : })();
3581 297 :
3582 297 : if active {
3583 2 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
3584 30 : for delta in &level0_deltas {
3585 : // we are just faking these layers as being produced again for this failpoint
3586 28 : new_layers.push(
3587 28 : delta
3588 28 : .download_and_keep_resident()
3589 0 : .await
3590 28 : .context("download layer for failpoint")?,
3591 : );
3592 : }
3593 2 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
3594 2 : return Ok(CompactLevel0Phase1Result {
3595 2 : new_layers,
3596 2 : deltas_to_compact: level0_deltas,
3597 2 : });
3598 295 : }
3599 0 : }
3600 :
3601 : // Gather the files to compact in this iteration.
3602 : //
3603 : // Start with the oldest Level 0 delta file, and collect any other
3604 : // level 0 files that form a contiguous sequence, such that the end
3605 : // LSN of previous file matches the start LSN of the next file.
3606 : //
3607 : // Note that if the files don't form such a sequence, we might
3608 : // "compact" just a single file. That's a bit pointless, but it allows
3609 : // us to get rid of the level 0 file, and compact the other files on
3610 : // the next iteration. This could probably made smarter, but such
3611 : // "gaps" in the sequence of level 0 files should only happen in case
3612 : // of a crash, partial download from cloud storage, or something like
3613 : // that, so it's not a big deal in practice.
3614 8014 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
3615 295 : let mut level0_deltas_iter = level0_deltas.iter();
3616 295 :
3617 295 : let first_level0_delta = level0_deltas_iter.next().unwrap();
3618 295 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
3619 295 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
3620 295 :
3621 295 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
3622 4148 : for l in level0_deltas_iter {
3623 3853 : let lsn_range = &l.layer_desc().lsn_range;
3624 3853 :
3625 3853 : if lsn_range.start != prev_lsn_end {
3626 0 : break;
3627 3853 : }
3628 3853 : deltas_to_compact.push(l.download_and_keep_resident().await?);
3629 3853 : prev_lsn_end = lsn_range.end;
3630 : }
3631 295 : let lsn_range = Range {
3632 295 : start: deltas_to_compact
3633 295 : .first()
3634 295 : .unwrap()
3635 295 : .layer_desc()
3636 295 : .lsn_range
3637 295 : .start,
3638 295 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
3639 295 : };
3640 :
3641 295 : info!(
3642 295 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
3643 295 : lsn_range.start,
3644 295 : lsn_range.end,
3645 295 : deltas_to_compact.len(),
3646 295 : level0_deltas.len()
3647 295 : );
3648 :
3649 4148 : for l in deltas_to_compact.iter() {
3650 4148 : info!("compact includes {l}");
3651 : }
3652 :
3653 : // We don't need the original list of layers anymore. Drop it so that
3654 : // we don't accidentally use it later in the function.
3655 295 : drop(level0_deltas);
3656 295 :
3657 295 : stats.read_lock_held_prerequisites_micros = stats
3658 295 : .read_lock_held_spawn_blocking_startup_micros
3659 295 : .till_now();
3660 295 :
3661 295 : // Determine N largest holes where N is number of compacted layers.
3662 295 : let max_holes = deltas_to_compact.len();
3663 295 : let last_record_lsn = self.get_last_record_lsn();
3664 295 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
3665 295 : let min_hole_coverage_size = 3; // TODO: something more flexible?
3666 295 :
3667 295 : // min-heap (reserve space for one more element added before eviction)
3668 295 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
3669 295 : let mut prev: Option<Key> = None;
3670 295 :
3671 295 : let mut all_keys = Vec::new();
3672 :
3673 4148 : for l in deltas_to_compact.iter() {
3674 4148 : all_keys.extend(l.load_keys(ctx).await?);
3675 : }
3676 :
3677 : // FIXME: should spawn_blocking the rest of this function
3678 :
3679 : // The current stdlib sorting implementation is designed in a way where it is
3680 : // particularly fast where the slice is made up of sorted sub-ranges.
3681 155368940 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3682 295 :
3683 295 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
3684 :
3685 17304139 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
3686 17304139 : if let Some(prev_key) = prev {
3687 : // just first fast filter
3688 17303844 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
3689 201481 : let key_range = prev_key..next_key;
3690 201481 : // Measuring hole by just subtraction of i128 representation of key range boundaries
3691 201481 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
3692 201481 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
3693 201481 : // That is why it is better to measure size of hole as number of covering image layers.
3694 201481 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
3695 201481 : if coverage_size >= min_hole_coverage_size {
3696 213 : heap.push(Hole {
3697 213 : key_range,
3698 213 : coverage_size,
3699 213 : });
3700 213 : if heap.len() > max_holes {
3701 128 : heap.pop(); // remove smallest hole
3702 128 : }
3703 201268 : }
3704 17102363 : }
3705 295 : }
3706 17304139 : prev = Some(next_key.next());
3707 : }
3708 295 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
3709 295 : drop_rlock(guard);
3710 295 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
3711 295 : let mut holes = heap.into_vec();
3712 295 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
3713 295 : let mut next_hole = 0; // index of next hole in holes vector
3714 295 :
3715 295 : // This iterator walks through all key-value pairs from all the layers
3716 295 : // we're compacting, in key, LSN order.
3717 295 : let all_values_iter = all_keys.iter();
3718 295 :
3719 295 : // This iterator walks through all keys and is needed to calculate size used by each key
3720 295 : let mut all_keys_iter = all_keys
3721 295 : .iter()
3722 17292588 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
3723 17292293 : .coalesce(|mut prev, cur| {
3724 17292293 : // Coalesce keys that belong to the same key pair.
3725 17292293 : // This ensures that compaction doesn't put them
3726 17292293 : // into different layer files.
3727 17292293 : // Still limit this by the target file size,
3728 17292293 : // so that we keep the size of the files in
3729 17292293 : // check.
3730 17292293 : if prev.0 == cur.0 && prev.2 < target_file_size {
3731 15012062 : prev.2 += cur.2;
3732 15012062 : Ok(prev)
3733 : } else {
3734 2280231 : Err((prev, cur))
3735 : }
3736 17292293 : });
3737 295 :
3738 295 : // Merge the contents of all the input delta layers into a new set
3739 295 : // of delta layers, based on the current partitioning.
3740 295 : //
3741 295 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
3742 295 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
3743 295 : // would be too large. In that case, we also split on the LSN dimension.
3744 295 : //
3745 295 : // LSN
3746 295 : // ^
3747 295 : // |
3748 295 : // | +-----------+ +--+--+--+--+
3749 295 : // | | | | | | | |
3750 295 : // | +-----------+ | | | | |
3751 295 : // | | | | | | | |
3752 295 : // | +-----------+ ==> | | | | |
3753 295 : // | | | | | | | |
3754 295 : // | +-----------+ | | | | |
3755 295 : // | | | | | | | |
3756 295 : // | +-----------+ +--+--+--+--+
3757 295 : // |
3758 295 : // +--------------> key
3759 295 : //
3760 295 : //
3761 295 : // If one key (X) has a lot of page versions:
3762 295 : //
3763 295 : // LSN
3764 295 : // ^
3765 295 : // | (X)
3766 295 : // | +-----------+ +--+--+--+--+
3767 295 : // | | | | | | | |
3768 295 : // | +-----------+ | | +--+ |
3769 295 : // | | | | | | | |
3770 295 : // | +-----------+ ==> | | | | |
3771 295 : // | | | | | +--+ |
3772 295 : // | +-----------+ | | | | |
3773 295 : // | | | | | | | |
3774 295 : // | +-----------+ +--+--+--+--+
3775 295 : // |
3776 295 : // +--------------> key
3777 295 : // TODO: this actually divides the layers into fixed-size chunks, not
3778 295 : // based on the partitioning.
3779 295 : //
3780 295 : // TODO: we should also opportunistically materialize and
3781 295 : // garbage collect what we can.
3782 295 : let mut new_layers = Vec::new();
3783 295 : let mut prev_key: Option<Key> = None;
3784 295 : let mut writer: Option<DeltaLayerWriter> = None;
3785 295 : let mut key_values_total_size = 0u64;
3786 295 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
3787 295 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
3788 :
3789 : for &DeltaEntry {
3790 17292587 : key, lsn, ref val, ..
3791 17292881 : } in all_values_iter
3792 : {
3793 17292587 : let value = val.load(ctx).await?;
3794 17292586 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
3795 17292586 : // We need to check key boundaries once we reach next key or end of layer with the same key
3796 17292586 : if !same_key || lsn == dup_end_lsn {
3797 2280524 : let mut next_key_size = 0u64;
3798 2280524 : let is_dup_layer = dup_end_lsn.is_valid();
3799 2280524 : dup_start_lsn = Lsn::INVALID;
3800 2280524 : if !same_key {
3801 2280486 : dup_end_lsn = Lsn::INVALID;
3802 2280486 : }
3803 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
3804 2280525 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
3805 2280525 : next_key_size = next_size;
3806 2280525 : if key != next_key {
3807 2280192 : if dup_end_lsn.is_valid() {
3808 12 : // We are writting segment with duplicates:
3809 12 : // place all remaining values of this key in separate segment
3810 12 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
3811 12 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
3812 2280180 : }
3813 2280192 : break;
3814 333 : }
3815 333 : key_values_total_size += next_size;
3816 333 : // Check if it is time to split segment: if total keys size is larger than target file size.
3817 333 : // We need to avoid generation of empty segments if next_size > target_file_size.
3818 333 : if key_values_total_size > target_file_size && lsn != next_lsn {
3819 : // Split key between multiple layers: such layer can contain only single key
3820 38 : dup_start_lsn = if dup_end_lsn.is_valid() {
3821 25 : dup_end_lsn // new segment with duplicates starts where old one stops
3822 : } else {
3823 13 : lsn // start with the first LSN for this key
3824 : };
3825 38 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
3826 38 : break;
3827 295 : }
3828 : }
3829 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
3830 2280524 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
3831 1 : dup_start_lsn = dup_end_lsn;
3832 1 : dup_end_lsn = lsn_range.end;
3833 2280523 : }
3834 2280524 : if writer.is_some() {
3835 2280229 : let written_size = writer.as_mut().unwrap().size();
3836 2280229 : let contains_hole =
3837 2280229 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
3838 : // check if key cause layer overflow or contains hole...
3839 2280229 : if is_dup_layer
3840 2280179 : || dup_end_lsn.is_valid()
3841 2280168 : || written_size + key_values_total_size > target_file_size
3842 2269869 : || contains_hole
3843 : {
3844 : // ... if so, flush previous layer and prepare to write new one
3845 10443 : new_layers.push(
3846 10443 : writer
3847 10443 : .take()
3848 10443 : .unwrap()
3849 10443 : .finish(prev_key.unwrap().next(), self)
3850 1322 : .await?,
3851 : );
3852 10443 : writer = None;
3853 10443 :
3854 10443 : if contains_hole {
3855 85 : // skip hole
3856 85 : next_hole += 1;
3857 10358 : }
3858 2269786 : }
3859 295 : }
3860 : // Remember size of key value because at next iteration we will access next item
3861 2280524 : key_values_total_size = next_key_size;
3862 15012062 : }
3863 17292586 : if writer.is_none() {
3864 10738 : // Create writer if not initiaized yet
3865 10738 : writer = Some(
3866 : DeltaLayerWriter::new(
3867 10738 : self.conf,
3868 10738 : self.timeline_id,
3869 10738 : self.tenant_shard_id,
3870 10738 : key,
3871 10738 : if dup_end_lsn.is_valid() {
3872 : // this is a layer containing slice of values of the same key
3873 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
3874 51 : dup_start_lsn..dup_end_lsn
3875 : } else {
3876 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3877 10687 : lsn_range.clone()
3878 : },
3879 : )
3880 10 : .await?,
3881 : );
3882 17281848 : }
3883 :
3884 17292586 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3885 0 : Err(CompactionError::Other(anyhow::anyhow!(
3886 0 : "failpoint delta-layer-writer-fail-before-finish"
3887 0 : )))
3888 17292586 : });
3889 :
3890 17292586 : if !self.shard_identity.is_key_disposable(&key) {
3891 17292586 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
3892 : } else {
3893 0 : debug!(
3894 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3895 0 : key,
3896 0 : self.shard_identity.get_shard_number(&key)
3897 0 : );
3898 : }
3899 :
3900 17292586 : if !new_layers.is_empty() {
3901 14046784 : fail_point!("after-timeline-compacted-first-L1");
3902 14046784 : }
3903 :
3904 17292586 : prev_key = Some(key);
3905 : }
3906 294 : if let Some(writer) = writer {
3907 294 : new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
3908 0 : }
3909 :
3910 : // Sync layers
3911 294 : if !new_layers.is_empty() {
3912 : // Print a warning if the created layer is larger than double the target size
3913 : // Add two pages for potential overhead. This should in theory be already
3914 : // accounted for in the target calculation, but for very small targets,
3915 : // we still might easily hit the limit otherwise.
3916 294 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
3917 10736 : for layer in new_layers.iter() {
3918 10736 : if layer.layer_desc().file_size > warn_limit {
3919 0 : warn!(
3920 0 : %layer,
3921 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
3922 0 : );
3923 10736 : }
3924 : }
3925 :
3926 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3927 294 : let layer_paths: Vec<Utf8PathBuf> = new_layers
3928 294 : .iter()
3929 10736 : .map(|l| l.local_path().to_owned())
3930 294 : .collect();
3931 294 :
3932 294 : // Fsync all the layer files and directory using multiple threads to
3933 294 : // minimize latency.
3934 294 : par_fsync::par_fsync_async(&layer_paths)
3935 520 : .await
3936 294 : .context("fsync all new layers")?;
3937 :
3938 294 : let timeline_dir = self
3939 294 : .conf
3940 294 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
3941 294 :
3942 294 : par_fsync::par_fsync_async(&[timeline_dir])
3943 324 : .await
3944 294 : .context("fsync of timeline dir")?;
3945 0 : }
3946 :
3947 294 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
3948 294 : stats.new_deltas_count = Some(new_layers.len());
3949 10736 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
3950 294 :
3951 294 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
3952 294 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
3953 : {
3954 294 : Ok(stats_json) => {
3955 294 : info!(
3956 294 : stats_json = stats_json.as_str(),
3957 294 : "compact_level0_phase1 stats available"
3958 294 : )
3959 : }
3960 0 : Err(e) => {
3961 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
3962 : }
3963 : }
3964 :
3965 294 : Ok(CompactLevel0Phase1Result {
3966 294 : new_layers,
3967 294 : deltas_to_compact: deltas_to_compact
3968 294 : .into_iter()
3969 4133 : .map(|x| x.drop_eviction_guard())
3970 294 : .collect::<Vec<_>>(),
3971 294 : })
3972 1640 : }
3973 :
3974 : ///
3975 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
3976 : /// as Level 1 files.
3977 : ///
3978 1641 : async fn compact_level0(
3979 1641 : self: &Arc<Self>,
3980 1641 : target_file_size: u64,
3981 1641 : ctx: &RequestContext,
3982 1641 : ) -> Result<(), CompactionError> {
3983 : let CompactLevel0Phase1Result {
3984 1640 : new_layers,
3985 1640 : deltas_to_compact,
3986 : } = {
3987 1641 : let phase1_span = info_span!("compact_level0_phase1");
3988 1641 : let ctx = ctx.attached_child();
3989 1641 : let mut stats = CompactLevel0Phase1StatsBuilder {
3990 1641 : version: Some(2),
3991 1641 : tenant_id: Some(self.tenant_shard_id),
3992 1641 : timeline_id: Some(self.timeline_id),
3993 1641 : ..Default::default()
3994 1641 : };
3995 1641 :
3996 1641 : let begin = tokio::time::Instant::now();
3997 1641 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
3998 1641 : let now = tokio::time::Instant::now();
3999 1641 : stats.read_lock_acquisition_micros =
4000 1641 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
4001 1641 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
4002 1641 : .instrument(phase1_span)
4003 326194 : .await?
4004 : };
4005 :
4006 1640 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
4007 : // nothing to do
4008 1344 : return Ok(());
4009 296 : }
4010 :
4011 296 : let mut guard = self.layers.write().await;
4012 :
4013 296 : let mut duplicated_layers = HashSet::new();
4014 296 :
4015 296 : let mut insert_layers = Vec::with_capacity(new_layers.len());
4016 :
4017 11060 : for l in &new_layers {
4018 10764 : if guard.contains(l.as_ref()) {
4019 : // expected in tests
4020 28 : tracing::error!(layer=%l, "duplicated L1 layer");
4021 :
4022 : // good ways to cause a duplicate: we repeatedly error after taking the writelock
4023 : // `guard` on self.layers. as of writing this, there are no error returns except
4024 : // for compact_level0_phase1 creating an L0, which does not happen in practice
4025 : // because we have not implemented L0 => L0 compaction.
4026 28 : duplicated_layers.insert(l.layer_desc().key());
4027 10736 : } else if LayerMap::is_l0(l.layer_desc()) {
4028 0 : return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
4029 10736 : } else {
4030 10736 : insert_layers.push(l.clone());
4031 10736 : }
4032 : }
4033 :
4034 296 : let remove_layers = {
4035 296 : let mut deltas_to_compact = deltas_to_compact;
4036 296 : // only remove those inputs which were not outputs
4037 4161 : deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
4038 296 : deltas_to_compact
4039 296 : };
4040 296 :
4041 296 : // deletion will happen later, the layer file manager calls garbage_collect_on_drop
4042 296 : guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
4043 :
4044 296 : if let Some(remote_client) = self.remote_client.as_ref() {
4045 296 : remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
4046 0 : }
4047 :
4048 296 : drop_wlock(guard);
4049 296 :
4050 296 : Ok(())
4051 1640 : }
4052 :
4053 : /// Update information about which layer files need to be retained on
4054 : /// garbage collection. This is separate from actually performing the GC,
4055 : /// and is updated more frequently, so that compaction can remove obsolete
4056 : /// page versions more aggressively.
4057 : ///
4058 : /// TODO: that's wishful thinking, compaction doesn't actually do that
4059 : /// currently.
4060 : ///
4061 : /// The caller specifies how much history is needed with the 3 arguments:
4062 : ///
4063 : /// retain_lsns: keep a version of each page at these LSNs
4064 : /// cutoff_horizon: also keep everything newer than this LSN
4065 : /// pitr: the time duration required to keep data for PITR
4066 : ///
4067 : /// The 'retain_lsns' list is currently used to prevent removing files that
4068 : /// are needed by child timelines. In the future, the user might be able to
4069 : /// name additional points in time to retain. The caller is responsible for
4070 : /// collecting that information.
4071 : ///
4072 : /// The 'cutoff_horizon' point is used to retain recent versions that might still be
4073 : /// needed by read-only nodes. (As of this writing, the caller just passes
4074 : /// the latest LSN subtracted by a constant, and doesn't do anything smart
4075 : /// to figure out what read-only nodes might actually need.)
4076 : ///
4077 : /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
4078 : /// whether a record is needed for PITR.
4079 : ///
4080 : /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
4081 : /// field, so that the three values passed as argument are stored
4082 : /// atomically. But the caller is responsible for ensuring that no new
4083 : /// branches are created that would need to be included in 'retain_lsns',
4084 : /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
4085 : /// that.
4086 : ///
4087 1734 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
4088 : pub(super) async fn update_gc_info(
4089 : &self,
4090 : retain_lsns: Vec<Lsn>,
4091 : cutoff_horizon: Lsn,
4092 : pitr: Duration,
4093 : cancel: &CancellationToken,
4094 : ctx: &RequestContext,
4095 : ) -> anyhow::Result<()> {
4096 : // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
4097 : //
4098 : // Some unit tests depend on garbage-collection working even when
4099 : // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
4100 : // work, so avoid calling it altogether if time-based retention is not
4101 : // configured. It would be pointless anyway.
4102 : let pitr_cutoff = if pitr != Duration::ZERO {
4103 : let now = SystemTime::now();
4104 : if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
4105 : let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
4106 :
4107 : match self
4108 : .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
4109 : .await?
4110 : {
4111 : LsnForTimestamp::Present(lsn) => lsn,
4112 : LsnForTimestamp::Future(lsn) => {
4113 : // The timestamp is in the future. That sounds impossible,
4114 : // but what it really means is that there hasn't been
4115 : // any commits since the cutoff timestamp.
4116 0 : debug!("future({})", lsn);
4117 : cutoff_horizon
4118 : }
4119 : LsnForTimestamp::Past(lsn) => {
4120 0 : debug!("past({})", lsn);
4121 : // conservative, safe default is to remove nothing, when we
4122 : // have no commit timestamp data available
4123 : *self.get_latest_gc_cutoff_lsn()
4124 : }
4125 : LsnForTimestamp::NoData(lsn) => {
4126 0 : debug!("nodata({})", lsn);
4127 : // conservative, safe default is to remove nothing, when we
4128 : // have no commit timestamp data available
4129 : *self.get_latest_gc_cutoff_lsn()
4130 : }
4131 : }
4132 : } else {
4133 : // If we don't have enough data to convert to LSN,
4134 : // play safe and don't remove any layers.
4135 : *self.get_latest_gc_cutoff_lsn()
4136 : }
4137 : } else {
4138 : // No time-based retention was configured. Set time-based cutoff to
4139 : // same as LSN based.
4140 : cutoff_horizon
4141 : };
4142 :
4143 : // Grab the lock and update the values
4144 : *self.gc_info.write().unwrap() = GcInfo {
4145 : retain_lsns,
4146 : horizon_cutoff: cutoff_horizon,
4147 : pitr_cutoff,
4148 : };
4149 :
4150 : Ok(())
4151 : }
4152 :
4153 : /// Garbage collect layer files on a timeline that are no longer needed.
4154 : ///
4155 : /// Currently, we don't make any attempt at removing unneeded page versions
4156 : /// within a layer file. We can only remove the whole file if it's fully
4157 : /// obsolete.
4158 796 : pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
4159 796 : // this is most likely the background tasks, but it might be the spawned task from
4160 796 : // immediate_gc
4161 796 : let cancel = crate::task_mgr::shutdown_token();
4162 796 : let _g = tokio::select! {
4163 795 : guard = self.gc_lock.lock() => guard,
4164 : _ = self.cancel.cancelled() => return Ok(GcResult::default()),
4165 : _ = cancel.cancelled() => return Ok(GcResult::default()),
4166 : };
4167 795 : let timer = self.metrics.garbage_collect_histo.start_timer();
4168 :
4169 0 : fail_point!("before-timeline-gc");
4170 :
4171 : // Is the timeline being deleted?
4172 795 : if self.is_stopping() {
4173 0 : anyhow::bail!("timeline is Stopping");
4174 795 : }
4175 795 :
4176 795 : let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
4177 795 : let gc_info = self.gc_info.read().unwrap();
4178 795 :
4179 795 : let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
4180 795 : let pitr_cutoff = gc_info.pitr_cutoff;
4181 795 : let retain_lsns = gc_info.retain_lsns.clone();
4182 795 : (horizon_cutoff, pitr_cutoff, retain_lsns)
4183 795 : };
4184 795 :
4185 795 : let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
4186 :
4187 795 : let res = self
4188 795 : .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
4189 795 : .instrument(
4190 795 : info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
4191 : )
4192 232 : .await?;
4193 :
4194 : // only record successes
4195 795 : timer.stop_and_record();
4196 795 :
4197 795 : Ok(res)
4198 796 : }
4199 :
4200 795 : async fn gc_timeline(
4201 795 : &self,
4202 795 : horizon_cutoff: Lsn,
4203 795 : pitr_cutoff: Lsn,
4204 795 : retain_lsns: Vec<Lsn>,
4205 795 : new_gc_cutoff: Lsn,
4206 795 : ) -> anyhow::Result<GcResult> {
4207 795 : let now = SystemTime::now();
4208 795 : let mut result: GcResult = GcResult::default();
4209 795 :
4210 795 : // Nothing to GC. Return early.
4211 795 : let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
4212 795 : if latest_gc_cutoff >= new_gc_cutoff {
4213 84 : info!(
4214 84 : "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
4215 84 : );
4216 84 : return Ok(result);
4217 711 : }
4218 :
4219 : // We need to ensure that no one tries to read page versions or create
4220 : // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
4221 : // for details. This will block until the old value is no longer in use.
4222 : //
4223 : // The GC cutoff should only ever move forwards.
4224 711 : let waitlist = {
4225 711 : let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
4226 711 : ensure!(
4227 711 : *write_guard <= new_gc_cutoff,
4228 0 : "Cannot move GC cutoff LSN backwards (was {}, new {})",
4229 0 : *write_guard,
4230 : new_gc_cutoff
4231 : );
4232 711 : write_guard.store_and_unlock(new_gc_cutoff)
4233 711 : };
4234 711 : waitlist.wait().await;
4235 :
4236 711 : info!("GC starting");
4237 :
4238 0 : debug!("retain_lsns: {:?}", retain_lsns);
4239 :
4240 711 : let mut layers_to_remove = Vec::new();
4241 711 : let mut wanted_image_layers = KeySpaceRandomAccum::default();
4242 :
4243 : // Scan all layers in the timeline (remote or on-disk).
4244 : //
4245 : // Garbage collect the layer if all conditions are satisfied:
4246 : // 1. it is older than cutoff LSN;
4247 : // 2. it is older than PITR interval;
4248 : // 3. it doesn't need to be retained for 'retain_lsns';
4249 : // 4. newer on-disk image layers cover the layer's whole key range
4250 : //
4251 : // TODO holding a write lock is too agressive and avoidable
4252 711 : let mut guard = self.layers.write().await;
4253 711 : let layers = guard.layer_map();
4254 17631 : 'outer: for l in layers.iter_historic_layers() {
4255 17631 : result.layers_total += 1;
4256 17631 :
4257 17631 : // 1. Is it newer than GC horizon cutoff point?
4258 17631 : if l.get_lsn_range().end > horizon_cutoff {
4259 0 : debug!(
4260 0 : "keeping {} because it's newer than horizon_cutoff {}",
4261 0 : l.filename(),
4262 0 : horizon_cutoff,
4263 0 : );
4264 2187 : result.layers_needed_by_cutoff += 1;
4265 2187 : continue 'outer;
4266 15444 : }
4267 15444 :
4268 15444 : // 2. It is newer than PiTR cutoff point?
4269 15444 : if l.get_lsn_range().end > pitr_cutoff {
4270 0 : debug!(
4271 0 : "keeping {} because it's newer than pitr_cutoff {}",
4272 0 : l.filename(),
4273 0 : pitr_cutoff,
4274 0 : );
4275 16 : result.layers_needed_by_pitr += 1;
4276 16 : continue 'outer;
4277 15428 : }
4278 :
4279 : // 3. Is it needed by a child branch?
4280 : // NOTE With that we would keep data that
4281 : // might be referenced by child branches forever.
4282 : // We can track this in child timeline GC and delete parent layers when
4283 : // they are no longer needed. This might be complicated with long inheritance chains.
4284 : //
4285 : // TODO Vec is not a great choice for `retain_lsns`
4286 15446 : for retain_lsn in &retain_lsns {
4287 : // start_lsn is inclusive
4288 464 : if &l.get_lsn_range().start <= retain_lsn {
4289 0 : debug!(
4290 0 : "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
4291 0 : l.filename(),
4292 0 : retain_lsn,
4293 0 : l.is_incremental(),
4294 0 : );
4295 446 : result.layers_needed_by_branches += 1;
4296 446 : continue 'outer;
4297 18 : }
4298 : }
4299 :
4300 : // 4. Is there a later on-disk layer for this relation?
4301 : //
4302 : // The end-LSN is exclusive, while disk_consistent_lsn is
4303 : // inclusive. For example, if disk_consistent_lsn is 100, it is
4304 : // OK for a delta layer to have end LSN 101, but if the end LSN
4305 : // is 102, then it might not have been fully flushed to disk
4306 : // before crash.
4307 : //
4308 : // For example, imagine that the following layers exist:
4309 : //
4310 : // 1000 - image (A)
4311 : // 1000-2000 - delta (B)
4312 : // 2000 - image (C)
4313 : // 2000-3000 - delta (D)
4314 : // 3000 - image (E)
4315 : //
4316 : // If GC horizon is at 2500, we can remove layers A and B, but
4317 : // we cannot remove C, even though it's older than 2500, because
4318 : // the delta layer 2000-3000 depends on it.
4319 14982 : if !layers
4320 14982 : .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
4321 : {
4322 0 : debug!("keeping {} because it is the latest layer", l.filename());
4323 : // Collect delta key ranges that need image layers to allow garbage
4324 : // collecting the layers.
4325 : // It is not so obvious whether we need to propagate information only about
4326 : // delta layers. Image layers can form "stairs" preventing old image from been deleted.
4327 : // But image layers are in any case less sparse than delta layers. Also we need some
4328 : // protection from replacing recent image layers with new one after each GC iteration.
4329 13896 : if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
4330 0 : wanted_image_layers.add_range(l.get_key_range());
4331 13896 : }
4332 13896 : result.layers_not_updated += 1;
4333 13896 : continue 'outer;
4334 1086 : }
4335 :
4336 : // We didn't find any reason to keep this file, so remove it.
4337 0 : debug!(
4338 0 : "garbage collecting {} is_dropped: xx is_incremental: {}",
4339 0 : l.filename(),
4340 0 : l.is_incremental(),
4341 0 : );
4342 1086 : layers_to_remove.push(l);
4343 : }
4344 711 : self.wanted_image_layers
4345 711 : .lock()
4346 711 : .unwrap()
4347 711 : .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
4348 711 :
4349 711 : if !layers_to_remove.is_empty() {
4350 : // Persist the new GC cutoff value in the metadata file, before
4351 : // we actually remove anything.
4352 : //
4353 : // This does not in fact have any effect as we no longer consider local metadata unless
4354 : // running without remote storage.
4355 : //
4356 : // This unconditionally schedules also an index_part.json update, even though, we will
4357 : // be doing one a bit later with the unlinked gc'd layers.
4358 : //
4359 : // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
4360 19 : self.update_metadata_file(self.disk_consistent_lsn.load(), None)
4361 0 : .await?;
4362 :
4363 19 : let gc_layers = layers_to_remove
4364 19 : .iter()
4365 1086 : .map(|x| guard.get_from_desc(x))
4366 19 : .collect::<Vec<Layer>>();
4367 19 :
4368 19 : result.layers_removed = gc_layers.len() as u64;
4369 :
4370 19 : if let Some(remote_client) = self.remote_client.as_ref() {
4371 19 : remote_client.schedule_gc_update(&gc_layers)?;
4372 0 : }
4373 :
4374 19 : guard.finish_gc_timeline(&gc_layers);
4375 19 :
4376 19 : #[cfg(feature = "testing")]
4377 19 : {
4378 19 : result.doomed_layers = gc_layers;
4379 19 : }
4380 692 : }
4381 :
4382 711 : info!(
4383 711 : "GC completed removing {} layers, cutoff {}",
4384 711 : result.layers_removed, new_gc_cutoff
4385 711 : );
4386 :
4387 711 : result.elapsed = now.elapsed()?;
4388 711 : Ok(result)
4389 795 : }
4390 :
4391 : /// Reconstruct a value, using the given base image and WAL records in 'data'.
4392 7349656 : async fn reconstruct_value(
4393 7349656 : &self,
4394 7349656 : key: Key,
4395 7349656 : request_lsn: Lsn,
4396 7349656 : mut data: ValueReconstructState,
4397 7349680 : ) -> Result<Bytes, PageReconstructError> {
4398 7349680 : // Perform WAL redo if needed
4399 7349680 : data.records.reverse();
4400 7349680 :
4401 7349680 : // If we have a page image, and no WAL, we're all set
4402 7349680 : if data.records.is_empty() {
4403 5003053 : if let Some((img_lsn, img)) = &data.img {
4404 0 : trace!(
4405 0 : "found page image for key {} at {}, no WAL redo required, req LSN {}",
4406 0 : key,
4407 0 : img_lsn,
4408 0 : request_lsn,
4409 0 : );
4410 5003053 : Ok(img.clone())
4411 : } else {
4412 0 : Err(PageReconstructError::from(anyhow!(
4413 0 : "base image for {key} at {request_lsn} not found"
4414 0 : )))
4415 : }
4416 : } else {
4417 : // We need to do WAL redo.
4418 : //
4419 : // If we don't have a base image, then the oldest WAL record better initialize
4420 : // the page
4421 2346627 : if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
4422 0 : Err(PageReconstructError::from(anyhow!(
4423 0 : "Base image for {} at {} not found, but got {} WAL records",
4424 0 : key,
4425 0 : request_lsn,
4426 0 : data.records.len()
4427 0 : )))
4428 : } else {
4429 2346627 : if data.img.is_some() {
4430 0 : trace!(
4431 0 : "found {} WAL records and a base image for {} at {}, performing WAL redo",
4432 0 : data.records.len(),
4433 0 : key,
4434 0 : request_lsn
4435 0 : );
4436 : } else {
4437 0 : trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
4438 : };
4439 :
4440 2346627 : let last_rec_lsn = data.records.last().unwrap().0;
4441 :
4442 2346627 : let img = match self
4443 2346627 : .walredo_mgr
4444 2346627 : .as_ref()
4445 2346627 : .context("timeline has no walredo manager")
4446 2346627 : .map_err(PageReconstructError::WalRedo)?
4447 2346627 : .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
4448 0 : .await
4449 2346627 : .context("reconstruct a page image")
4450 : {
4451 2346627 : Ok(img) => img,
4452 0 : Err(e) => return Err(PageReconstructError::WalRedo(e)),
4453 : };
4454 :
4455 2346627 : if img.len() == page_cache::PAGE_SZ {
4456 2343539 : let cache = page_cache::get();
4457 2343539 : if let Err(e) = cache
4458 2343539 : .memorize_materialized_page(
4459 2343539 : self.tenant_shard_id,
4460 2343539 : self.timeline_id,
4461 2343539 : key,
4462 2343539 : last_rec_lsn,
4463 2343539 : &img,
4464 2343539 : )
4465 8303 : .await
4466 2343539 : .context("Materialized page memoization failed")
4467 : {
4468 0 : return Err(PageReconstructError::from(e));
4469 2343539 : }
4470 3088 : }
4471 :
4472 2346627 : Ok(img)
4473 : }
4474 : }
4475 7349680 : }
4476 :
4477 2 : pub(crate) async fn spawn_download_all_remote_layers(
4478 2 : self: Arc<Self>,
4479 2 : request: DownloadRemoteLayersTaskSpawnRequest,
4480 2 : ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
4481 2 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4482 2 :
4483 2 : // this is not really needed anymore; it has tests which really check the return value from
4484 2 : // http api. it would be better not to maintain this anymore.
4485 2 :
4486 2 : let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
4487 2 : if let Some(st) = &*status_guard {
4488 1 : match &st.state {
4489 : DownloadRemoteLayersTaskState::Running => {
4490 0 : return Err(st.clone());
4491 : }
4492 : DownloadRemoteLayersTaskState::ShutDown
4493 1 : | DownloadRemoteLayersTaskState::Completed => {
4494 1 : *status_guard = None;
4495 1 : }
4496 : }
4497 1 : }
4498 :
4499 2 : let self_clone = Arc::clone(&self);
4500 2 : let task_id = task_mgr::spawn(
4501 2 : task_mgr::BACKGROUND_RUNTIME.handle(),
4502 2 : task_mgr::TaskKind::DownloadAllRemoteLayers,
4503 2 : Some(self.tenant_shard_id),
4504 2 : Some(self.timeline_id),
4505 2 : "download all remote layers task",
4506 : false,
4507 2 : async move {
4508 9 : self_clone.download_all_remote_layers(request).await;
4509 2 : let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
4510 2 : match &mut *status_guard {
4511 : None => {
4512 0 : warn!("tasks status is supposed to be Some(), since we are running");
4513 : }
4514 2 : Some(st) => {
4515 2 : let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
4516 2 : if st.task_id != exp_task_id {
4517 0 : warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
4518 2 : } else {
4519 2 : st.state = DownloadRemoteLayersTaskState::Completed;
4520 2 : }
4521 : }
4522 : };
4523 2 : Ok(())
4524 2 : }
4525 2 : .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
4526 : );
4527 :
4528 2 : let initial_info = DownloadRemoteLayersTaskInfo {
4529 2 : task_id: format!("{task_id}"),
4530 2 : state: DownloadRemoteLayersTaskState::Running,
4531 2 : total_layer_count: 0,
4532 2 : successful_download_count: 0,
4533 2 : failed_download_count: 0,
4534 2 : };
4535 2 : *status_guard = Some(initial_info.clone());
4536 2 :
4537 2 : Ok(initial_info)
4538 2 : }
4539 :
4540 2 : async fn download_all_remote_layers(
4541 2 : self: &Arc<Self>,
4542 2 : request: DownloadRemoteLayersTaskSpawnRequest,
4543 2 : ) {
4544 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4545 :
4546 2 : let remaining = {
4547 2 : let guard = self.layers.read().await;
4548 2 : guard
4549 2 : .layer_map()
4550 2 : .iter_historic_layers()
4551 10 : .map(|desc| guard.get_from_desc(&desc))
4552 2 : .collect::<Vec<_>>()
4553 2 : };
4554 2 : let total_layer_count = remaining.len();
4555 2 :
4556 2 : macro_rules! lock_status {
4557 2 : ($st:ident) => {
4558 2 : let mut st = self.download_all_remote_layers_task_info.write().unwrap();
4559 2 : let st = st
4560 2 : .as_mut()
4561 2 : .expect("this function is only called after the task has been spawned");
4562 2 : assert_eq!(
4563 2 : st.task_id,
4564 2 : format!(
4565 2 : "{}",
4566 2 : task_mgr::current_task_id().expect("we run inside a task_mgr task")
4567 2 : )
4568 2 : );
4569 2 : let $st = st;
4570 2 : };
4571 2 : }
4572 2 :
4573 2 : {
4574 2 : lock_status!(st);
4575 2 : st.total_layer_count = total_layer_count as u64;
4576 2 : }
4577 2 :
4578 2 : let mut remaining = remaining.into_iter();
4579 2 : let mut have_remaining = true;
4580 2 : let mut js = tokio::task::JoinSet::new();
4581 2 :
4582 2 : let cancel = task_mgr::shutdown_token();
4583 2 :
4584 2 : let limit = request.max_concurrent_downloads;
4585 :
4586 : loop {
4587 12 : while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
4588 12 : let Some(next) = remaining.next() else {
4589 2 : have_remaining = false;
4590 2 : break;
4591 : };
4592 :
4593 10 : let span = tracing::info_span!("download", layer = %next);
4594 :
4595 10 : js.spawn(
4596 10 : async move {
4597 26 : let res = next.download().await;
4598 10 : (next, res)
4599 10 : }
4600 10 : .instrument(span),
4601 10 : );
4602 : }
4603 :
4604 12 : while let Some(res) = js.join_next().await {
4605 10 : match res {
4606 : Ok((_, Ok(_))) => {
4607 5 : lock_status!(st);
4608 5 : st.successful_download_count += 1;
4609 : }
4610 5 : Ok((layer, Err(e))) => {
4611 5 : tracing::error!(%layer, "download failed: {e:#}");
4612 5 : lock_status!(st);
4613 5 : st.failed_download_count += 1;
4614 : }
4615 0 : Err(je) if je.is_cancelled() => unreachable!("not used here"),
4616 0 : Err(je) if je.is_panic() => {
4617 0 : lock_status!(st);
4618 0 : st.failed_download_count += 1;
4619 : }
4620 0 : Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
4621 : }
4622 : }
4623 :
4624 2 : if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
4625 2 : break;
4626 0 : }
4627 : }
4628 :
4629 : {
4630 2 : lock_status!(st);
4631 2 : st.state = DownloadRemoteLayersTaskState::Completed;
4632 2 : }
4633 2 : }
4634 :
4635 49 : pub(crate) fn get_download_all_remote_layers_task_info(
4636 49 : &self,
4637 49 : ) -> Option<DownloadRemoteLayersTaskInfo> {
4638 49 : self.download_all_remote_layers_task_info
4639 49 : .read()
4640 49 : .unwrap()
4641 49 : .clone()
4642 49 : }
4643 : }
4644 :
4645 : impl Timeline {
4646 : /// Returns non-remote layers for eviction.
4647 37 : pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
4648 37 : let guard = self.layers.read().await;
4649 37 : let mut max_layer_size: Option<u64> = None;
4650 :
4651 37 : let resident_layers = guard
4652 37 : .resident_layers()
4653 539 : .map(|layer| {
4654 539 : let file_size = layer.layer_desc().file_size;
4655 539 : max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
4656 539 :
4657 539 : let last_activity_ts = layer.access_stats().latest_activity_or_now();
4658 539 :
4659 539 : EvictionCandidate {
4660 539 : layer: layer.into(),
4661 539 : last_activity_ts,
4662 539 : relative_last_activity: finite_f32::FiniteF32::ZERO,
4663 539 : }
4664 539 : })
4665 37 : .collect()
4666 23 : .await;
4667 :
4668 37 : DiskUsageEvictionInfo {
4669 37 : max_layer_size,
4670 37 : resident_layers,
4671 37 : }
4672 37 : }
4673 :
4674 22798 : pub(crate) fn get_shard_index(&self) -> ShardIndex {
4675 22798 : ShardIndex {
4676 22798 : shard_number: self.tenant_shard_id.shard_number,
4677 22798 : shard_count: self.tenant_shard_id.shard_count,
4678 22798 : }
4679 22798 : }
4680 : }
4681 :
4682 : type TraversalPathItem = (
4683 : ValueReconstructResult,
4684 : Lsn,
4685 : Box<dyn Send + FnOnce() -> TraversalId>,
4686 : );
4687 :
4688 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
4689 : /// to an error, as anyhow context information.
4690 1269 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
4691 1269 : // We want the original 'msg' to be the outermost context. The outermost context
4692 1269 : // is the most high-level information, which also gets propagated to the client.
4693 1269 : let mut msg_iter = path
4694 1269 : .into_iter()
4695 1784 : .map(|(r, c, l)| {
4696 1784 : format!(
4697 1784 : "layer traversal: result {:?}, cont_lsn {}, layer: {}",
4698 1784 : r,
4699 1784 : c,
4700 1784 : l(),
4701 1784 : )
4702 1784 : })
4703 1269 : .chain(std::iter::once(msg));
4704 1269 : // Construct initial message from the first traversed layer
4705 1269 : let err = anyhow!(msg_iter.next().unwrap());
4706 1269 :
4707 1269 : // Append all subsequent traversals, and the error message 'msg', as contexts.
4708 1784 : let msg = msg_iter.fold(err, |err, msg| err.context(msg));
4709 1269 : PageReconstructError::from(msg)
4710 1269 : }
4711 :
4712 : /// Various functions to mutate the timeline.
4713 : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
4714 : // This is probably considered a bad practice in Rust and should be fixed eventually,
4715 : // but will cause large code changes.
4716 : pub(crate) struct TimelineWriter<'a> {
4717 : tl: &'a Timeline,
4718 : _write_guard: tokio::sync::MutexGuard<'a, ()>,
4719 : }
4720 :
4721 : impl Deref for TimelineWriter<'_> {
4722 : type Target = Timeline;
4723 :
4724 0 : fn deref(&self) -> &Self::Target {
4725 0 : self.tl
4726 0 : }
4727 : }
4728 :
4729 : impl<'a> TimelineWriter<'a> {
4730 : /// Put a new page version that can be constructed from a WAL record
4731 : ///
4732 : /// This will implicitly extend the relation, if the page is beyond the
4733 : /// current end-of-file.
4734 1214102 : pub(crate) async fn put(
4735 1214102 : &self,
4736 1214102 : key: Key,
4737 1214102 : lsn: Lsn,
4738 1214102 : value: &Value,
4739 1214102 : ctx: &RequestContext,
4740 1214102 : ) -> anyhow::Result<()> {
4741 1214102 : self.tl.put_value(key, lsn, value, ctx).await
4742 1214102 : }
4743 :
4744 1713238 : pub(crate) async fn put_batch(
4745 1713238 : &self,
4746 1713238 : batch: &HashMap<Key, Vec<(Lsn, Value)>>,
4747 1713238 : ctx: &RequestContext,
4748 1713239 : ) -> anyhow::Result<()> {
4749 1713239 : self.tl.put_values(batch, ctx).await
4750 1713238 : }
4751 :
4752 19251 : pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
4753 19251 : self.tl.put_tombstones(batch).await
4754 19251 : }
4755 :
4756 : /// Track the end of the latest digested WAL record.
4757 : /// Remember the (end of) last valid WAL record remembered in the timeline.
4758 : ///
4759 : /// Call this after you have finished writing all the WAL up to 'lsn'.
4760 : ///
4761 : /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
4762 : /// the 'lsn' or anything older. The previous last record LSN is stored alongside
4763 : /// the latest and can be read.
4764 76398101 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
4765 76398101 : self.tl.finish_write(new_lsn);
4766 76398101 : }
4767 :
4768 638850 : pub(crate) fn update_current_logical_size(&self, delta: i64) {
4769 638850 : self.tl.update_current_logical_size(delta)
4770 638850 : }
4771 : }
4772 :
4773 : // We need TimelineWriter to be send in upcoming conversion of
4774 : // Timeline::layers to tokio::sync::RwLock.
4775 2 : #[test]
4776 2 : fn is_send() {
4777 2 : fn _assert_send<T: Send>() {}
4778 2 : _assert_send::<TimelineWriter<'_>>();
4779 2 : }
4780 :
4781 : /// Add a suffix to a layer file's name: .{num}.old
4782 : /// Uses the first available num (starts at 0)
4783 1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
4784 1 : let filename = path
4785 1 : .file_name()
4786 1 : .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
4787 1 : let mut new_path = path.to_owned();
4788 :
4789 1 : for i in 0u32.. {
4790 1 : new_path.set_file_name(format!("{filename}.{i}.old"));
4791 1 : if !new_path.exists() {
4792 1 : std::fs::rename(path, &new_path)
4793 1 : .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
4794 1 : return Ok(());
4795 0 : }
4796 : }
4797 :
4798 0 : bail!("couldn't find an unused backup number for {:?}", path)
4799 1 : }
4800 :
4801 : #[cfg(test)]
4802 : mod tests {
4803 : use utils::{id::TimelineId, lsn::Lsn};
4804 :
4805 : use crate::tenant::{
4806 : harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
4807 : };
4808 :
4809 2 : #[tokio::test]
4810 2 : async fn two_layer_eviction_attempts_at_the_same_time() {
4811 2 : let harness =
4812 2 : TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
4813 2 :
4814 2 : let ctx = any_context();
4815 2 : let tenant = harness.try_load(&ctx).await.unwrap();
4816 2 : let timeline = tenant
4817 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
4818 8 : .await
4819 2 : .unwrap();
4820 2 :
4821 2 : let layer = find_some_layer(&timeline).await;
4822 2 : let layer = layer
4823 2 : .keep_resident()
4824 2 : .await
4825 2 : .expect("no download => no downloading errors")
4826 2 : .expect("should had been resident")
4827 2 : .drop_eviction_guard();
4828 2 :
4829 2 : let first = async { layer.evict_and_wait().await };
4830 2 : let second = async { layer.evict_and_wait().await };
4831 2 :
4832 4 : let (first, second) = tokio::join!(first, second);
4833 2 :
4834 5 : let res = layer.keep_resident().await;
4835 2 : assert!(matches!(res, Ok(None)), "{res:?}");
4836 2 :
4837 2 : match (first, second) {
4838 2 : (Ok(()), Ok(())) => {
4839 1 : // because there are no more timeline locks being taken on eviction path, we can
4840 1 : // witness all three outcomes here.
4841 1 : }
4842 2 : (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
4843 1 : // if one completes before the other, this is fine just as well.
4844 1 : }
4845 2 : other => unreachable!("unexpected {:?}", other),
4846 2 : }
4847 2 : }
4848 :
4849 2 : fn any_context() -> crate::context::RequestContext {
4850 2 : use crate::context::*;
4851 2 : use crate::task_mgr::*;
4852 2 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
4853 2 : }
4854 :
4855 2 : async fn find_some_layer(timeline: &Timeline) -> Layer {
4856 2 : let layers = timeline.layers.read().await;
4857 2 : let desc = layers
4858 2 : .layer_map()
4859 2 : .iter_historic_layers()
4860 2 : .next()
4861 2 : .expect("must find one layer to evict");
4862 2 :
4863 2 : layers.get_from_desc(&desc)
4864 2 : }
4865 : }
|