Line data Source code
1 : pub mod delete;
2 : mod eviction_task;
3 : mod init;
4 : pub mod layer_manager;
5 : pub(crate) mod logical_size;
6 : pub mod span;
7 : pub mod uninit;
8 : mod walreceiver;
9 :
10 : use anyhow::{anyhow, bail, ensure, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use enumset::EnumSet;
14 : use fail::fail_point;
15 : use futures::stream::StreamExt;
16 : use itertools::Itertools;
17 : use once_cell::sync::Lazy;
18 : use pageserver_api::{
19 : keyspace::KeySpaceAccum,
20 : models::{
21 : DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
22 : LayerMapInfo, TimelineState,
23 : },
24 : reltag::BlockNumber,
25 : shard::{ShardIdentity, TenantShardId},
26 : };
27 : use rand::Rng;
28 : use serde_with::serde_as;
29 : use storage_broker::BrokerClientChannel;
30 : use tokio::{
31 : runtime::Handle,
32 : sync::{oneshot, watch},
33 : };
34 : use tokio_util::sync::CancellationToken;
35 : use tracing::*;
36 : use utils::{bin_ser::BeSer, sync::gate::Gate};
37 :
38 : use std::ops::{Deref, Range};
39 : use std::pin::pin;
40 : use std::sync::atomic::Ordering as AtomicOrdering;
41 : use std::sync::{Arc, Mutex, RwLock, Weak};
42 : use std::time::{Duration, Instant, SystemTime};
43 : use std::{
44 : array,
45 : collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
46 : sync::atomic::AtomicU64,
47 : };
48 : use std::{
49 : cmp::{max, min, Ordering},
50 : ops::ControlFlow,
51 : };
52 :
53 : use crate::pgdatadir_mapping::DirectoryKind;
54 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
55 : use crate::tenant::{
56 : layer_map::{LayerMap, SearchResult},
57 : metadata::{save_metadata, TimelineMetadata},
58 : par_fsync,
59 : };
60 : use crate::{
61 : context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder},
62 : disk_usage_eviction_task::DiskUsageEvictionInfo,
63 : };
64 : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
65 : use crate::{
66 : disk_usage_eviction_task::finite_f32,
67 : tenant::storage_layer::{
68 : AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
69 : LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
70 : ValueReconstructState, ValuesReconstructState,
71 : },
72 : };
73 : use crate::{
74 : disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry,
75 : };
76 : use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind};
77 :
78 : use crate::config::PageServerConf;
79 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
80 : use crate::metrics::{
81 : TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
82 : };
83 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
84 : use crate::tenant::config::TenantConfOpt;
85 : use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
86 : use pageserver_api::reltag::RelTag;
87 : use pageserver_api::shard::ShardIndex;
88 :
89 : use postgres_connection::PgConnectionConfig;
90 : use postgres_ffi::to_pg_timestamp;
91 : use utils::{
92 : completion,
93 : generation::Generation,
94 : id::TimelineId,
95 : lsn::{AtomicLsn, Lsn, RecordLsn},
96 : seqwait::SeqWait,
97 : simple_rcu::{Rcu, RcuReadGuard},
98 : };
99 :
100 : use crate::page_cache;
101 : use crate::repository::GcResult;
102 : use crate::repository::{Key, Value};
103 : use crate::task_mgr;
104 : use crate::task_mgr::TaskKind;
105 : use crate::ZERO_PAGE;
106 :
107 : use self::delete::DeleteTimelineFlow;
108 : pub(super) use self::eviction_task::EvictionTaskTenantState;
109 : use self::eviction_task::EvictionTaskTimelineState;
110 : use self::layer_manager::LayerManager;
111 : use self::logical_size::LogicalSize;
112 : use self::walreceiver::{WalReceiver, WalReceiverConf};
113 :
114 : use super::remote_timeline_client::RemoteTimelineClient;
115 : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
116 : use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
117 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
118 : use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
119 :
120 2 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
121 : pub(super) enum FlushLoopState {
122 : NotStarted,
123 : Running {
124 : #[cfg(test)]
125 : expect_initdb_optimization: bool,
126 : #[cfg(test)]
127 : initdb_optimization_count: usize,
128 : },
129 : Exited,
130 : }
131 :
132 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
133 0 : #[derive(Debug, Clone, PartialEq, Eq)]
134 : pub(crate) struct Hole {
135 : key_range: Range<Key>,
136 : coverage_size: usize,
137 : }
138 :
139 : impl Ord for Hole {
140 0 : fn cmp(&self, other: &Self) -> Ordering {
141 0 : other.coverage_size.cmp(&self.coverage_size) // inverse order
142 0 : }
143 : }
144 :
145 : impl PartialOrd for Hole {
146 0 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
147 0 : Some(self.cmp(other))
148 0 : }
149 : }
150 :
151 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
152 : /// Can be removed after all refactors are done.
153 30 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
154 30 : drop(rlock)
155 30 : }
156 :
157 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
158 : /// Can be removed after all refactors are done.
159 508 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
160 508 : drop(rlock)
161 508 : }
162 :
163 : /// The outward-facing resources required to build a Timeline
164 : pub struct TimelineResources {
165 : pub remote_client: Option<RemoteTimelineClient>,
166 : pub deletion_queue_client: DeletionQueueClient,
167 : pub timeline_get_throttle: Arc<
168 : crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
169 : >,
170 : }
171 :
172 : pub struct Timeline {
173 : conf: &'static PageServerConf,
174 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
175 :
176 : myself: Weak<Self>,
177 :
178 : pub(crate) tenant_shard_id: TenantShardId,
179 : pub timeline_id: TimelineId,
180 :
181 : /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
182 : /// Never changes for the lifetime of this [`Timeline`] object.
183 : ///
184 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
185 : /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
186 : pub(crate) generation: Generation,
187 :
188 : /// The detailed sharding information from our parent Tenant. This enables us to map keys
189 : /// to shards, and is constant through the lifetime of this Timeline.
190 : shard_identity: ShardIdentity,
191 :
192 : pub pg_version: u32,
193 :
194 : /// The tuple has two elements.
195 : /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
196 : /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
197 : ///
198 : /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
199 : /// We describe these rectangles through the `PersistentLayerDesc` struct.
200 : ///
201 : /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
202 : /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
203 : /// `PersistentLayerDesc`'s.
204 : ///
205 : /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
206 : /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
207 : /// runtime, e.g., during page reconstruction.
208 : ///
209 : /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
210 : /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
211 : pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
212 :
213 : /// Set of key ranges which should be covered by image layers to
214 : /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
215 : /// It is used by compaction task when it checks if new image layer should be created.
216 : /// Newly created image layer doesn't help to remove the delta layer, until the
217 : /// newly created image layer falls off the PITR horizon. So on next GC cycle,
218 : /// gc_timeline may still want the new image layer to be created. To avoid redundant
219 : /// image layers creation we should check if image layer exists but beyond PITR horizon.
220 : /// This is why we need remember GC cutoff LSN.
221 : ///
222 : wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
223 :
224 : last_freeze_at: AtomicLsn,
225 : // Atomic would be more appropriate here.
226 : last_freeze_ts: RwLock<Instant>,
227 :
228 : // WAL redo manager. `None` only for broken tenants.
229 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
230 :
231 : /// Remote storage client.
232 : /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
233 : pub remote_client: Option<Arc<RemoteTimelineClient>>,
234 :
235 : // What page versions do we hold in the repository? If we get a
236 : // request > last_record_lsn, we need to wait until we receive all
237 : // the WAL up to the request. The SeqWait provides functions for
238 : // that. TODO: If we get a request for an old LSN, such that the
239 : // versions have already been garbage collected away, we should
240 : // throw an error, but we don't track that currently.
241 : //
242 : // last_record_lsn.load().last points to the end of last processed WAL record.
243 : //
244 : // We also remember the starting point of the previous record in
245 : // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
246 : // first WAL record when the node is started up. But here, we just
247 : // keep track of it.
248 : last_record_lsn: SeqWait<RecordLsn, Lsn>,
249 :
250 : // All WAL records have been processed and stored durably on files on
251 : // local disk, up to this LSN. On crash and restart, we need to re-process
252 : // the WAL starting from this point.
253 : //
254 : // Some later WAL records might have been processed and also flushed to disk
255 : // already, so don't be surprised to see some, but there's no guarantee on
256 : // them yet.
257 : disk_consistent_lsn: AtomicLsn,
258 :
259 : // Parent timeline that this timeline was branched from, and the LSN
260 : // of the branch point.
261 : ancestor_timeline: Option<Arc<Timeline>>,
262 : ancestor_lsn: Lsn,
263 :
264 : pub(super) metrics: TimelineMetrics,
265 :
266 : // `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
267 : // in `crate::page_service` writes these metrics.
268 : pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
269 :
270 : directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
271 :
272 : /// Ensures layers aren't frozen by checkpointer between
273 : /// [`Timeline::get_layer_for_write`] and layer reads.
274 : /// Locked automatically by [`TimelineWriter`] and checkpointer.
275 : /// Must always be acquired before the layer map/individual layer lock
276 : /// to avoid deadlock.
277 : write_lock: tokio::sync::Mutex<Option<TimelineWriterState>>,
278 :
279 : /// Used to avoid multiple `flush_loop` tasks running
280 : pub(super) flush_loop_state: Mutex<FlushLoopState>,
281 :
282 : /// layer_flush_start_tx can be used to wake up the layer-flushing task.
283 : /// The value is a counter, incremented every time a new flush cycle is requested.
284 : /// The flush cycle counter is sent back on the layer_flush_done channel when
285 : /// the flush finishes. You can use that to wait for the flush to finish.
286 : layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
287 : /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
288 : layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
289 :
290 : // Needed to ensure that we can't create a branch at a point that was already garbage collected
291 : pub latest_gc_cutoff_lsn: Rcu<Lsn>,
292 :
293 : // List of child timelines and their branch points. This is needed to avoid
294 : // garbage collecting data that is still needed by the child timelines.
295 : pub gc_info: std::sync::RwLock<GcInfo>,
296 :
297 : // It may change across major versions so for simplicity
298 : // keep it after running initdb for a timeline.
299 : // It is needed in checks when we want to error on some operations
300 : // when they are requested for pre-initdb lsn.
301 : // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
302 : // though let's keep them both for better error visibility.
303 : pub initdb_lsn: Lsn,
304 :
305 : /// When did we last calculate the partitioning?
306 : partitioning: Mutex<(KeyPartitioning, Lsn)>,
307 :
308 : /// Configuration: how often should the partitioning be recalculated.
309 : repartition_threshold: u64,
310 :
311 : /// Current logical size of the "datadir", at the last LSN.
312 : current_logical_size: LogicalSize,
313 :
314 : /// Information about the last processed message by the WAL receiver,
315 : /// or None if WAL receiver has not received anything for this timeline
316 : /// yet.
317 : pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
318 : pub walreceiver: Mutex<Option<WalReceiver>>,
319 :
320 : /// Relation size cache
321 : pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
322 :
323 : download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
324 :
325 : state: watch::Sender<TimelineState>,
326 :
327 : /// Prevent two tasks from deleting the timeline at the same time. If held, the
328 : /// timeline is being deleted. If 'true', the timeline has already been deleted.
329 : pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
330 :
331 : eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
332 :
333 : /// Load or creation time information about the disk_consistent_lsn and when the loading
334 : /// happened. Used for consumption metrics.
335 : pub(crate) loaded_at: (Lsn, SystemTime),
336 :
337 : /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
338 : pub(crate) gate: Gate,
339 :
340 : /// Cancellation token scoped to this timeline: anything doing long-running work relating
341 : /// to the timeline should drop out when this token fires.
342 : pub(crate) cancel: CancellationToken,
343 :
344 : /// Make sure we only have one running compaction at a time in tests.
345 : ///
346 : /// Must only be taken in two places:
347 : /// - [`Timeline::compact`] (this file)
348 : /// - [`delete::delete_local_layer_files`]
349 : ///
350 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
351 : compaction_lock: tokio::sync::Mutex<()>,
352 :
353 : /// Make sure we only have one running gc at a time.
354 : ///
355 : /// Must only be taken in two places:
356 : /// - [`Timeline::gc`] (this file)
357 : /// - [`delete::delete_local_layer_files`]
358 : ///
359 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
360 : gc_lock: tokio::sync::Mutex<()>,
361 :
362 : /// Cloned from [`super::Tenant::timeline_get_throttle`] on construction.
363 : timeline_get_throttle: Arc<
364 : crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
365 : >,
366 : }
367 :
368 : pub struct WalReceiverInfo {
369 : pub wal_source_connconf: PgConnectionConfig,
370 : pub last_received_msg_lsn: Lsn,
371 : pub last_received_msg_ts: u128,
372 : }
373 :
374 : ///
375 : /// Information about how much history needs to be retained, needed by
376 : /// Garbage Collection.
377 : ///
378 : pub struct GcInfo {
379 : /// Specific LSNs that are needed.
380 : ///
381 : /// Currently, this includes all points where child branches have
382 : /// been forked off from. In the future, could also include
383 : /// explicit user-defined snapshot points.
384 : pub retain_lsns: Vec<Lsn>,
385 :
386 : /// In addition to 'retain_lsns', keep everything newer than this
387 : /// point.
388 : ///
389 : /// This is calculated by subtracting 'gc_horizon' setting from
390 : /// last-record LSN
391 : ///
392 : /// FIXME: is this inclusive or exclusive?
393 : pub horizon_cutoff: Lsn,
394 :
395 : /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
396 : /// point.
397 : ///
398 : /// This is calculated by finding a number such that a record is needed for PITR
399 : /// if only if its LSN is larger than 'pitr_cutoff'.
400 : pub pitr_cutoff: Lsn,
401 : }
402 :
403 : /// An error happened in a get() operation.
404 2 : #[derive(thiserror::Error, Debug)]
405 : pub(crate) enum PageReconstructError {
406 : #[error(transparent)]
407 : Other(#[from] anyhow::Error),
408 :
409 : #[error("Ancestor LSN wait error: {0}")]
410 : AncestorLsnTimeout(#[from] WaitLsnError),
411 :
412 : #[error("timeline shutting down")]
413 : Cancelled,
414 :
415 : /// The ancestor of this is being stopped
416 : #[error("ancestor timeline {0} is being stopped")]
417 : AncestorStopping(TimelineId),
418 :
419 : /// An error happened replaying WAL records
420 : #[error(transparent)]
421 : WalRedo(anyhow::Error),
422 : }
423 :
424 : impl PageReconstructError {
425 : /// Returns true if this error indicates a tenant/timeline shutdown alike situation
426 0 : pub(crate) fn is_stopping(&self) -> bool {
427 0 : use PageReconstructError::*;
428 0 : match self {
429 0 : Other(_) => false,
430 0 : AncestorLsnTimeout(_) => false,
431 0 : Cancelled | AncestorStopping(_) => true,
432 0 : WalRedo(_) => false,
433 : }
434 0 : }
435 : }
436 :
437 0 : #[derive(thiserror::Error, Debug)]
438 : enum CreateImageLayersError {
439 : #[error("timeline shutting down")]
440 : Cancelled,
441 :
442 : #[error(transparent)]
443 : GetVectoredError(GetVectoredError),
444 :
445 : #[error(transparent)]
446 : PageReconstructError(PageReconstructError),
447 :
448 : #[error(transparent)]
449 : Other(#[from] anyhow::Error),
450 : }
451 :
452 0 : #[derive(thiserror::Error, Debug)]
453 : enum FlushLayerError {
454 : /// Timeline cancellation token was cancelled
455 : #[error("timeline shutting down")]
456 : Cancelled,
457 :
458 : #[error(transparent)]
459 : CreateImageLayersError(CreateImageLayersError),
460 :
461 : #[error(transparent)]
462 : Other(#[from] anyhow::Error),
463 : }
464 :
465 0 : #[derive(thiserror::Error, Debug)]
466 : pub(crate) enum GetVectoredError {
467 : #[error("timeline shutting down")]
468 : Cancelled,
469 :
470 : #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
471 : Oversized(u64),
472 :
473 : #[error("Requested at invalid LSN: {0}")]
474 : InvalidLsn(Lsn),
475 :
476 : #[error("Requested key {0} not found")]
477 : MissingKey(Key),
478 :
479 : #[error(transparent)]
480 : GetReadyAncestorError(GetReadyAncestorError),
481 :
482 : #[error(transparent)]
483 : Other(#[from] anyhow::Error),
484 : }
485 :
486 0 : #[derive(thiserror::Error, Debug)]
487 : pub(crate) enum GetReadyAncestorError {
488 : #[error("ancestor timeline {0} is being stopped")]
489 : AncestorStopping(TimelineId),
490 :
491 : #[error("Ancestor LSN wait error: {0}")]
492 : AncestorLsnTimeout(#[from] WaitLsnError),
493 :
494 : #[error("Cancelled")]
495 : Cancelled,
496 :
497 : #[error(transparent)]
498 : Other(#[from] anyhow::Error),
499 : }
500 :
501 0 : #[derive(Clone, Copy)]
502 : pub enum LogicalSizeCalculationCause {
503 : Initial,
504 : ConsumptionMetricsSyntheticSize,
505 : EvictionTaskImitation,
506 : TenantSizeHandler,
507 : }
508 :
509 : pub enum GetLogicalSizePriority {
510 : User,
511 : Background,
512 : }
513 :
514 0 : #[derive(enumset::EnumSetType)]
515 : pub(crate) enum CompactFlags {
516 : ForceRepartition,
517 : }
518 :
519 : impl std::fmt::Debug for Timeline {
520 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
521 0 : write!(f, "Timeline<{}>", self.timeline_id)
522 0 : }
523 : }
524 :
525 0 : #[derive(thiserror::Error, Debug)]
526 : pub(crate) enum WaitLsnError {
527 : // Called on a timeline which is shutting down
528 : #[error("Shutdown")]
529 : Shutdown,
530 :
531 : // Called on an timeline not in active state or shutting down
532 : #[error("Bad state (not active)")]
533 : BadState,
534 :
535 : // Timeout expired while waiting for LSN to catch up with goal.
536 : #[error("{0}")]
537 : Timeout(String),
538 : }
539 :
540 : // The impls below achieve cancellation mapping for errors.
541 : // Perhaps there's a way of achieving this with less cruft.
542 :
543 : impl From<CreateImageLayersError> for CompactionError {
544 0 : fn from(e: CreateImageLayersError) -> Self {
545 0 : match e {
546 0 : CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
547 0 : _ => CompactionError::Other(e.into()),
548 : }
549 0 : }
550 : }
551 :
552 : impl From<CreateImageLayersError> for FlushLayerError {
553 0 : fn from(e: CreateImageLayersError) -> Self {
554 0 : match e {
555 0 : CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
556 0 : any => FlushLayerError::CreateImageLayersError(any),
557 : }
558 0 : }
559 : }
560 :
561 : impl From<PageReconstructError> for CreateImageLayersError {
562 0 : fn from(e: PageReconstructError) -> Self {
563 0 : match e {
564 0 : PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
565 0 : _ => CreateImageLayersError::PageReconstructError(e),
566 : }
567 0 : }
568 : }
569 :
570 : impl From<GetVectoredError> for CreateImageLayersError {
571 0 : fn from(e: GetVectoredError) -> Self {
572 0 : match e {
573 0 : GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
574 0 : _ => CreateImageLayersError::GetVectoredError(e),
575 : }
576 0 : }
577 : }
578 :
579 : impl From<GetReadyAncestorError> for PageReconstructError {
580 2 : fn from(e: GetReadyAncestorError) -> Self {
581 2 : use GetReadyAncestorError::*;
582 2 : match e {
583 0 : AncestorStopping(tid) => PageReconstructError::AncestorStopping(tid),
584 0 : AncestorLsnTimeout(wait_err) => PageReconstructError::AncestorLsnTimeout(wait_err),
585 0 : Cancelled => PageReconstructError::Cancelled,
586 2 : Other(other) => PageReconstructError::Other(other),
587 : }
588 2 : }
589 : }
590 :
591 : #[derive(
592 : Eq,
593 4 : PartialEq,
594 0 : Debug,
595 : Copy,
596 0 : Clone,
597 114 : strum_macros::EnumString,
598 0 : strum_macros::Display,
599 0 : serde_with::DeserializeFromStr,
600 0 : serde_with::SerializeDisplay,
601 : )]
602 : #[strum(serialize_all = "kebab-case")]
603 : pub enum GetVectoredImpl {
604 : Sequential,
605 : Vectored,
606 : }
607 :
608 : /// Public interface functions
609 : impl Timeline {
610 : /// Get the LSN where this branch was created
611 6 : pub(crate) fn get_ancestor_lsn(&self) -> Lsn {
612 6 : self.ancestor_lsn
613 6 : }
614 :
615 : /// Get the ancestor's timeline id
616 14 : pub(crate) fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
617 14 : self.ancestor_timeline
618 14 : .as_ref()
619 14 : .map(|ancestor| ancestor.timeline_id)
620 14 : }
621 :
622 : /// Lock and get timeline's GC cutoff
623 622 : pub(crate) fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
624 622 : self.latest_gc_cutoff_lsn.read()
625 622 : }
626 :
627 : /// Look up given page version.
628 : ///
629 : /// If a remote layer file is needed, it is downloaded as part of this
630 : /// call.
631 : ///
632 : /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
633 : /// abstraction above this needs to store suitable metadata to track what
634 : /// data exists with what keys, in separate metadata entries. If a
635 : /// non-existent key is requested, we may incorrectly return a value from
636 : /// an ancestor branch, for example, or waste a lot of cycles chasing the
637 : /// non-existing key.
638 : ///
639 : /// # Cancel-Safety
640 : ///
641 : /// This method is cancellation-safe.
642 502486 : pub(crate) async fn get(
643 502486 : &self,
644 502486 : key: Key,
645 502486 : lsn: Lsn,
646 502486 : ctx: &RequestContext,
647 502486 : ) -> Result<Bytes, PageReconstructError> {
648 502486 : if !lsn.is_valid() {
649 0 : return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
650 502486 : }
651 502486 :
652 502486 : self.timeline_get_throttle.throttle(ctx, 1).await;
653 :
654 : // This check is debug-only because of the cost of hashing, and because it's a double-check: we
655 : // already checked the key against the shard_identity when looking up the Timeline from
656 : // page_service.
657 502486 : debug_assert!(!self.shard_identity.is_key_disposable(&key));
658 :
659 : // XXX: structured stats collection for layer eviction here.
660 0 : trace!(
661 0 : "get page request for {}@{} from task kind {:?}",
662 0 : key,
663 0 : lsn,
664 0 : ctx.task_kind()
665 0 : );
666 :
667 : // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
668 : // The cached image can be returned directly if there is no WAL between the cached image
669 : // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
670 : // for redo.
671 502486 : let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
672 0 : Some((cached_lsn, cached_img)) => {
673 0 : match cached_lsn.cmp(&lsn) {
674 0 : Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
675 : Ordering::Equal => {
676 0 : MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
677 0 : return Ok(cached_img); // exact LSN match, return the image
678 : }
679 : Ordering::Greater => {
680 0 : unreachable!("the returned lsn should never be after the requested lsn")
681 : }
682 : }
683 0 : Some((cached_lsn, cached_img))
684 : }
685 502486 : None => None,
686 : };
687 :
688 502486 : let mut reconstruct_state = ValueReconstructState {
689 502486 : records: Vec::new(),
690 502486 : img: cached_page_img,
691 502486 : };
692 502486 :
693 502486 : let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
694 502486 : let path = self
695 502486 : .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
696 36411 : .await?;
697 502378 : timer.stop_and_record();
698 502378 :
699 502378 : let start = Instant::now();
700 502378 : let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
701 502378 : let elapsed = start.elapsed();
702 502378 : crate::metrics::RECONSTRUCT_TIME
703 502378 : .for_result(&res)
704 502378 : .observe(elapsed.as_secs_f64());
705 502378 :
706 502378 : if cfg!(feature = "testing") && res.is_err() {
707 : // it can only be walredo issue
708 : use std::fmt::Write;
709 :
710 0 : let mut msg = String::new();
711 0 :
712 0 : path.into_iter().for_each(|(res, cont_lsn, layer)| {
713 0 : writeln!(
714 0 : msg,
715 0 : "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
716 0 : layer(),
717 0 : )
718 0 : .expect("string grows")
719 0 : });
720 :
721 : // this is to rule out or provide evidence that we could in some cases read a duplicate
722 : // walrecord
723 0 : tracing::info!("walredo failed, path:\n{msg}");
724 502378 : }
725 :
726 502378 : res
727 502486 : }
728 :
729 : pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
730 :
731 : /// Look up multiple page versions at a given LSN
732 : ///
733 : /// This naive implementation will be replaced with a more efficient one
734 : /// which actually vectorizes the read path.
735 420 : pub(crate) async fn get_vectored(
736 420 : &self,
737 420 : keyspace: KeySpace,
738 420 : lsn: Lsn,
739 420 : ctx: &RequestContext,
740 420 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
741 420 : if !lsn.is_valid() {
742 0 : return Err(GetVectoredError::InvalidLsn(lsn));
743 420 : }
744 420 :
745 420 : let key_count = keyspace.total_size().try_into().unwrap();
746 420 : if key_count > Timeline::MAX_GET_VECTORED_KEYS {
747 0 : return Err(GetVectoredError::Oversized(key_count));
748 420 : }
749 420 :
750 420 : self.timeline_get_throttle
751 420 : .throttle(ctx, key_count as usize)
752 0 : .await;
753 :
754 840 : for range in &keyspace.ranges {
755 420 : let mut key = range.start;
756 980 : while key != range.end {
757 560 : assert!(!self.shard_identity.is_key_disposable(&key));
758 560 : key = key.next();
759 : }
760 : }
761 :
762 0 : trace!(
763 0 : "get vectored request for {:?}@{} from task kind {:?} will use {} implementation",
764 0 : keyspace,
765 0 : lsn,
766 0 : ctx.task_kind(),
767 0 : self.conf.get_vectored_impl
768 0 : );
769 :
770 420 : let _timer = crate::metrics::GET_VECTORED_LATENCY
771 420 : .for_task_kind(ctx.task_kind())
772 420 : .map(|t| t.start_timer());
773 420 :
774 420 : match self.conf.get_vectored_impl {
775 : GetVectoredImpl::Sequential => {
776 420 : self.get_vectored_sequential_impl(keyspace, lsn, ctx).await
777 : }
778 : GetVectoredImpl::Vectored => {
779 0 : let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await;
780 :
781 0 : self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
782 0 : .await;
783 :
784 0 : vectored_res
785 : }
786 : }
787 420 : }
788 :
789 430 : pub(super) async fn get_vectored_sequential_impl(
790 430 : &self,
791 430 : keyspace: KeySpace,
792 430 : lsn: Lsn,
793 430 : ctx: &RequestContext,
794 430 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
795 430 : let mut values = BTreeMap::new();
796 860 : for range in keyspace.ranges {
797 430 : let mut key = range.start;
798 1310 : while key != range.end {
799 880 : let block = self.get(key, lsn, ctx).await;
800 :
801 : use PageReconstructError::*;
802 0 : match block {
803 : Err(Cancelled | AncestorStopping(_)) => {
804 0 : return Err(GetVectoredError::Cancelled)
805 : }
806 0 : Err(Other(err)) if err.to_string().contains("could not find data for key") => {
807 0 : return Err(GetVectoredError::MissingKey(key))
808 : }
809 880 : _ => {
810 880 : values.insert(key, block);
811 880 : key = key.next();
812 880 : }
813 : }
814 : }
815 : }
816 :
817 430 : Ok(values)
818 430 : }
819 :
820 10 : pub(super) async fn get_vectored_impl(
821 10 : &self,
822 10 : keyspace: KeySpace,
823 10 : lsn: Lsn,
824 10 : ctx: &RequestContext,
825 10 : ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
826 10 : let mut reconstruct_state = ValuesReconstructState::new();
827 10 :
828 10 : self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx)
829 28 : .await?;
830 :
831 10 : let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
832 330 : for (key, res) in reconstruct_state.keys {
833 320 : match res {
834 0 : Err(err) => {
835 0 : results.insert(key, Err(err));
836 0 : }
837 320 : Ok(state) => {
838 320 : let state = ValueReconstructState::from(state);
839 :
840 320 : let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
841 320 : results.insert(key, reconstruct_res);
842 : }
843 : }
844 : }
845 :
846 10 : Ok(results)
847 10 : }
848 :
849 10 : pub(super) async fn validate_get_vectored_impl(
850 10 : &self,
851 10 : vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
852 10 : keyspace: KeySpace,
853 10 : lsn: Lsn,
854 10 : ctx: &RequestContext,
855 10 : ) {
856 10 : let sequential_res = self
857 10 : .get_vectored_sequential_impl(keyspace.clone(), lsn, ctx)
858 20 : .await;
859 :
860 0 : fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool {
861 0 : use GetVectoredError::*;
862 0 : match (lhs, rhs) {
863 0 : (Cancelled, Cancelled) => true,
864 0 : (_, Cancelled) => true,
865 0 : (Oversized(l), Oversized(r)) => l == r,
866 0 : (InvalidLsn(l), InvalidLsn(r)) => l == r,
867 0 : (MissingKey(l), MissingKey(r)) => l == r,
868 0 : (GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
869 0 : (Other(_), Other(_)) => true,
870 0 : _ => false,
871 : }
872 0 : }
873 :
874 10 : match (&sequential_res, vectored_res) {
875 0 : (Err(seq_err), Ok(_)) => {
876 0 : panic!(concat!("Sequential get failed with {}, but vectored get did not",
877 0 : " - keyspace={:?} lsn={}"),
878 0 : seq_err, keyspace, lsn) },
879 0 : (Ok(_), Err(vec_err)) => {
880 0 : panic!(concat!("Vectored get failed with {}, but sequential get did not",
881 0 : " - keyspace={:?} lsn={}"),
882 0 : vec_err, keyspace, lsn) },
883 0 : (Err(seq_err), Err(vec_err)) => {
884 0 : assert!(errors_match(seq_err, vec_err),
885 0 : "Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")},
886 10 : (Ok(seq_values), Ok(vec_values)) => {
887 320 : seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| {
888 320 : assert_eq!(seq_key, vec_key);
889 320 : match (seq_res, vec_res) {
890 320 : (Ok(seq_blob), Ok(vec_blob)) => {
891 320 : assert_eq!(seq_blob, vec_blob,
892 0 : "Image mismatch for key {seq_key} - keyspace={keyspace:?} lsn={lsn}");
893 : },
894 0 : (Err(err), Ok(_)) => {
895 0 : panic!(
896 0 : concat!("Sequential get failed with {} for key {}, but vectored get did not",
897 0 : " - keyspace={:?} lsn={}"),
898 0 : err, seq_key, keyspace, lsn) },
899 0 : (Ok(_), Err(err)) => {
900 0 : panic!(
901 0 : concat!("Vectored get failed with {} for key {}, but sequential get did not",
902 0 : " - keyspace={:?} lsn={}"),
903 0 : err, seq_key, keyspace, lsn) },
904 0 : (Err(_), Err(_)) => {}
905 : }
906 320 : })
907 : }
908 : }
909 10 : }
910 :
911 : /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
912 2900476 : pub(crate) fn get_last_record_lsn(&self) -> Lsn {
913 2900476 : self.last_record_lsn.load().last
914 2900476 : }
915 :
916 0 : pub(crate) fn get_prev_record_lsn(&self) -> Lsn {
917 0 : self.last_record_lsn.load().prev
918 0 : }
919 :
920 : /// Atomically get both last and prev.
921 210 : pub(crate) fn get_last_record_rlsn(&self) -> RecordLsn {
922 210 : self.last_record_lsn.load()
923 210 : }
924 :
925 694 : pub(crate) fn get_disk_consistent_lsn(&self) -> Lsn {
926 694 : self.disk_consistent_lsn.load()
927 694 : }
928 :
929 : /// remote_consistent_lsn from the perspective of the tenant's current generation,
930 : /// not validated with control plane yet.
931 : /// See [`Self::get_remote_consistent_lsn_visible`].
932 0 : pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
933 0 : if let Some(remote_client) = &self.remote_client {
934 0 : remote_client.remote_consistent_lsn_projected()
935 : } else {
936 0 : None
937 : }
938 0 : }
939 :
940 : /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
941 : /// i.e. a value of remote_consistent_lsn_projected which has undergone
942 : /// generation validation in the deletion queue.
943 0 : pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
944 0 : if let Some(remote_client) = &self.remote_client {
945 0 : remote_client.remote_consistent_lsn_visible()
946 : } else {
947 0 : None
948 : }
949 0 : }
950 :
951 : /// The sum of the file size of all historic layers in the layer map.
952 : /// This method makes no distinction between local and remote layers.
953 : /// Hence, the result **does not represent local filesystem usage**.
954 0 : pub(crate) async fn layer_size_sum(&self) -> u64 {
955 0 : let guard = self.layers.read().await;
956 0 : let layer_map = guard.layer_map();
957 0 : let mut size = 0;
958 0 : for l in layer_map.iter_historic_layers() {
959 0 : size += l.file_size();
960 0 : }
961 0 : size
962 0 : }
963 :
964 0 : pub(crate) fn resident_physical_size(&self) -> u64 {
965 0 : self.metrics.resident_physical_size_get()
966 0 : }
967 :
968 0 : pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
969 0 : array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
970 0 : }
971 :
972 : ///
973 : /// Wait until WAL has been received and processed up to this LSN.
974 : ///
975 : /// You should call this before any of the other get_* or list_* functions. Calling
976 : /// those functions with an LSN that has been processed yet is an error.
977 : ///
978 226536 : pub(crate) async fn wait_lsn(
979 226536 : &self,
980 226536 : lsn: Lsn,
981 226536 : _ctx: &RequestContext, /* Prepare for use by cancellation */
982 226536 : ) -> Result<(), WaitLsnError> {
983 226536 : if self.cancel.is_cancelled() {
984 0 : return Err(WaitLsnError::Shutdown);
985 226536 : } else if !self.is_active() {
986 0 : return Err(WaitLsnError::BadState);
987 226536 : }
988 :
989 : // This should never be called from the WAL receiver, because that could lead
990 : // to a deadlock.
991 : debug_assert!(
992 226536 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
993 0 : "wait_lsn cannot be called in WAL receiver"
994 : );
995 : debug_assert!(
996 226536 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
997 0 : "wait_lsn cannot be called in WAL receiver"
998 : );
999 : debug_assert!(
1000 226536 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
1001 0 : "wait_lsn cannot be called in WAL receiver"
1002 : );
1003 :
1004 226536 : let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
1005 226536 :
1006 226536 : match self
1007 226536 : .last_record_lsn
1008 226536 : .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
1009 0 : .await
1010 : {
1011 226536 : Ok(()) => Ok(()),
1012 0 : Err(e) => {
1013 0 : use utils::seqwait::SeqWaitError::*;
1014 0 : match e {
1015 0 : Shutdown => Err(WaitLsnError::Shutdown),
1016 : Timeout => {
1017 : // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
1018 0 : drop(_timer);
1019 0 : let walreceiver_status = self.walreceiver_status();
1020 0 : Err(WaitLsnError::Timeout(format!(
1021 0 : "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
1022 0 : lsn,
1023 0 : self.get_last_record_lsn(),
1024 0 : self.get_disk_consistent_lsn(),
1025 0 : walreceiver_status,
1026 0 : )))
1027 : }
1028 : }
1029 : }
1030 : }
1031 226536 : }
1032 :
1033 0 : pub(crate) fn walreceiver_status(&self) -> String {
1034 0 : match &*self.walreceiver.lock().unwrap() {
1035 0 : None => "stopping or stopped".to_string(),
1036 0 : Some(walreceiver) => match walreceiver.status() {
1037 0 : Some(status) => status.to_human_readable_string(),
1038 0 : None => "Not active".to_string(),
1039 : },
1040 : }
1041 0 : }
1042 :
1043 : /// Check that it is valid to request operations with that lsn.
1044 214 : pub(crate) fn check_lsn_is_in_scope(
1045 214 : &self,
1046 214 : lsn: Lsn,
1047 214 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
1048 214 : ) -> anyhow::Result<()> {
1049 214 : ensure!(
1050 214 : lsn >= **latest_gc_cutoff_lsn,
1051 4 : "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
1052 4 : lsn,
1053 4 : **latest_gc_cutoff_lsn,
1054 : );
1055 210 : Ok(())
1056 214 : }
1057 :
1058 : /// Flush to disk all data that was written with the put_* functions
1059 1052 : #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
1060 : pub(crate) async fn freeze_and_flush(&self) -> anyhow::Result<()> {
1061 : self.freeze_inmem_layer(false).await;
1062 : self.flush_frozen_layers_and_wait().await
1063 : }
1064 :
1065 : /// Outermost timeline compaction operation; downloads needed layers.
1066 408 : pub(crate) async fn compact(
1067 408 : self: &Arc<Self>,
1068 408 : cancel: &CancellationToken,
1069 408 : flags: EnumSet<CompactFlags>,
1070 408 : ctx: &RequestContext,
1071 408 : ) -> Result<(), CompactionError> {
1072 408 : // most likely the cancellation token is from background task, but in tests it could be the
1073 408 : // request task as well.
1074 408 :
1075 408 : let prepare = async move {
1076 408 : let guard = self.compaction_lock.lock().await;
1077 :
1078 408 : let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
1079 408 : BackgroundLoopKind::Compaction,
1080 408 : ctx,
1081 408 : )
1082 0 : .await;
1083 :
1084 408 : (guard, permit)
1085 408 : };
1086 :
1087 : // this wait probably never needs any "long time spent" logging, because we already nag if
1088 : // compaction task goes over it's period (20s) which is quite often in production.
1089 408 : let (_guard, _permit) = tokio::select! {
1090 408 : tuple = prepare => { tuple },
1091 : _ = self.cancel.cancelled() => return Ok(()),
1092 : _ = cancel.cancelled() => return Ok(()),
1093 : };
1094 :
1095 408 : let last_record_lsn = self.get_last_record_lsn();
1096 408 :
1097 408 : // Last record Lsn could be zero in case the timeline was just created
1098 408 : if !last_record_lsn.is_valid() {
1099 0 : warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
1100 0 : return Ok(());
1101 408 : }
1102 408 :
1103 408 : // High level strategy for compaction / image creation:
1104 408 : //
1105 408 : // 1. First, calculate the desired "partitioning" of the
1106 408 : // currently in-use key space. The goal is to partition the
1107 408 : // key space into roughly fixed-size chunks, but also take into
1108 408 : // account any existing image layers, and try to align the
1109 408 : // chunk boundaries with the existing image layers to avoid
1110 408 : // too much churn. Also try to align chunk boundaries with
1111 408 : // relation boundaries. In principle, we don't know about
1112 408 : // relation boundaries here, we just deal with key-value
1113 408 : // pairs, and the code in pgdatadir_mapping.rs knows how to
1114 408 : // map relations into key-value pairs. But in practice we know
1115 408 : // that 'field6' is the block number, and the fields 1-5
1116 408 : // identify a relation. This is just an optimization,
1117 408 : // though.
1118 408 : //
1119 408 : // 2. Once we know the partitioning, for each partition,
1120 408 : // decide if it's time to create a new image layer. The
1121 408 : // criteria is: there has been too much "churn" since the last
1122 408 : // image layer? The "churn" is fuzzy concept, it's a
1123 408 : // combination of too many delta files, or too much WAL in
1124 408 : // total in the delta file. Or perhaps: if creating an image
1125 408 : // file would allow to delete some older files.
1126 408 : //
1127 408 : // 3. After that, we compact all level0 delta files if there
1128 408 : // are too many of them. While compacting, we also garbage
1129 408 : // collect any page versions that are no longer needed because
1130 408 : // of the new image layers we created in step 2.
1131 408 : //
1132 408 : // TODO: This high level strategy hasn't been implemented yet.
1133 408 : // Below are functions compact_level0() and create_image_layers()
1134 408 : // but they are a bit ad hoc and don't quite work like it's explained
1135 408 : // above. Rewrite it.
1136 408 :
1137 408 : // Is the timeline being deleted?
1138 408 : if self.is_stopping() {
1139 0 : trace!("Dropping out of compaction on timeline shutdown");
1140 0 : return Err(CompactionError::ShuttingDown);
1141 408 : }
1142 408 :
1143 408 : let target_file_size = self.get_checkpoint_distance();
1144 408 :
1145 408 : // Define partitioning schema if needed
1146 408 :
1147 408 : // FIXME: the match should only cover repartitioning, not the next steps
1148 408 : match self
1149 408 : .repartition(
1150 408 : self.get_last_record_lsn(),
1151 408 : self.get_compaction_target_size(),
1152 408 : flags,
1153 408 : ctx,
1154 408 : )
1155 13252 : .await
1156 : {
1157 408 : Ok((partitioning, lsn)) => {
1158 408 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
1159 408 : let image_ctx = RequestContextBuilder::extend(ctx)
1160 408 : .access_stats_behavior(AccessStatsBehavior::Skip)
1161 408 : .build();
1162 408 :
1163 408 : // 2. Compact
1164 408 : let timer = self.metrics.compact_time_histo.start_timer();
1165 39551 : self.compact_level0(target_file_size, ctx).await?;
1166 408 : timer.stop_and_record();
1167 :
1168 : // 3. Create new image layers for partitions that have been modified
1169 : // "enough".
1170 408 : let layers = self
1171 408 : .create_image_layers(&partitioning, lsn, false, &image_ctx)
1172 0 : .await
1173 408 : .map_err(anyhow::Error::from)?;
1174 408 : if let Some(remote_client) = &self.remote_client {
1175 408 : for layer in layers {
1176 0 : remote_client.schedule_layer_file_upload(layer)?;
1177 : }
1178 0 : }
1179 :
1180 408 : if let Some(remote_client) = &self.remote_client {
1181 : // should any new image layer been created, not uploading index_part will
1182 : // result in a mismatch between remote_physical_size and layermap calculated
1183 : // size, which will fail some tests, but should not be an issue otherwise.
1184 408 : remote_client.schedule_index_upload_for_file_changes()?;
1185 0 : }
1186 : }
1187 0 : Err(err) => {
1188 0 : // no partitioning? This is normal, if the timeline was just created
1189 0 : // as an empty timeline. Also in unit tests, when we use the timeline
1190 0 : // as a simple key-value store, ignoring the datadir layout. Log the
1191 0 : // error but continue.
1192 0 : //
1193 0 : // Suppress error when it's due to cancellation
1194 0 : if !self.cancel.is_cancelled() {
1195 0 : error!("could not compact, repartitioning keyspace failed: {err:?}");
1196 0 : }
1197 : }
1198 : };
1199 :
1200 408 : Ok(())
1201 408 : }
1202 :
1203 : /// Mutate the timeline with a [`TimelineWriter`].
1204 2957008 : pub(crate) async fn writer(&self) -> TimelineWriter<'_> {
1205 2957008 : TimelineWriter {
1206 2957008 : tl: self,
1207 2957008 : write_guard: self.write_lock.lock().await,
1208 : }
1209 2957008 : }
1210 :
1211 0 : pub(crate) fn activate(
1212 0 : self: &Arc<Self>,
1213 0 : broker_client: BrokerClientChannel,
1214 0 : background_jobs_can_start: Option<&completion::Barrier>,
1215 0 : ctx: &RequestContext,
1216 0 : ) {
1217 0 : if self.tenant_shard_id.is_zero() {
1218 0 : // Logical size is only maintained accurately on shard zero.
1219 0 : self.spawn_initial_logical_size_computation_task(ctx);
1220 0 : }
1221 0 : self.launch_wal_receiver(ctx, broker_client);
1222 0 : self.set_state(TimelineState::Active);
1223 0 : self.launch_eviction_task(background_jobs_can_start);
1224 0 : }
1225 :
1226 : /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
1227 : /// also to remote storage. This method can easily take multiple seconds for a busy timeline.
1228 : ///
1229 : /// While we are flushing, we continue to accept read I/O.
1230 6 : pub(crate) async fn flush_and_shutdown(&self) {
1231 6 : debug_assert_current_span_has_tenant_and_timeline_id();
1232 :
1233 : // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
1234 : // trying to flush
1235 0 : tracing::debug!("Waiting for WalReceiverManager...");
1236 6 : task_mgr::shutdown_tasks(
1237 6 : Some(TaskKind::WalReceiverManager),
1238 6 : Some(self.tenant_shard_id),
1239 6 : Some(self.timeline_id),
1240 6 : )
1241 0 : .await;
1242 :
1243 : // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
1244 6 : self.last_record_lsn.shutdown();
1245 6 :
1246 6 : // now all writers to InMemory layer are gone, do the final flush if requested
1247 6 : match self.freeze_and_flush().await {
1248 : Ok(_) => {
1249 : // drain the upload queue
1250 6 : if let Some(client) = self.remote_client.as_ref() {
1251 : // if we did not wait for completion here, it might be our shutdown process
1252 : // didn't wait for remote uploads to complete at all, as new tasks can forever
1253 : // be spawned.
1254 : //
1255 : // what is problematic is the shutting down of RemoteTimelineClient, because
1256 : // obviously it does not make sense to stop while we wait for it, but what
1257 : // about corner cases like s3 suddenly hanging up?
1258 6 : if let Err(e) = client.shutdown().await {
1259 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1260 : // we have some extra WAL replay to do next time the timeline starts.
1261 0 : warn!("failed to flush to remote storage: {e:#}");
1262 6 : }
1263 0 : }
1264 : }
1265 0 : Err(e) => {
1266 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
1267 : // we have some extra WAL replay to do next time the timeline starts.
1268 0 : warn!("failed to freeze and flush: {e:#}");
1269 : }
1270 : }
1271 :
1272 6 : self.shutdown().await;
1273 6 : }
1274 :
1275 : /// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
1276 : /// the graceful [`Timeline::flush_and_shutdown`] function.
1277 8 : pub(crate) async fn shutdown(&self) {
1278 8 : debug_assert_current_span_has_tenant_and_timeline_id();
1279 :
1280 : // Signal any subscribers to our cancellation token to drop out
1281 0 : tracing::debug!("Cancelling CancellationToken");
1282 8 : self.cancel.cancel();
1283 8 :
1284 8 : // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
1285 8 : // while doing so.
1286 8 : self.last_record_lsn.shutdown();
1287 8 :
1288 8 : // Shut down the layer flush task before the remote client, as one depends on the other
1289 8 : task_mgr::shutdown_tasks(
1290 8 : Some(TaskKind::LayerFlushTask),
1291 8 : Some(self.tenant_shard_id),
1292 8 : Some(self.timeline_id),
1293 8 : )
1294 6 : .await;
1295 :
1296 : // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
1297 : // case our caller wants to use that for a deletion
1298 8 : if let Some(remote_client) = self.remote_client.as_ref() {
1299 8 : match remote_client.stop() {
1300 8 : Ok(()) => {}
1301 0 : Err(StopError::QueueUninitialized) => {
1302 0 : // Shutting down during initialization is legal
1303 0 : }
1304 : }
1305 0 : }
1306 :
1307 0 : tracing::debug!("Waiting for tasks...");
1308 :
1309 8 : task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
1310 :
1311 : // Finally wait until any gate-holders are complete
1312 8 : self.gate.close().await;
1313 8 : }
1314 :
1315 294 : pub(crate) fn set_state(&self, new_state: TimelineState) {
1316 294 : match (self.current_state(), new_state) {
1317 294 : (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
1318 2 : info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
1319 : }
1320 0 : (st, TimelineState::Loading) => {
1321 0 : error!("ignoring transition from {st:?} into Loading state");
1322 : }
1323 0 : (TimelineState::Broken { .. }, new_state) => {
1324 0 : error!("Ignoring state update {new_state:?} for broken timeline");
1325 : }
1326 : (TimelineState::Stopping, TimelineState::Active) => {
1327 0 : error!("Not activating a Stopping timeline");
1328 : }
1329 292 : (_, new_state) => {
1330 292 : self.state.send_replace(new_state);
1331 292 : }
1332 : }
1333 294 : }
1334 :
1335 2 : pub(crate) fn set_broken(&self, reason: String) {
1336 2 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
1337 2 : let broken_state = TimelineState::Broken {
1338 2 : reason,
1339 2 : backtrace: backtrace_str,
1340 2 : };
1341 2 : self.set_state(broken_state);
1342 2 :
1343 2 : // Although the Broken state is not equivalent to shutdown() (shutdown will be called
1344 2 : // later when this tenant is detach or the process shuts down), firing the cancellation token
1345 2 : // here avoids the need for other tasks to watch for the Broken state explicitly.
1346 2 : self.cancel.cancel();
1347 2 : }
1348 :
1349 227862 : pub(crate) fn current_state(&self) -> TimelineState {
1350 227862 : self.state.borrow().clone()
1351 227862 : }
1352 :
1353 6 : pub(crate) fn is_broken(&self) -> bool {
1354 6 : matches!(&*self.state.borrow(), TimelineState::Broken { .. })
1355 6 : }
1356 :
1357 226752 : pub(crate) fn is_active(&self) -> bool {
1358 226752 : self.current_state() == TimelineState::Active
1359 226752 : }
1360 :
1361 816 : pub(crate) fn is_stopping(&self) -> bool {
1362 816 : self.current_state() == TimelineState::Stopping
1363 816 : }
1364 :
1365 0 : pub(crate) fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
1366 0 : self.state.subscribe()
1367 0 : }
1368 :
1369 226538 : pub(crate) async fn wait_to_become_active(
1370 226538 : &self,
1371 226538 : _ctx: &RequestContext, // Prepare for use by cancellation
1372 226538 : ) -> Result<(), TimelineState> {
1373 226538 : let mut receiver = self.state.subscribe();
1374 226538 : loop {
1375 226538 : let current_state = receiver.borrow().clone();
1376 226538 : match current_state {
1377 : TimelineState::Loading => {
1378 0 : receiver
1379 0 : .changed()
1380 0 : .await
1381 0 : .expect("holding a reference to self");
1382 : }
1383 : TimelineState::Active { .. } => {
1384 226536 : return Ok(());
1385 : }
1386 : TimelineState::Broken { .. } | TimelineState::Stopping => {
1387 : // There's no chance the timeline can transition back into ::Active
1388 2 : return Err(current_state);
1389 : }
1390 : }
1391 : }
1392 226538 : }
1393 :
1394 0 : pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
1395 0 : let guard = self.layers.read().await;
1396 0 : let layer_map = guard.layer_map();
1397 0 : let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
1398 0 : if let Some(open_layer) = &layer_map.open_layer {
1399 0 : in_memory_layers.push(open_layer.info());
1400 0 : }
1401 0 : for frozen_layer in &layer_map.frozen_layers {
1402 0 : in_memory_layers.push(frozen_layer.info());
1403 0 : }
1404 :
1405 0 : let mut historic_layers = Vec::new();
1406 0 : for historic_layer in layer_map.iter_historic_layers() {
1407 0 : let historic_layer = guard.get_from_desc(&historic_layer);
1408 0 : historic_layers.push(historic_layer.info(reset));
1409 0 : }
1410 :
1411 0 : LayerMapInfo {
1412 0 : in_memory_layers,
1413 0 : historic_layers,
1414 0 : }
1415 0 : }
1416 :
1417 0 : #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
1418 : pub(crate) async fn download_layer(
1419 : &self,
1420 : layer_file_name: &str,
1421 : ) -> anyhow::Result<Option<bool>> {
1422 : let Some(layer) = self.find_layer(layer_file_name).await else {
1423 : return Ok(None);
1424 : };
1425 :
1426 : if self.remote_client.is_none() {
1427 : return Ok(Some(false));
1428 : }
1429 :
1430 : layer.download().await?;
1431 :
1432 : Ok(Some(true))
1433 : }
1434 :
1435 : /// Evict just one layer.
1436 : ///
1437 : /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
1438 0 : pub(crate) async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1439 0 : let _gate = self
1440 0 : .gate
1441 0 : .enter()
1442 0 : .map_err(|_| anyhow::anyhow!("Shutting down"))?;
1443 :
1444 0 : let Some(local_layer) = self.find_layer(layer_file_name).await else {
1445 0 : return Ok(None);
1446 : };
1447 :
1448 0 : match local_layer.evict_and_wait().await {
1449 0 : Ok(()) => Ok(Some(true)),
1450 0 : Err(EvictionError::NotFound) => Ok(Some(false)),
1451 0 : Err(EvictionError::Downloaded) => Ok(Some(false)),
1452 : }
1453 0 : }
1454 : }
1455 :
1456 : /// Number of times we will compute partition within a checkpoint distance.
1457 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
1458 :
1459 : // Private functions
1460 : impl Timeline {
1461 0 : pub(crate) fn get_lazy_slru_download(&self) -> bool {
1462 0 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1463 0 : tenant_conf
1464 0 : .lazy_slru_download
1465 0 : .unwrap_or(self.conf.default_tenant_conf.lazy_slru_download)
1466 0 : }
1467 :
1468 796 : fn get_checkpoint_distance(&self) -> u64 {
1469 796 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1470 796 : tenant_conf
1471 796 : .checkpoint_distance
1472 796 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
1473 796 : }
1474 :
1475 48 : fn get_checkpoint_timeout(&self) -> Duration {
1476 48 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1477 48 : tenant_conf
1478 48 : .checkpoint_timeout
1479 48 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
1480 48 : }
1481 :
1482 478 : fn get_compaction_target_size(&self) -> u64 {
1483 478 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1484 478 : tenant_conf
1485 478 : .compaction_target_size
1486 478 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
1487 478 : }
1488 :
1489 408 : fn get_compaction_threshold(&self) -> usize {
1490 408 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1491 408 : tenant_conf
1492 408 : .compaction_threshold
1493 408 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
1494 408 : }
1495 :
1496 408 : fn get_image_creation_threshold(&self) -> usize {
1497 408 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1498 408 : tenant_conf
1499 408 : .image_creation_threshold
1500 408 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
1501 408 : }
1502 :
1503 0 : fn get_eviction_policy(&self) -> EvictionPolicy {
1504 0 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
1505 0 : tenant_conf
1506 0 : .eviction_policy
1507 0 : .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
1508 0 : }
1509 :
1510 292 : fn get_evictions_low_residence_duration_metric_threshold(
1511 292 : tenant_conf: &TenantConfOpt,
1512 292 : default_tenant_conf: &TenantConf,
1513 292 : ) -> Duration {
1514 292 : tenant_conf
1515 292 : .evictions_low_residence_duration_metric_threshold
1516 292 : .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
1517 292 : }
1518 :
1519 1984 : fn get_gc_feedback(&self) -> bool {
1520 1984 : let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf.clone();
1521 1984 : tenant_conf
1522 1984 : .gc_feedback
1523 1984 : .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
1524 1984 : }
1525 :
1526 0 : pub(super) fn tenant_conf_updated(&self) {
1527 0 : // NB: Most tenant conf options are read by background loops, so,
1528 0 : // changes will automatically be picked up.
1529 0 :
1530 0 : // The threshold is embedded in the metric. So, we need to update it.
1531 0 : {
1532 0 : let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
1533 0 : &self.tenant_conf.read().unwrap().tenant_conf,
1534 0 : &self.conf.default_tenant_conf,
1535 0 : );
1536 0 :
1537 0 : let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
1538 0 : let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
1539 0 :
1540 0 : let timeline_id_str = self.timeline_id.to_string();
1541 0 : self.metrics
1542 0 : .evictions_with_low_residence_duration
1543 0 : .write()
1544 0 : .unwrap()
1545 0 : .change_threshold(
1546 0 : &tenant_id_str,
1547 0 : &shard_id_str,
1548 0 : &timeline_id_str,
1549 0 : new_threshold,
1550 0 : );
1551 0 : }
1552 0 : }
1553 :
1554 : /// Open a Timeline handle.
1555 : ///
1556 : /// Loads the metadata for the timeline into memory, but not the layer map.
1557 : #[allow(clippy::too_many_arguments)]
1558 292 : pub(super) fn new(
1559 292 : conf: &'static PageServerConf,
1560 292 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
1561 292 : metadata: &TimelineMetadata,
1562 292 : ancestor: Option<Arc<Timeline>>,
1563 292 : timeline_id: TimelineId,
1564 292 : tenant_shard_id: TenantShardId,
1565 292 : generation: Generation,
1566 292 : shard_identity: ShardIdentity,
1567 292 : walredo_mgr: Option<Arc<super::WalRedoManager>>,
1568 292 : resources: TimelineResources,
1569 292 : pg_version: u32,
1570 292 : state: TimelineState,
1571 292 : cancel: CancellationToken,
1572 292 : ) -> Arc<Self> {
1573 292 : let disk_consistent_lsn = metadata.disk_consistent_lsn();
1574 292 : let (state, _) = watch::channel(state);
1575 292 :
1576 292 : let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
1577 292 : let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
1578 292 :
1579 292 : let tenant_conf_guard = tenant_conf.read().unwrap();
1580 292 :
1581 292 : let evictions_low_residence_duration_metric_threshold =
1582 292 : Self::get_evictions_low_residence_duration_metric_threshold(
1583 292 : &tenant_conf_guard.tenant_conf,
1584 292 : &conf.default_tenant_conf,
1585 292 : );
1586 292 : drop(tenant_conf_guard);
1587 292 :
1588 292 : Arc::new_cyclic(|myself| {
1589 292 : let mut result = Timeline {
1590 292 : conf,
1591 292 : tenant_conf,
1592 292 : myself: myself.clone(),
1593 292 : timeline_id,
1594 292 : tenant_shard_id,
1595 292 : generation,
1596 292 : shard_identity,
1597 292 : pg_version,
1598 292 : layers: Default::default(),
1599 292 : wanted_image_layers: Mutex::new(None),
1600 292 :
1601 292 : walredo_mgr,
1602 292 : walreceiver: Mutex::new(None),
1603 292 :
1604 292 : remote_client: resources.remote_client.map(Arc::new),
1605 292 :
1606 292 : // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
1607 292 : last_record_lsn: SeqWait::new(RecordLsn {
1608 292 : last: disk_consistent_lsn,
1609 292 : prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
1610 292 : }),
1611 292 : disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
1612 292 :
1613 292 : last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
1614 292 : last_freeze_ts: RwLock::new(Instant::now()),
1615 292 :
1616 292 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
1617 292 :
1618 292 : ancestor_timeline: ancestor,
1619 292 : ancestor_lsn: metadata.ancestor_lsn(),
1620 292 :
1621 292 : metrics: TimelineMetrics::new(
1622 292 : &tenant_shard_id,
1623 292 : &timeline_id,
1624 292 : crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
1625 292 : "mtime",
1626 292 : evictions_low_residence_duration_metric_threshold,
1627 292 : ),
1628 292 : ),
1629 292 :
1630 292 : query_metrics: crate::metrics::SmgrQueryTimePerTimeline::new(
1631 292 : &tenant_shard_id,
1632 292 : &timeline_id,
1633 292 : ),
1634 292 :
1635 2044 : directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
1636 292 :
1637 292 : flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
1638 292 :
1639 292 : layer_flush_start_tx,
1640 292 : layer_flush_done_tx,
1641 292 :
1642 292 : write_lock: tokio::sync::Mutex::new(None),
1643 292 :
1644 292 : gc_info: std::sync::RwLock::new(GcInfo {
1645 292 : retain_lsns: Vec::new(),
1646 292 : horizon_cutoff: Lsn(0),
1647 292 : pitr_cutoff: Lsn(0),
1648 292 : }),
1649 292 :
1650 292 : latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
1651 292 : initdb_lsn: metadata.initdb_lsn(),
1652 292 :
1653 292 : current_logical_size: if disk_consistent_lsn.is_valid() {
1654 : // we're creating timeline data with some layer files existing locally,
1655 : // need to recalculate timeline's logical size based on data in the layers.
1656 216 : LogicalSize::deferred_initial(disk_consistent_lsn)
1657 : } else {
1658 : // we're creating timeline data without any layers existing locally,
1659 : // initial logical size is 0.
1660 76 : LogicalSize::empty_initial()
1661 : },
1662 292 : partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
1663 292 : repartition_threshold: 0,
1664 292 :
1665 292 : last_received_wal: Mutex::new(None),
1666 292 : rel_size_cache: RwLock::new(HashMap::new()),
1667 292 :
1668 292 : download_all_remote_layers_task_info: RwLock::new(None),
1669 292 :
1670 292 : state,
1671 292 :
1672 292 : eviction_task_timeline_state: tokio::sync::Mutex::new(
1673 292 : EvictionTaskTimelineState::default(),
1674 292 : ),
1675 292 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
1676 292 :
1677 292 : cancel,
1678 292 : gate: Gate::default(),
1679 292 :
1680 292 : compaction_lock: tokio::sync::Mutex::default(),
1681 292 : gc_lock: tokio::sync::Mutex::default(),
1682 292 :
1683 292 : timeline_get_throttle: resources.timeline_get_throttle,
1684 292 : };
1685 292 : result.repartition_threshold =
1686 292 : result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
1687 292 : result
1688 292 : .metrics
1689 292 : .last_record_gauge
1690 292 : .set(disk_consistent_lsn.0 as i64);
1691 292 : result
1692 292 : })
1693 292 : }
1694 :
1695 358 : pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
1696 358 : let Ok(guard) = self.gate.enter() else {
1697 0 : info!("cannot start flush loop when the timeline gate has already been closed");
1698 0 : return;
1699 : };
1700 358 : let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
1701 358 : match *flush_loop_state {
1702 288 : FlushLoopState::NotStarted => (),
1703 : FlushLoopState::Running { .. } => {
1704 70 : info!(
1705 70 : "skipping attempt to start flush_loop twice {}/{}",
1706 70 : self.tenant_shard_id, self.timeline_id
1707 70 : );
1708 70 : return;
1709 : }
1710 : FlushLoopState::Exited => {
1711 0 : warn!(
1712 0 : "ignoring attempt to restart exited flush_loop {}/{}",
1713 0 : self.tenant_shard_id, self.timeline_id
1714 0 : );
1715 0 : return;
1716 : }
1717 : }
1718 :
1719 288 : let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
1720 288 : let self_clone = Arc::clone(self);
1721 288 :
1722 288 : debug!("spawning flush loop");
1723 288 : *flush_loop_state = FlushLoopState::Running {
1724 288 : #[cfg(test)]
1725 288 : expect_initdb_optimization: false,
1726 288 : #[cfg(test)]
1727 288 : initdb_optimization_count: 0,
1728 288 : };
1729 288 : task_mgr::spawn(
1730 288 : task_mgr::BACKGROUND_RUNTIME.handle(),
1731 288 : task_mgr::TaskKind::LayerFlushTask,
1732 288 : Some(self.tenant_shard_id),
1733 288 : Some(self.timeline_id),
1734 288 : "layer flush task",
1735 : false,
1736 288 : async move {
1737 288 : let _guard = guard;
1738 288 : let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
1739 2774 : self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
1740 8 : let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
1741 8 : assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
1742 8 : *flush_loop_state = FlushLoopState::Exited;
1743 8 : Ok(())
1744 8 : }
1745 288 : .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
1746 : );
1747 358 : }
1748 :
1749 : /// Creates and starts the wal receiver.
1750 : ///
1751 : /// This function is expected to be called at most once per Timeline's lifecycle
1752 : /// when the timeline is activated.
1753 0 : fn launch_wal_receiver(
1754 0 : self: &Arc<Self>,
1755 0 : ctx: &RequestContext,
1756 0 : broker_client: BrokerClientChannel,
1757 0 : ) {
1758 0 : info!(
1759 0 : "launching WAL receiver for timeline {} of tenant {}",
1760 0 : self.timeline_id, self.tenant_shard_id
1761 0 : );
1762 :
1763 0 : let tenant_conf_guard = self.tenant_conf.read().unwrap();
1764 0 : let wal_connect_timeout = tenant_conf_guard
1765 0 : .tenant_conf
1766 0 : .walreceiver_connect_timeout
1767 0 : .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
1768 0 : let lagging_wal_timeout = tenant_conf_guard
1769 0 : .tenant_conf
1770 0 : .lagging_wal_timeout
1771 0 : .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
1772 0 : let max_lsn_wal_lag = tenant_conf_guard
1773 0 : .tenant_conf
1774 0 : .max_lsn_wal_lag
1775 0 : .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
1776 0 : drop(tenant_conf_guard);
1777 0 :
1778 0 : let mut guard = self.walreceiver.lock().unwrap();
1779 0 : assert!(
1780 0 : guard.is_none(),
1781 0 : "multiple launches / re-launches of WAL receiver are not supported"
1782 : );
1783 0 : *guard = Some(WalReceiver::start(
1784 0 : Arc::clone(self),
1785 0 : WalReceiverConf {
1786 0 : wal_connect_timeout,
1787 0 : lagging_wal_timeout,
1788 0 : max_lsn_wal_lag,
1789 0 : auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
1790 0 : availability_zone: self.conf.availability_zone.clone(),
1791 0 : ingest_batch_size: self.conf.ingest_batch_size,
1792 0 : },
1793 0 : broker_client,
1794 0 : ctx,
1795 0 : ));
1796 0 : }
1797 :
1798 : /// Initialize with an empty layer map. Used when creating a new timeline.
1799 286 : pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
1800 286 : let mut layers = self.layers.try_write().expect(
1801 286 : "in the context where we call this function, no other task has access to the object",
1802 286 : );
1803 286 : layers.initialize_empty(Lsn(start_lsn.0));
1804 286 : }
1805 :
1806 : /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
1807 : /// files.
1808 6 : pub(super) async fn load_layer_map(
1809 6 : &self,
1810 6 : disk_consistent_lsn: Lsn,
1811 6 : index_part: Option<IndexPart>,
1812 6 : ) -> anyhow::Result<()> {
1813 : use init::{Decision::*, Discovered, DismissedLayer};
1814 : use LayerFileName::*;
1815 :
1816 6 : let mut guard = self.layers.write().await;
1817 :
1818 6 : let timer = self.metrics.load_layer_map_histo.start_timer();
1819 6 :
1820 6 : // Scan timeline directory and create ImageFileName and DeltaFilename
1821 6 : // structs representing all files on disk
1822 6 : let timeline_path = self
1823 6 : .conf
1824 6 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
1825 6 : let conf = self.conf;
1826 6 : let span = tracing::Span::current();
1827 6 :
1828 6 : // Copy to move into the task we're about to spawn
1829 6 : let generation = self.generation;
1830 6 : let shard = self.get_shard_index();
1831 6 : let this = self.myself.upgrade().expect("&self method holds the arc");
1832 :
1833 6 : let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
1834 6 : move || {
1835 6 : let _g = span.entered();
1836 6 : let discovered = init::scan_timeline_dir(&timeline_path)?;
1837 6 : let mut discovered_layers = Vec::with_capacity(discovered.len());
1838 6 : let mut unrecognized_files = Vec::new();
1839 6 :
1840 6 : let mut path = timeline_path;
1841 :
1842 28 : for discovered in discovered {
1843 22 : let (name, kind) = match discovered {
1844 16 : Discovered::Layer(file_name, file_size) => {
1845 16 : discovered_layers.push((file_name, file_size));
1846 16 : continue;
1847 : }
1848 : Discovered::Metadata | Discovered::IgnoredBackup => {
1849 6 : continue;
1850 : }
1851 0 : Discovered::Unknown(file_name) => {
1852 0 : // we will later error if there are any
1853 0 : unrecognized_files.push(file_name);
1854 0 : continue;
1855 : }
1856 0 : Discovered::Ephemeral(name) => (name, "old ephemeral file"),
1857 0 : Discovered::Temporary(name) => (name, "temporary timeline file"),
1858 0 : Discovered::TemporaryDownload(name) => (name, "temporary download"),
1859 : };
1860 0 : path.push(Utf8Path::new(&name));
1861 0 : init::cleanup(&path, kind)?;
1862 0 : path.pop();
1863 : }
1864 :
1865 6 : if !unrecognized_files.is_empty() {
1866 : // assume that if there are any there are many many.
1867 0 : let n = unrecognized_files.len();
1868 0 : let first = &unrecognized_files[..n.min(10)];
1869 0 : anyhow::bail!(
1870 0 : "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
1871 0 : );
1872 6 : }
1873 6 :
1874 6 : let decided = init::reconcile(
1875 6 : discovered_layers,
1876 6 : index_part.as_ref(),
1877 6 : disk_consistent_lsn,
1878 6 : generation,
1879 6 : shard,
1880 6 : );
1881 6 :
1882 6 : let mut loaded_layers = Vec::new();
1883 6 : let mut needs_cleanup = Vec::new();
1884 6 : let mut total_physical_size = 0;
1885 :
1886 22 : for (name, decision) in decided {
1887 16 : let decision = match decision {
1888 0 : Ok(UseRemote { local, remote }) => {
1889 0 : // Remote is authoritative, but we may still choose to retain
1890 0 : // the local file if the contents appear to match
1891 0 : if local.file_size() == remote.file_size() {
1892 : // Use the local file, but take the remote metadata so that we pick up
1893 : // the correct generation.
1894 0 : UseLocal(remote)
1895 : } else {
1896 0 : path.push(name.file_name());
1897 0 : init::cleanup_local_file_for_remote(&path, &local, &remote)?;
1898 0 : path.pop();
1899 0 : UseRemote { local, remote }
1900 : }
1901 : }
1902 16 : Ok(decision) => decision,
1903 0 : Err(DismissedLayer::Future { local }) => {
1904 0 : if local.is_some() {
1905 0 : path.push(name.file_name());
1906 0 : init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
1907 0 : path.pop();
1908 0 : }
1909 0 : needs_cleanup.push(name);
1910 0 : continue;
1911 : }
1912 0 : Err(DismissedLayer::LocalOnly(local)) => {
1913 0 : path.push(name.file_name());
1914 0 : init::cleanup_local_only_file(&path, &name, &local)?;
1915 0 : path.pop();
1916 0 : // this file never existed remotely, we will have to do rework
1917 0 : continue;
1918 : }
1919 : };
1920 :
1921 16 : match &name {
1922 12 : Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
1923 4 : Image(i) => assert!(i.lsn <= disk_consistent_lsn),
1924 : }
1925 :
1926 16 : tracing::debug!(layer=%name, ?decision, "applied");
1927 :
1928 16 : let layer = match decision {
1929 16 : UseLocal(m) => {
1930 16 : total_physical_size += m.file_size();
1931 16 : Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
1932 : }
1933 0 : Evicted(remote) | UseRemote { remote, .. } => {
1934 0 : Layer::for_evicted(conf, &this, name, remote)
1935 : }
1936 : };
1937 :
1938 16 : loaded_layers.push(layer);
1939 : }
1940 6 : Ok((loaded_layers, needs_cleanup, total_physical_size))
1941 6 : }
1942 6 : })
1943 6 : .await
1944 6 : .map_err(anyhow::Error::new)
1945 6 : .and_then(|x| x)?;
1946 :
1947 6 : let num_layers = loaded_layers.len();
1948 6 :
1949 6 : guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
1950 :
1951 6 : if let Some(rtc) = self.remote_client.as_ref() {
1952 6 : rtc.schedule_layer_file_deletion(&needs_cleanup)?;
1953 6 : rtc.schedule_index_upload_for_file_changes()?;
1954 : // This barrier orders above DELETEs before any later operations.
1955 : // This is critical because code executing after the barrier might
1956 : // create again objects with the same key that we just scheduled for deletion.
1957 : // For example, if we just scheduled deletion of an image layer "from the future",
1958 : // later compaction might run again and re-create the same image layer.
1959 : // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
1960 : // "same" here means same key range and LSN.
1961 : //
1962 : // Without a barrier between above DELETEs and the re-creation's PUTs,
1963 : // the upload queue may execute the PUT first, then the DELETE.
1964 : // In our example, we will end up with an IndexPart referencing a non-existent object.
1965 : //
1966 : // 1. a future image layer is created and uploaded
1967 : // 2. ps restart
1968 : // 3. the future layer from (1) is deleted during load layer map
1969 : // 4. image layer is re-created and uploaded
1970 : // 5. deletion queue would like to delete (1) but actually deletes (4)
1971 : // 6. delete by name works as expected, but it now deletes the wrong (later) version
1972 : //
1973 : // See https://github.com/neondatabase/neon/issues/5878
1974 : //
1975 : // NB: generation numbers naturally protect against this because they disambiguate
1976 : // (1) and (4)
1977 6 : rtc.schedule_barrier()?;
1978 : // Tenant::create_timeline will wait for these uploads to happen before returning, or
1979 : // on retry.
1980 0 : }
1981 :
1982 6 : info!(
1983 6 : "loaded layer map with {} layers at {}, total physical size: {}",
1984 6 : num_layers, disk_consistent_lsn, total_physical_size
1985 6 : );
1986 :
1987 6 : timer.stop_and_record();
1988 6 : Ok(())
1989 6 : }
1990 :
1991 : /// Retrieve current logical size of the timeline.
1992 : ///
1993 : /// The size could be lagging behind the actual number, in case
1994 : /// the initial size calculation has not been run (gets triggered on the first size access).
1995 : ///
1996 : /// return size and boolean flag that shows if the size is exact
1997 0 : pub(crate) fn get_current_logical_size(
1998 0 : self: &Arc<Self>,
1999 0 : priority: GetLogicalSizePriority,
2000 0 : ctx: &RequestContext,
2001 0 : ) -> logical_size::CurrentLogicalSize {
2002 0 : if !self.tenant_shard_id.is_zero() {
2003 : // Logical size is only accurately maintained on shard zero: when called elsewhere, for example
2004 : // when HTTP API is serving a GET for timeline zero, return zero
2005 0 : return logical_size::CurrentLogicalSize::Approximate(logical_size::Approximate::zero());
2006 0 : }
2007 0 :
2008 0 : let current_size = self.current_logical_size.current_size();
2009 0 : debug!("Current size: {current_size:?}");
2010 :
2011 0 : match (current_size.accuracy(), priority) {
2012 0 : (logical_size::Accuracy::Exact, _) => (), // nothing to do
2013 0 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
2014 0 : // background task will eventually deliver an exact value, we're in no rush
2015 0 : }
2016 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
2017 : // background task is not ready, but user is asking for it now;
2018 : // => make the background task skip the line
2019 : // (The alternative would be to calculate the size here, but,
2020 : // it can actually take a long time if the user has a lot of rels.
2021 : // And we'll inevitable need it again; So, let the background task do the work.)
2022 0 : match self
2023 0 : .current_logical_size
2024 0 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
2025 0 : .get()
2026 : {
2027 0 : Some(cancel) => cancel.cancel(),
2028 : None => {
2029 0 : let state = self.current_state();
2030 0 : if matches!(
2031 0 : state,
2032 : TimelineState::Broken { .. } | TimelineState::Stopping
2033 0 : ) {
2034 0 :
2035 0 : // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
2036 0 : // Don't make noise.
2037 0 : } else {
2038 0 : warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
2039 : }
2040 : }
2041 : };
2042 : }
2043 : }
2044 :
2045 0 : if let CurrentLogicalSize::Approximate(_) = ¤t_size {
2046 0 : if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
2047 0 : let first = self
2048 0 : .current_logical_size
2049 0 : .did_return_approximate_to_walreceiver
2050 0 : .compare_exchange(
2051 0 : false,
2052 0 : true,
2053 0 : AtomicOrdering::Relaxed,
2054 0 : AtomicOrdering::Relaxed,
2055 0 : )
2056 0 : .is_ok();
2057 0 : if first {
2058 0 : crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
2059 0 : }
2060 0 : }
2061 0 : }
2062 :
2063 0 : current_size
2064 0 : }
2065 :
2066 0 : fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
2067 0 : let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
2068 : // nothing to do for freshly created timelines;
2069 0 : assert_eq!(
2070 0 : self.current_logical_size.current_size().accuracy(),
2071 0 : logical_size::Accuracy::Exact,
2072 0 : );
2073 0 : self.current_logical_size.initialized.add_permits(1);
2074 0 : return;
2075 : };
2076 :
2077 0 : let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
2078 0 : let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
2079 0 : self.current_logical_size
2080 0 : .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
2081 0 : .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
2082 0 :
2083 0 : let self_clone = Arc::clone(self);
2084 0 : let background_ctx = ctx.detached_child(
2085 0 : TaskKind::InitialLogicalSizeCalculation,
2086 0 : DownloadBehavior::Download,
2087 0 : );
2088 0 : task_mgr::spawn(
2089 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
2090 0 : task_mgr::TaskKind::InitialLogicalSizeCalculation,
2091 0 : Some(self.tenant_shard_id),
2092 0 : Some(self.timeline_id),
2093 0 : "initial size calculation",
2094 : false,
2095 : // NB: don't log errors here, task_mgr will do that.
2096 0 : async move {
2097 0 : let cancel = task_mgr::shutdown_token();
2098 0 : self_clone
2099 0 : .initial_logical_size_calculation_task(
2100 0 : initial_part_end,
2101 0 : cancel_wait_for_background_loop_concurrency_limit_semaphore,
2102 0 : cancel,
2103 0 : background_ctx,
2104 0 : )
2105 0 : .await;
2106 0 : Ok(())
2107 0 : }
2108 0 : .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id)),
2109 : );
2110 0 : }
2111 :
2112 0 : async fn initial_logical_size_calculation_task(
2113 0 : self: Arc<Self>,
2114 0 : initial_part_end: Lsn,
2115 0 : skip_concurrency_limiter: CancellationToken,
2116 0 : cancel: CancellationToken,
2117 0 : background_ctx: RequestContext,
2118 0 : ) {
2119 0 : scopeguard::defer! {
2120 0 : // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
2121 0 : self.current_logical_size.initialized.add_permits(1);
2122 0 : }
2123 :
2124 : enum BackgroundCalculationError {
2125 : Cancelled,
2126 : Other(anyhow::Error),
2127 : }
2128 :
2129 0 : let try_once = |attempt: usize| {
2130 0 : let background_ctx = &background_ctx;
2131 0 : let self_ref = &self;
2132 0 : let skip_concurrency_limiter = &skip_concurrency_limiter;
2133 0 : async move {
2134 0 : let cancel = task_mgr::shutdown_token();
2135 0 : let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
2136 0 : BackgroundLoopKind::InitialLogicalSizeCalculation,
2137 0 : background_ctx,
2138 0 : );
2139 :
2140 : use crate::metrics::initial_logical_size::StartCircumstances;
2141 0 : let (_maybe_permit, circumstances) = tokio::select! {
2142 0 : permit = wait_for_permit => {
2143 : (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
2144 : }
2145 : _ = self_ref.cancel.cancelled() => {
2146 : return Err(BackgroundCalculationError::Cancelled);
2147 : }
2148 : _ = cancel.cancelled() => {
2149 : return Err(BackgroundCalculationError::Cancelled);
2150 : },
2151 : () = skip_concurrency_limiter.cancelled() => {
2152 : // Some action that is part of a end user interaction requested logical size
2153 : // => break out of the rate limit
2154 : // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
2155 : // but then again what happens if they cancel; also, we should just be using
2156 : // one runtime across the entire process, so, let's leave this for now.
2157 : (None, StartCircumstances::SkippedConcurrencyLimiter)
2158 : }
2159 : };
2160 :
2161 0 : let metrics_guard = if attempt == 1 {
2162 0 : crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
2163 : } else {
2164 0 : crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
2165 : };
2166 :
2167 0 : match self_ref
2168 0 : .logical_size_calculation_task(
2169 0 : initial_part_end,
2170 0 : LogicalSizeCalculationCause::Initial,
2171 0 : background_ctx,
2172 0 : )
2173 0 : .await
2174 : {
2175 0 : Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
2176 : Err(CalculateLogicalSizeError::Cancelled) => {
2177 0 : Err(BackgroundCalculationError::Cancelled)
2178 : }
2179 0 : Err(CalculateLogicalSizeError::Other(err)) => {
2180 0 : if let Some(PageReconstructError::AncestorStopping(_)) =
2181 0 : err.root_cause().downcast_ref()
2182 : {
2183 0 : Err(BackgroundCalculationError::Cancelled)
2184 : } else {
2185 0 : Err(BackgroundCalculationError::Other(err))
2186 : }
2187 : }
2188 : }
2189 0 : }
2190 0 : };
2191 :
2192 0 : let retrying = async {
2193 0 : let mut attempt = 0;
2194 0 : loop {
2195 0 : attempt += 1;
2196 0 :
2197 0 : match try_once(attempt).await {
2198 0 : Ok(res) => return ControlFlow::Continue(res),
2199 0 : Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
2200 0 : Err(BackgroundCalculationError::Other(e)) => {
2201 0 : warn!(attempt, "initial size calculation failed: {e:?}");
2202 : // exponential back-off doesn't make sense at these long intervals;
2203 : // use fixed retry interval with generous jitter instead
2204 0 : let sleep_duration = Duration::from_secs(
2205 0 : u64::try_from(
2206 0 : // 1hour base
2207 0 : (60_i64 * 60_i64)
2208 0 : // 10min jitter
2209 0 : + rand::thread_rng().gen_range(-10 * 60..10 * 60),
2210 0 : )
2211 0 : .expect("10min < 1hour"),
2212 0 : );
2213 0 : tokio::time::sleep(sleep_duration).await;
2214 : }
2215 : }
2216 : }
2217 0 : };
2218 :
2219 0 : let (calculated_size, metrics_guard) = tokio::select! {
2220 0 : res = retrying => {
2221 : match res {
2222 : ControlFlow::Continue(calculated_size) => calculated_size,
2223 : ControlFlow::Break(()) => return,
2224 : }
2225 : }
2226 : _ = cancel.cancelled() => {
2227 : return;
2228 : }
2229 : };
2230 :
2231 : // we cannot query current_logical_size.current_size() to know the current
2232 : // *negative* value, only truncated to u64.
2233 0 : let added = self
2234 0 : .current_logical_size
2235 0 : .size_added_after_initial
2236 0 : .load(AtomicOrdering::Relaxed);
2237 0 :
2238 0 : let sum = calculated_size.saturating_add_signed(added);
2239 0 :
2240 0 : // set the gauge value before it can be set in `update_current_logical_size`.
2241 0 : self.metrics.current_logical_size_gauge.set(sum);
2242 0 :
2243 0 : self.current_logical_size
2244 0 : .initial_logical_size
2245 0 : .set((calculated_size, metrics_guard.calculation_result_saved()))
2246 0 : .ok()
2247 0 : .expect("only this task sets it");
2248 0 : }
2249 :
2250 0 : pub(crate) fn spawn_ondemand_logical_size_calculation(
2251 0 : self: &Arc<Self>,
2252 0 : lsn: Lsn,
2253 0 : cause: LogicalSizeCalculationCause,
2254 0 : ctx: RequestContext,
2255 0 : ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
2256 0 : let (sender, receiver) = oneshot::channel();
2257 0 : let self_clone = Arc::clone(self);
2258 0 : // XXX if our caller loses interest, i.e., ctx is cancelled,
2259 0 : // we should stop the size calculation work and return an error.
2260 0 : // That would require restructuring this function's API to
2261 0 : // return the result directly, instead of a Receiver for the result.
2262 0 : let ctx = ctx.detached_child(
2263 0 : TaskKind::OndemandLogicalSizeCalculation,
2264 0 : DownloadBehavior::Download,
2265 0 : );
2266 0 : task_mgr::spawn(
2267 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
2268 0 : task_mgr::TaskKind::OndemandLogicalSizeCalculation,
2269 0 : Some(self.tenant_shard_id),
2270 0 : Some(self.timeline_id),
2271 0 : "ondemand logical size calculation",
2272 0 : false,
2273 0 : async move {
2274 0 : let res = self_clone
2275 0 : .logical_size_calculation_task(lsn, cause, &ctx)
2276 0 : .await;
2277 0 : let _ = sender.send(res).ok();
2278 0 : Ok(()) // Receiver is responsible for handling errors
2279 0 : }
2280 0 : .in_current_span(),
2281 0 : );
2282 0 : receiver
2283 0 : }
2284 :
2285 : /// # Cancel-Safety
2286 : ///
2287 : /// This method is cancellation-safe.
2288 0 : #[instrument(skip_all)]
2289 : async fn logical_size_calculation_task(
2290 : self: &Arc<Self>,
2291 : lsn: Lsn,
2292 : cause: LogicalSizeCalculationCause,
2293 : ctx: &RequestContext,
2294 : ) -> Result<u64, CalculateLogicalSizeError> {
2295 : crate::span::debug_assert_current_span_has_tenant_and_timeline_id();
2296 : // We should never be calculating logical sizes on shard !=0, because these shards do not have
2297 : // accurate relation sizes, and they do not emit consumption metrics.
2298 : debug_assert!(self.tenant_shard_id.is_zero());
2299 :
2300 : let _guard = self.gate.enter();
2301 :
2302 : let self_calculation = Arc::clone(self);
2303 :
2304 0 : let mut calculation = pin!(async {
2305 0 : let ctx = ctx.attached_child();
2306 0 : self_calculation
2307 0 : .calculate_logical_size(lsn, cause, &ctx)
2308 0 : .await
2309 0 : });
2310 :
2311 0 : tokio::select! {
2312 0 : res = &mut calculation => { res }
2313 : _ = self.cancel.cancelled() => {
2314 0 : debug!("cancelling logical size calculation for timeline shutdown");
2315 : calculation.await
2316 : }
2317 : _ = task_mgr::shutdown_watcher() => {
2318 0 : debug!("cancelling logical size calculation for task shutdown");
2319 : calculation.await
2320 : }
2321 : }
2322 : }
2323 :
2324 : /// Calculate the logical size of the database at the latest LSN.
2325 : ///
2326 : /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
2327 : /// especially if we need to download remote layers.
2328 : ///
2329 : /// # Cancel-Safety
2330 : ///
2331 : /// This method is cancellation-safe.
2332 0 : async fn calculate_logical_size(
2333 0 : &self,
2334 0 : up_to_lsn: Lsn,
2335 0 : cause: LogicalSizeCalculationCause,
2336 0 : ctx: &RequestContext,
2337 0 : ) -> Result<u64, CalculateLogicalSizeError> {
2338 0 : info!(
2339 0 : "Calculating logical size for timeline {} at {}",
2340 0 : self.timeline_id, up_to_lsn
2341 0 : );
2342 : // These failpoints are used by python tests to ensure that we don't delete
2343 : // the timeline while the logical size computation is ongoing.
2344 : // The first failpoint is used to make this function pause.
2345 : // Then the python test initiates timeline delete operation in a thread.
2346 : // It waits for a few seconds, then arms the second failpoint and disables
2347 : // the first failpoint. The second failpoint prints an error if the timeline
2348 : // delete code has deleted the on-disk state while we're still running here.
2349 : // It shouldn't do that. If it does it anyway, the error will be caught
2350 : // by the test suite, highlighting the problem.
2351 0 : fail::fail_point!("timeline-calculate-logical-size-pause");
2352 0 : fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
2353 0 : if !self
2354 0 : .conf
2355 0 : .metadata_path(&self.tenant_shard_id, &self.timeline_id)
2356 0 : .exists()
2357 : {
2358 0 : error!("timeline-calculate-logical-size-pre metadata file does not exist")
2359 0 : }
2360 : // need to return something
2361 0 : Ok(0)
2362 0 : });
2363 : // See if we've already done the work for initial size calculation.
2364 : // This is a short-cut for timelines that are mostly unused.
2365 0 : if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
2366 0 : return Ok(size);
2367 0 : }
2368 0 : let storage_time_metrics = match cause {
2369 : LogicalSizeCalculationCause::Initial
2370 : | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
2371 0 : | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
2372 : LogicalSizeCalculationCause::EvictionTaskImitation => {
2373 0 : &self.metrics.imitate_logical_size_histo
2374 : }
2375 : };
2376 0 : let timer = storage_time_metrics.start_timer();
2377 0 : let logical_size = self
2378 0 : .get_current_logical_size_non_incremental(up_to_lsn, ctx)
2379 0 : .await?;
2380 0 : debug!("calculated logical size: {logical_size}");
2381 0 : timer.stop_and_record();
2382 0 : Ok(logical_size)
2383 0 : }
2384 :
2385 : /// Update current logical size, adding `delta' to the old value.
2386 270570 : fn update_current_logical_size(&self, delta: i64) {
2387 270570 : let logical_size = &self.current_logical_size;
2388 270570 : logical_size.increment_size(delta);
2389 270570 :
2390 270570 : // Also set the value in the prometheus gauge. Note that
2391 270570 : // there is a race condition here: if this is is called by two
2392 270570 : // threads concurrently, the prometheus gauge might be set to
2393 270570 : // one value while current_logical_size is set to the
2394 270570 : // other.
2395 270570 : match logical_size.current_size() {
2396 270570 : CurrentLogicalSize::Exact(ref new_current_size) => self
2397 270570 : .metrics
2398 270570 : .current_logical_size_gauge
2399 270570 : .set(new_current_size.into()),
2400 0 : CurrentLogicalSize::Approximate(_) => {
2401 0 : // don't update the gauge yet, this allows us not to update the gauge back and
2402 0 : // forth between the initial size calculation task.
2403 0 : }
2404 : }
2405 270570 : }
2406 :
2407 2400 : pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
2408 2400 : self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
2409 2400 : let aux_metric =
2410 2400 : self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
2411 2400 :
2412 2400 : let sum_of_entries = self
2413 2400 : .directory_metrics
2414 2400 : .iter()
2415 16800 : .map(|v| v.load(AtomicOrdering::Relaxed))
2416 2400 : .sum();
2417 2400 : // Set a high general threshold and a lower threshold for the auxiliary files,
2418 2400 : // as we can have large numbers of relations in the db directory.
2419 2400 : const SUM_THRESHOLD: u64 = 5000;
2420 2400 : const AUX_THRESHOLD: u64 = 1000;
2421 2400 : if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
2422 0 : self.metrics
2423 0 : .directory_entries_count_gauge
2424 0 : .set(sum_of_entries);
2425 2400 : } else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
2426 0 : metric.set(sum_of_entries);
2427 2400 : }
2428 2400 : }
2429 :
2430 0 : async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
2431 0 : let guard = self.layers.read().await;
2432 0 : for historic_layer in guard.layer_map().iter_historic_layers() {
2433 0 : let historic_layer_name = historic_layer.filename().file_name();
2434 0 : if layer_file_name == historic_layer_name {
2435 0 : return Some(guard.get_from_desc(&historic_layer));
2436 0 : }
2437 : }
2438 :
2439 0 : None
2440 0 : }
2441 :
2442 : /// The timeline heatmap is a hint to secondary locations from the primary location,
2443 : /// indicating which layers are currently on-disk on the primary.
2444 : ///
2445 : /// None is returned if the Timeline is in a state where uploading a heatmap
2446 : /// doesn't make sense, such as shutting down or initializing. The caller
2447 : /// should treat this as a cue to simply skip doing any heatmap uploading
2448 : /// for this timeline.
2449 0 : pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
2450 : // no point in heatmaps without remote client
2451 0 : let _remote_client = self.remote_client.as_ref()?;
2452 :
2453 0 : if !self.is_active() {
2454 0 : return None;
2455 0 : }
2456 :
2457 0 : let guard = self.layers.read().await;
2458 :
2459 0 : let resident = guard.resident_layers().map(|layer| {
2460 0 : let last_activity_ts = layer.access_stats().latest_activity_or_now();
2461 0 :
2462 0 : HeatMapLayer::new(
2463 0 : layer.layer_desc().filename(),
2464 0 : layer.metadata().into(),
2465 0 : last_activity_ts,
2466 0 : )
2467 0 : });
2468 :
2469 0 : let layers = resident.collect().await;
2470 :
2471 0 : Some(HeatMapTimeline::new(self.timeline_id, layers))
2472 0 : }
2473 : }
2474 :
2475 : type TraversalId = String;
2476 :
2477 : trait TraversalLayerExt {
2478 : fn traversal_id(&self) -> TraversalId;
2479 : }
2480 :
2481 : impl TraversalLayerExt for Layer {
2482 104 : fn traversal_id(&self) -> TraversalId {
2483 104 : self.local_path().to_string()
2484 104 : }
2485 : }
2486 :
2487 : impl TraversalLayerExt for Arc<InMemoryLayer> {
2488 4 : fn traversal_id(&self) -> TraversalId {
2489 4 : format!("timeline {} in-memory {self}", self.get_timeline_id())
2490 4 : }
2491 : }
2492 :
2493 : impl Timeline {
2494 : ///
2495 : /// Get a handle to a Layer for reading.
2496 : ///
2497 : /// The returned Layer might be from an ancestor timeline, if the
2498 : /// segment hasn't been updated on this timeline yet.
2499 : ///
2500 : /// This function takes the current timeline's locked LayerMap as an argument,
2501 : /// so callers can avoid potential race conditions.
2502 : ///
2503 : /// # Cancel-Safety
2504 : ///
2505 : /// This method is cancellation-safe.
2506 502486 : async fn get_reconstruct_data(
2507 502486 : &self,
2508 502486 : key: Key,
2509 502486 : request_lsn: Lsn,
2510 502486 : reconstruct_state: &mut ValueReconstructState,
2511 502486 : ctx: &RequestContext,
2512 502486 : ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
2513 502486 : // Start from the current timeline.
2514 502486 : let mut timeline_owned;
2515 502486 : let mut timeline = self;
2516 502486 :
2517 502486 : let mut read_count = scopeguard::guard(0, |cnt| {
2518 502486 : crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
2519 502486 : });
2520 502486 :
2521 502486 : // For debugging purposes, collect the path of layers that we traversed
2522 502486 : // through. It's included in the error message if we fail to find the key.
2523 502486 : let mut traversal_path = Vec::<TraversalPathItem>::new();
2524 :
2525 502486 : let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
2526 0 : *cached_lsn
2527 : } else {
2528 502486 : Lsn(0)
2529 : };
2530 :
2531 : // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
2532 : // to check that each iteration make some progress, to break infinite
2533 : // looping if something goes wrong.
2534 502486 : let mut prev_lsn = Lsn(u64::MAX);
2535 502486 :
2536 502486 : let mut result = ValueReconstructResult::Continue;
2537 502486 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2538 :
2539 1356187 : 'outer: loop {
2540 1356187 : if self.cancel.is_cancelled() {
2541 0 : return Err(PageReconstructError::Cancelled);
2542 1356187 : }
2543 1356187 :
2544 1356187 : // The function should have updated 'state'
2545 1356187 : //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
2546 1356187 : match result {
2547 502378 : ValueReconstructResult::Complete => return Ok(traversal_path),
2548 : ValueReconstructResult::Continue => {
2549 : // If we reached an earlier cached page image, we're done.
2550 853803 : if cont_lsn == cached_lsn + 1 {
2551 0 : MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
2552 0 : return Ok(traversal_path);
2553 853803 : }
2554 853803 : if prev_lsn <= cont_lsn {
2555 : // Didn't make any progress in last iteration. Error out to avoid
2556 : // getting stuck in the loop.
2557 100 : return Err(layer_traversal_error(format!(
2558 100 : "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
2559 100 : key,
2560 100 : Lsn(cont_lsn.0 - 1),
2561 100 : request_lsn,
2562 100 : timeline.ancestor_lsn
2563 100 : ), traversal_path));
2564 853703 : }
2565 853703 : prev_lsn = cont_lsn;
2566 : }
2567 : ValueReconstructResult::Missing => {
2568 : return Err(layer_traversal_error(
2569 6 : if cfg!(test) {
2570 6 : format!(
2571 6 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
2572 6 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
2573 6 : )
2574 : } else {
2575 0 : format!(
2576 0 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
2577 0 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
2578 0 : )
2579 : },
2580 6 : traversal_path,
2581 : ));
2582 : }
2583 : }
2584 :
2585 : // Recurse into ancestor if needed
2586 853703 : if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
2587 0 : trace!(
2588 0 : "going into ancestor {}, cont_lsn is {}",
2589 0 : timeline.ancestor_lsn,
2590 0 : cont_lsn
2591 0 : );
2592 :
2593 226538 : timeline_owned = timeline.get_ready_ancestor_timeline(ctx).await?;
2594 226536 : timeline = &*timeline_owned;
2595 226536 : prev_lsn = Lsn(u64::MAX);
2596 226536 : continue 'outer;
2597 627165 : }
2598 :
2599 627165 : let guard = timeline.layers.read().await;
2600 627165 : let layers = guard.layer_map();
2601 :
2602 : // Check the open and frozen in-memory layers first, in order from newest
2603 : // to oldest.
2604 627165 : if let Some(open_layer) = &layers.open_layer {
2605 556028 : let start_lsn = open_layer.get_lsn_range().start;
2606 556028 : if cont_lsn > start_lsn {
2607 : //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
2608 : // Get all the data needed to reconstruct the page version from this layer.
2609 : // But if we have an older cached page image, no need to go past that.
2610 502232 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2611 502232 : result = match open_layer
2612 502232 : .get_value_reconstruct_data(
2613 502232 : key,
2614 502232 : lsn_floor..cont_lsn,
2615 502232 : reconstruct_state,
2616 502232 : ctx,
2617 502232 : )
2618 8395 : .await
2619 : {
2620 502232 : Ok(result) => result,
2621 0 : Err(e) => return Err(PageReconstructError::from(e)),
2622 : };
2623 502232 : cont_lsn = lsn_floor;
2624 502232 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2625 502232 : traversal_path.push((
2626 502232 : result,
2627 502232 : cont_lsn,
2628 502232 : Box::new({
2629 502232 : let open_layer = Arc::clone(open_layer);
2630 502232 : move || open_layer.traversal_id()
2631 502232 : }),
2632 502232 : ));
2633 502232 : continue 'outer;
2634 53796 : }
2635 71137 : }
2636 124933 : for frozen_layer in layers.frozen_layers.iter().rev() {
2637 980 : let start_lsn = frozen_layer.get_lsn_range().start;
2638 980 : if cont_lsn > start_lsn {
2639 : //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
2640 980 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2641 980 : result = match frozen_layer
2642 980 : .get_value_reconstruct_data(
2643 980 : key,
2644 980 : lsn_floor..cont_lsn,
2645 980 : reconstruct_state,
2646 980 : ctx,
2647 980 : )
2648 0 : .await
2649 : {
2650 980 : Ok(result) => result,
2651 0 : Err(e) => return Err(PageReconstructError::from(e)),
2652 : };
2653 980 : cont_lsn = lsn_floor;
2654 980 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2655 980 : traversal_path.push((
2656 980 : result,
2657 980 : cont_lsn,
2658 980 : Box::new({
2659 980 : let frozen_layer = Arc::clone(frozen_layer);
2660 980 : move || frozen_layer.traversal_id()
2661 980 : }),
2662 980 : ));
2663 980 : continue 'outer;
2664 0 : }
2665 : }
2666 :
2667 123953 : if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
2668 123851 : let layer = guard.get_from_desc(&layer);
2669 123851 : // Get all the data needed to reconstruct the page version from this layer.
2670 123851 : // But if we have an older cached page image, no need to go past that.
2671 123851 : let lsn_floor = max(cached_lsn + 1, lsn_floor);
2672 123851 : result = match layer
2673 123851 : .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
2674 23596 : .await
2675 : {
2676 123851 : Ok(result) => result,
2677 0 : Err(e) => return Err(PageReconstructError::from(e)),
2678 : };
2679 123851 : cont_lsn = lsn_floor;
2680 123851 : *read_count += 1;
2681 123851 : traversal_path.push((
2682 123851 : result,
2683 123851 : cont_lsn,
2684 123851 : Box::new({
2685 123851 : let layer = layer.to_owned();
2686 123851 : move || layer.traversal_id()
2687 123851 : }),
2688 123851 : ));
2689 123851 : continue 'outer;
2690 102 : } else if timeline.ancestor_timeline.is_some() {
2691 : // Nothing on this timeline. Traverse to parent
2692 100 : result = ValueReconstructResult::Continue;
2693 100 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2694 100 : continue 'outer;
2695 : } else {
2696 : // Nothing found
2697 2 : result = ValueReconstructResult::Missing;
2698 2 : continue 'outer;
2699 : }
2700 : }
2701 502486 : }
2702 :
2703 : /// Get the data needed to reconstruct all keys in the provided keyspace
2704 : ///
2705 : /// The algorithm is as follows:
2706 : /// 1. While some keys are still not done and there's a timeline to visit:
2707 : /// 2. Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_timeline`]:
2708 : /// 2.1: Build the fringe for the current keyspace
2709 : /// 2.2 Visit the newest layer from the fringe to collect all values for the range it
2710 : /// intersects
2711 : /// 2.3. Pop the timeline from the fringe
2712 : /// 2.4. If the fringe is empty, go back to 1
2713 10 : async fn get_vectored_reconstruct_data(
2714 10 : &self,
2715 10 : mut keyspace: KeySpace,
2716 10 : request_lsn: Lsn,
2717 10 : reconstruct_state: &mut ValuesReconstructState,
2718 10 : ctx: &RequestContext,
2719 10 : ) -> Result<(), GetVectoredError> {
2720 10 : let mut timeline_owned: Arc<Timeline>;
2721 10 : let mut timeline = self;
2722 10 :
2723 10 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2724 :
2725 : loop {
2726 10 : if self.cancel.is_cancelled() {
2727 0 : return Err(GetVectoredError::Cancelled);
2728 10 : }
2729 :
2730 10 : let completed = Self::get_vectored_reconstruct_data_timeline(
2731 10 : timeline,
2732 10 : keyspace.clone(),
2733 10 : cont_lsn,
2734 10 : reconstruct_state,
2735 10 : &self.cancel,
2736 10 : ctx,
2737 10 : )
2738 28 : .await?;
2739 :
2740 10 : keyspace.remove_overlapping_with(&completed);
2741 10 : if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
2742 10 : break;
2743 0 : }
2744 0 :
2745 0 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2746 0 : timeline_owned = timeline
2747 0 : .get_ready_ancestor_timeline(ctx)
2748 0 : .await
2749 0 : .map_err(GetVectoredError::GetReadyAncestorError)?;
2750 0 : timeline = &*timeline_owned;
2751 : }
2752 :
2753 10 : if keyspace.total_size() != 0 {
2754 0 : return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
2755 10 : }
2756 10 :
2757 10 : Ok(())
2758 10 : }
2759 :
2760 : /// Collect the reconstruct data for a ketspace from the specified timeline.
2761 : ///
2762 : /// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
2763 : /// the current keyspace. The current keyspace of the search at any given timeline
2764 : /// is the original keyspace minus all the keys that have been completed minus
2765 : /// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly,
2766 : /// but if you merge all the keyspaces in the fringe, you get the "current keyspace".
2767 : ///
2768 : /// This is basically a depth-first search visitor implementation where a vertex
2769 : /// is the (layer, lsn range, key space) tuple. The fringe acts as the stack.
2770 : ///
2771 : /// At each iteration pop the top of the fringe (the layer with the highest Lsn)
2772 : /// and get all the required reconstruct data from the layer in one go.
2773 10 : async fn get_vectored_reconstruct_data_timeline(
2774 10 : timeline: &Timeline,
2775 10 : keyspace: KeySpace,
2776 10 : mut cont_lsn: Lsn,
2777 10 : reconstruct_state: &mut ValuesReconstructState,
2778 10 : cancel: &CancellationToken,
2779 10 : ctx: &RequestContext,
2780 10 : ) -> Result<KeySpace, GetVectoredError> {
2781 10 : let mut unmapped_keyspace = keyspace.clone();
2782 10 : let mut fringe = LayerFringe::new();
2783 10 :
2784 10 : let mut completed_keyspace = KeySpace::default();
2785 :
2786 : // Hold the layer map whilst visiting the timeline to prevent
2787 : // compaction, eviction and flushes from rendering the layers unreadable.
2788 : //
2789 : // TODO: Do we actually need to do this? In theory holding on
2790 : // to [`tenant::storage_layer::Layer`] should be enough. However,
2791 : // [`Timeline::get`] also holds the lock during IO, so more investigation
2792 : // is needed.
2793 10 : let guard = timeline.layers.read().await;
2794 10 : let layers = guard.layer_map();
2795 :
2796 20 : 'outer: loop {
2797 20 : if cancel.is_cancelled() {
2798 0 : return Err(GetVectoredError::Cancelled);
2799 20 : }
2800 20 :
2801 20 : let keys_done_last_step = reconstruct_state.consume_done_keys();
2802 20 : unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
2803 20 : completed_keyspace.merge(&keys_done_last_step);
2804 20 :
2805 20 : let in_memory_layer = layers.find_in_memory_layer(|l| {
2806 0 : let start_lsn = l.get_lsn_range().start;
2807 0 : cont_lsn > start_lsn
2808 20 : });
2809 20 :
2810 20 : match in_memory_layer {
2811 0 : Some(l) => {
2812 0 : fringe.update(
2813 0 : ReadableLayerDesc::InMemory {
2814 0 : handle: l,
2815 0 : lsn_ceil: cont_lsn,
2816 0 : },
2817 0 : unmapped_keyspace.clone(),
2818 0 : );
2819 0 : }
2820 : None => {
2821 20 : for range in unmapped_keyspace.ranges.iter() {
2822 10 : let results = match layers.range_search(range.clone(), cont_lsn) {
2823 10 : Some(res) => res,
2824 : None => {
2825 0 : break 'outer;
2826 : }
2827 : };
2828 :
2829 10 : results
2830 10 : .found
2831 10 : .into_iter()
2832 10 : .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
2833 10 : (
2834 10 : ReadableLayerDesc::Persistent {
2835 10 : desc: (*layer).clone(),
2836 10 : lsn_floor,
2837 10 : lsn_ceil: cont_lsn,
2838 10 : },
2839 10 : keyspace_accum.to_keyspace(),
2840 10 : )
2841 10 : })
2842 10 : .for_each(|(layer, keyspace)| fringe.update(layer, keyspace));
2843 : }
2844 : }
2845 : }
2846 :
2847 20 : if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() {
2848 10 : layer_to_read
2849 10 : .get_values_reconstruct_data(
2850 10 : &guard,
2851 10 : keyspace_to_read.clone(),
2852 10 : reconstruct_state,
2853 10 : ctx,
2854 10 : )
2855 28 : .await?;
2856 :
2857 10 : unmapped_keyspace = keyspace_to_read;
2858 10 : cont_lsn = layer_to_read.get_lsn_floor();
2859 : } else {
2860 10 : break;
2861 : }
2862 : }
2863 :
2864 10 : Ok(completed_keyspace)
2865 10 : }
2866 :
2867 : /// # Cancel-safety
2868 : ///
2869 : /// This method is cancellation-safe.
2870 502486 : async fn lookup_cached_page(
2871 502486 : &self,
2872 502486 : key: &Key,
2873 502486 : lsn: Lsn,
2874 502486 : ctx: &RequestContext,
2875 502486 : ) -> Option<(Lsn, Bytes)> {
2876 502486 : let cache = page_cache::get();
2877 :
2878 : // FIXME: It's pointless to check the cache for things that are not 8kB pages.
2879 : // We should look at the key to determine if it's a cacheable object
2880 502486 : let (lsn, read_guard) = cache
2881 502486 : .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
2882 502486 : .await?;
2883 0 : let img = Bytes::from(read_guard.to_vec());
2884 0 : Some((lsn, img))
2885 502486 : }
2886 :
2887 226538 : async fn get_ready_ancestor_timeline(
2888 226538 : &self,
2889 226538 : ctx: &RequestContext,
2890 226538 : ) -> Result<Arc<Timeline>, GetReadyAncestorError> {
2891 226538 : let ancestor = match self.get_ancestor_timeline() {
2892 226538 : Ok(timeline) => timeline,
2893 0 : Err(e) => return Err(GetReadyAncestorError::from(e)),
2894 : };
2895 :
2896 : // It's possible that the ancestor timeline isn't active yet, or
2897 : // is active but hasn't yet caught up to the branch point. Wait
2898 : // for it.
2899 : //
2900 : // This cannot happen while the pageserver is running normally,
2901 : // because you cannot create a branch from a point that isn't
2902 : // present in the pageserver yet. However, we don't wait for the
2903 : // branch point to be uploaded to cloud storage before creating
2904 : // a branch. I.e., the branch LSN need not be remote consistent
2905 : // for the branching operation to succeed.
2906 : //
2907 : // Hence, if we try to load a tenant in such a state where
2908 : // 1. the existence of the branch was persisted (in IndexPart and/or locally)
2909 : // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
2910 : // then we will need to wait for the ancestor timeline to
2911 : // re-stream WAL up to branch_lsn before we access it.
2912 : //
2913 : // How can a tenant get in such a state?
2914 : // - ungraceful pageserver process exit
2915 : // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
2916 : //
2917 : // NB: this could be avoided by requiring
2918 : // branch_lsn >= remote_consistent_lsn
2919 : // during branch creation.
2920 226538 : match ancestor.wait_to_become_active(ctx).await {
2921 226536 : Ok(()) => {}
2922 : Err(TimelineState::Stopping) => {
2923 0 : return Err(GetReadyAncestorError::AncestorStopping(
2924 0 : ancestor.timeline_id,
2925 0 : ));
2926 : }
2927 2 : Err(state) => {
2928 2 : return Err(GetReadyAncestorError::Other(anyhow::anyhow!(
2929 2 : "Timeline {} will not become active. Current state: {:?}",
2930 2 : ancestor.timeline_id,
2931 2 : &state,
2932 2 : )));
2933 : }
2934 : }
2935 226536 : ancestor
2936 226536 : .wait_lsn(self.ancestor_lsn, ctx)
2937 0 : .await
2938 226536 : .map_err(|e| match e {
2939 0 : e @ WaitLsnError::Timeout(_) => GetReadyAncestorError::AncestorLsnTimeout(e),
2940 0 : WaitLsnError::Shutdown => GetReadyAncestorError::Cancelled,
2941 0 : e @ WaitLsnError::BadState => GetReadyAncestorError::Other(anyhow::anyhow!(e)),
2942 226536 : })?;
2943 :
2944 226536 : Ok(ancestor)
2945 226538 : }
2946 :
2947 226538 : fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
2948 226538 : let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
2949 0 : format!(
2950 0 : "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
2951 0 : self.timeline_id,
2952 0 : self.get_ancestor_timeline_id(),
2953 0 : )
2954 226538 : })?;
2955 226538 : Ok(Arc::clone(ancestor))
2956 226538 : }
2957 :
2958 12 : pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
2959 12 : &self.shard_identity
2960 12 : }
2961 :
2962 : ///
2963 : /// Get a handle to the latest layer for appending.
2964 : ///
2965 2627992 : async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
2966 2627992 : let mut guard = self.layers.write().await;
2967 2627992 : let layer = guard
2968 2627992 : .get_layer_for_write(
2969 2627992 : lsn,
2970 2627992 : self.get_last_record_lsn(),
2971 2627992 : self.conf,
2972 2627992 : self.timeline_id,
2973 2627992 : self.tenant_shard_id,
2974 2627992 : )
2975 320 : .await?;
2976 2627992 : Ok(layer)
2977 2627992 : }
2978 :
2979 3102908 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
2980 3102908 : assert!(new_lsn.is_aligned());
2981 :
2982 3102908 : self.metrics.last_record_gauge.set(new_lsn.0 as i64);
2983 3102908 : self.last_record_lsn.advance(new_lsn);
2984 3102908 : }
2985 :
2986 526 : async fn freeze_inmem_layer(&self, write_lock_held: bool) {
2987 : // Freeze the current open in-memory layer. It will be written to disk on next
2988 : // iteration.
2989 :
2990 526 : let _write_guard = if write_lock_held {
2991 0 : None
2992 : } else {
2993 526 : Some(self.write_lock.lock().await)
2994 : };
2995 :
2996 526 : self.freeze_inmem_layer_at(self.get_last_record_lsn()).await;
2997 526 : }
2998 :
2999 526 : async fn freeze_inmem_layer_at(&self, at: Lsn) {
3000 526 : let mut guard = self.layers.write().await;
3001 526 : guard
3002 526 : .try_freeze_in_memory_layer(at, &self.last_freeze_at)
3003 0 : .await;
3004 526 : }
3005 :
3006 : /// Layer flusher task's main loop.
3007 288 : async fn flush_loop(
3008 288 : self: &Arc<Self>,
3009 288 : mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
3010 288 : ctx: &RequestContext,
3011 288 : ) {
3012 288 : info!("started flush loop");
3013 : loop {
3014 814 : tokio::select! {
3015 1281 : _ = self.cancel.cancelled() => {
3016 1281 : info!("shutting down layer flush task");
3017 1281 : break;
3018 1281 : },
3019 1281 : _ = task_mgr::shutdown_watcher() => {
3020 1281 : info!("shutting down layer flush task");
3021 1281 : break;
3022 1281 : },
3023 1281 : _ = layer_flush_start_rx.changed() => {}
3024 1281 : }
3025 :
3026 0 : trace!("waking up");
3027 526 : let timer = self.metrics.flush_time_histo.start_timer();
3028 526 : let flush_counter = *layer_flush_start_rx.borrow();
3029 1046 : let result = loop {
3030 1046 : if self.cancel.is_cancelled() {
3031 0 : info!("dropping out of flush loop for timeline shutdown");
3032 : // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
3033 : // anyone waiting on that will respect self.cancel as well: they will stop
3034 : // waiting at the same time we as drop out of this loop.
3035 0 : return;
3036 1046 : }
3037 :
3038 1046 : let layer_to_flush = {
3039 1046 : let guard = self.layers.read().await;
3040 1046 : guard.layer_map().frozen_layers.front().cloned()
3041 : // drop 'layers' lock to allow concurrent reads and writes
3042 : };
3043 1046 : let Some(layer_to_flush) = layer_to_flush else {
3044 526 : break Ok(());
3045 : };
3046 2307 : match self.flush_frozen_layer(layer_to_flush, ctx).await {
3047 520 : Ok(()) => {}
3048 : Err(FlushLayerError::Cancelled) => {
3049 0 : info!("dropping out of flush loop for timeline shutdown");
3050 0 : return;
3051 : }
3052 0 : err @ Err(
3053 : FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
3054 : ) => {
3055 0 : error!("could not flush frozen layer: {err:?}");
3056 0 : break err;
3057 : }
3058 : }
3059 : };
3060 : // Notify any listeners that we're done
3061 526 : let _ = self
3062 526 : .layer_flush_done_tx
3063 526 : .send_replace((flush_counter, result));
3064 526 :
3065 526 : timer.stop_and_record();
3066 : }
3067 8 : }
3068 :
3069 526 : async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
3070 526 : let mut rx = self.layer_flush_done_tx.subscribe();
3071 526 :
3072 526 : // Increment the flush cycle counter and wake up the flush task.
3073 526 : // Remember the new value, so that when we listen for the flush
3074 526 : // to finish, we know when the flush that we initiated has
3075 526 : // finished, instead of some other flush that was started earlier.
3076 526 : let mut my_flush_request = 0;
3077 526 :
3078 526 : let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
3079 526 : if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
3080 0 : anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
3081 526 : }
3082 526 :
3083 526 : self.layer_flush_start_tx.send_modify(|counter| {
3084 526 : my_flush_request = *counter + 1;
3085 526 : *counter = my_flush_request;
3086 526 : });
3087 :
3088 1052 : loop {
3089 1052 : {
3090 1052 : let (last_result_counter, last_result) = &*rx.borrow();
3091 1052 : if *last_result_counter >= my_flush_request {
3092 526 : if let Err(_err) = last_result {
3093 : // We already logged the original error in
3094 : // flush_loop. We cannot propagate it to the caller
3095 : // here, because it might not be Cloneable
3096 0 : anyhow::bail!(
3097 0 : "Could not flush frozen layer. Request id: {}",
3098 0 : my_flush_request
3099 0 : );
3100 : } else {
3101 526 : return Ok(());
3102 : }
3103 526 : }
3104 : }
3105 0 : trace!("waiting for flush to complete");
3106 526 : tokio::select! {
3107 526 : rx_e = rx.changed() => {
3108 : rx_e?;
3109 : },
3110 : // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
3111 : // the notification from [`flush_loop`] that it completed.
3112 : _ = self.cancel.cancelled() => {
3113 0 : tracing::info!("Cancelled layer flush due on timeline shutdown");
3114 : return Ok(())
3115 : }
3116 : };
3117 0 : trace!("done")
3118 : }
3119 526 : }
3120 :
3121 0 : fn flush_frozen_layers(&self) {
3122 0 : self.layer_flush_start_tx.send_modify(|val| *val += 1);
3123 0 : }
3124 :
3125 : /// Flush one frozen in-memory layer to disk, as a new delta layer.
3126 1040 : #[instrument(skip_all, fields(layer=%frozen_layer))]
3127 : async fn flush_frozen_layer(
3128 : self: &Arc<Self>,
3129 : frozen_layer: Arc<InMemoryLayer>,
3130 : ctx: &RequestContext,
3131 : ) -> Result<(), FlushLayerError> {
3132 : debug_assert_current_span_has_tenant_and_timeline_id();
3133 : // As a special case, when we have just imported an image into the repository,
3134 : // instead of writing out a L0 delta layer, we directly write out image layer
3135 : // files instead. This is possible as long as *all* the data imported into the
3136 : // repository have the same LSN.
3137 : let lsn_range = frozen_layer.get_lsn_range();
3138 : let (layers_to_upload, delta_layer_to_add) =
3139 : if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
3140 : #[cfg(test)]
3141 : match &mut *self.flush_loop_state.lock().unwrap() {
3142 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
3143 : panic!("flush loop not running")
3144 : }
3145 : FlushLoopState::Running {
3146 : initdb_optimization_count,
3147 : ..
3148 : } => {
3149 : *initdb_optimization_count += 1;
3150 : }
3151 : }
3152 : // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
3153 : // require downloading anything during initial import.
3154 : let (partitioning, _lsn) = self
3155 : .repartition(
3156 : self.initdb_lsn,
3157 : self.get_compaction_target_size(),
3158 : EnumSet::empty(),
3159 : ctx,
3160 : )
3161 : .await?;
3162 :
3163 : if self.cancel.is_cancelled() {
3164 : return Err(FlushLayerError::Cancelled);
3165 : }
3166 :
3167 : // For image layers, we add them immediately into the layer map.
3168 : (
3169 : self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
3170 : .await?,
3171 : None,
3172 : )
3173 : } else {
3174 : #[cfg(test)]
3175 : match &mut *self.flush_loop_state.lock().unwrap() {
3176 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
3177 : panic!("flush loop not running")
3178 : }
3179 : FlushLoopState::Running {
3180 : expect_initdb_optimization,
3181 : ..
3182 : } => {
3183 : assert!(!*expect_initdb_optimization, "expected initdb optimization");
3184 : }
3185 : }
3186 : // Normal case, write out a L0 delta layer file.
3187 : // `create_delta_layer` will not modify the layer map.
3188 : // We will remove frozen layer and add delta layer in one atomic operation later.
3189 : let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
3190 : (
3191 : // FIXME: even though we have a single image and single delta layer assumption
3192 : // we push them to vec
3193 : vec![layer.clone()],
3194 : Some(layer),
3195 : )
3196 : };
3197 :
3198 520 : pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
3199 :
3200 : if self.cancel.is_cancelled() {
3201 : return Err(FlushLayerError::Cancelled);
3202 : }
3203 :
3204 : let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
3205 : let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
3206 :
3207 : // The new on-disk layers are now in the layer map. We can remove the
3208 : // in-memory layer from the map now. The flushed layer is stored in
3209 : // the mapping in `create_delta_layer`.
3210 : let metadata = {
3211 : let mut guard = self.layers.write().await;
3212 :
3213 : if self.cancel.is_cancelled() {
3214 : return Err(FlushLayerError::Cancelled);
3215 : }
3216 :
3217 : guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
3218 :
3219 : if disk_consistent_lsn != old_disk_consistent_lsn {
3220 : assert!(disk_consistent_lsn > old_disk_consistent_lsn);
3221 : self.disk_consistent_lsn.store(disk_consistent_lsn);
3222 :
3223 : // Schedule remote uploads that will reflect our new disk_consistent_lsn
3224 : Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
3225 : } else {
3226 : None
3227 : }
3228 : // release lock on 'layers'
3229 : };
3230 :
3231 : // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
3232 : // a compaction can delete the file and then it won't be available for uploads any more.
3233 : // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
3234 : // race situation.
3235 : // See https://github.com/neondatabase/neon/issues/4526
3236 520 : pausable_failpoint!("flush-frozen-pausable");
3237 :
3238 : // This failpoint is used by another test case `test_pageserver_recovery`.
3239 0 : fail_point!("flush-frozen-exit");
3240 :
3241 : // Update the metadata file, with new 'disk_consistent_lsn'
3242 : //
3243 : // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
3244 : // *all* the layers, to avoid fsyncing the file multiple times.
3245 :
3246 : // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
3247 : if let Some(metadata) = metadata {
3248 : save_metadata(
3249 : self.conf,
3250 : &self.tenant_shard_id,
3251 : &self.timeline_id,
3252 : &metadata,
3253 : )
3254 : .await
3255 : .context("save_metadata")?;
3256 : }
3257 : Ok(())
3258 : }
3259 :
3260 : /// Update metadata file
3261 520 : fn schedule_uploads(
3262 520 : &self,
3263 520 : disk_consistent_lsn: Lsn,
3264 520 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
3265 520 : ) -> anyhow::Result<TimelineMetadata> {
3266 520 : // We can only save a valid 'prev_record_lsn' value on disk if we
3267 520 : // flushed *all* in-memory changes to disk. We only track
3268 520 : // 'prev_record_lsn' in memory for the latest processed record, so we
3269 520 : // don't remember what the correct value that corresponds to some old
3270 520 : // LSN is. But if we flush everything, then the value corresponding
3271 520 : // current 'last_record_lsn' is correct and we can store it on disk.
3272 520 : let RecordLsn {
3273 520 : last: last_record_lsn,
3274 520 : prev: prev_record_lsn,
3275 520 : } = self.last_record_lsn.load();
3276 520 : let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
3277 520 : Some(prev_record_lsn)
3278 : } else {
3279 0 : None
3280 : };
3281 :
3282 520 : let ancestor_timeline_id = self
3283 520 : .ancestor_timeline
3284 520 : .as_ref()
3285 520 : .map(|ancestor| ancestor.timeline_id);
3286 520 :
3287 520 : let metadata = TimelineMetadata::new(
3288 520 : disk_consistent_lsn,
3289 520 : ondisk_prev_record_lsn,
3290 520 : ancestor_timeline_id,
3291 520 : self.ancestor_lsn,
3292 520 : *self.latest_gc_cutoff_lsn.read(),
3293 520 : self.initdb_lsn,
3294 520 : self.pg_version,
3295 520 : );
3296 520 :
3297 520 : fail_point!("checkpoint-before-saving-metadata", |x| bail!(
3298 0 : "{}",
3299 0 : x.unwrap()
3300 520 : ));
3301 :
3302 520 : if let Some(remote_client) = &self.remote_client {
3303 1040 : for layer in layers_to_upload {
3304 520 : remote_client.schedule_layer_file_upload(layer)?;
3305 : }
3306 520 : remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
3307 0 : }
3308 :
3309 520 : Ok(metadata)
3310 520 : }
3311 :
3312 0 : async fn update_metadata_file(
3313 0 : &self,
3314 0 : disk_consistent_lsn: Lsn,
3315 0 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
3316 0 : ) -> anyhow::Result<()> {
3317 0 : let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
3318 :
3319 0 : save_metadata(
3320 0 : self.conf,
3321 0 : &self.tenant_shard_id,
3322 0 : &self.timeline_id,
3323 0 : &metadata,
3324 0 : )
3325 0 : .await
3326 0 : .context("save_metadata")?;
3327 :
3328 0 : Ok(())
3329 0 : }
3330 :
3331 0 : pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
3332 0 : if let Some(remote_client) = &self.remote_client {
3333 0 : remote_client
3334 0 : .preserve_initdb_archive(
3335 0 : &self.tenant_shard_id.tenant_id,
3336 0 : &self.timeline_id,
3337 0 : &self.cancel,
3338 0 : )
3339 0 : .await?;
3340 : } else {
3341 0 : bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
3342 : }
3343 0 : Ok(())
3344 0 : }
3345 :
3346 : // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
3347 : // in layer map immediately. The caller is responsible to put it into the layer map.
3348 450 : async fn create_delta_layer(
3349 450 : self: &Arc<Self>,
3350 450 : frozen_layer: &Arc<InMemoryLayer>,
3351 450 : ctx: &RequestContext,
3352 450 : ) -> anyhow::Result<ResidentLayer> {
3353 450 : let span = tracing::info_span!("blocking");
3354 450 : let new_delta: ResidentLayer = tokio::task::spawn_blocking({
3355 450 : let self_clone = Arc::clone(self);
3356 450 : let frozen_layer = Arc::clone(frozen_layer);
3357 450 : let ctx = ctx.attached_child();
3358 450 : move || {
3359 450 : // Write it out
3360 450 : // Keep this inside `spawn_blocking` and `Handle::current`
3361 450 : // as long as the write path is still sync and the read impl
3362 450 : // is still not fully async. Otherwise executor threads would
3363 450 : // be blocked.
3364 450 : let _g = span.entered();
3365 450 : let new_delta =
3366 450 : Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
3367 450 : let new_delta_path = new_delta.local_path().to_owned();
3368 450 :
3369 450 : // Sync it to disk.
3370 450 : //
3371 450 : // We must also fsync the timeline dir to ensure the directory entries for
3372 450 : // new layer files are durable.
3373 450 : //
3374 450 : // NB: timeline dir must be synced _after_ the file contents are durable.
3375 450 : // So, two separate fsyncs are required, they mustn't be batched.
3376 450 : //
3377 450 : // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
3378 450 : // files to flush, the fsync overhead can be reduces as follows:
3379 450 : // 1. write them all to temporary file names
3380 450 : // 2. fsync them
3381 450 : // 3. rename to the final name
3382 450 : // 4. fsync the parent directory.
3383 450 : // Note that (1),(2),(3) today happen inside write_to_disk().
3384 450 : //
3385 450 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3386 450 : par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
3387 450 : par_fsync::par_fsync(&[self_clone
3388 450 : .conf
3389 450 : .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
3390 450 : .context("fsync of timeline dir")?;
3391 :
3392 450 : anyhow::Ok(new_delta)
3393 450 : }
3394 450 : })
3395 450 : .await
3396 450 : .context("spawn_blocking")
3397 450 : .and_then(|x| x)?;
3398 :
3399 450 : Ok(new_delta)
3400 450 : }
3401 :
3402 478 : async fn repartition(
3403 478 : &self,
3404 478 : lsn: Lsn,
3405 478 : partition_size: u64,
3406 478 : flags: EnumSet<CompactFlags>,
3407 478 : ctx: &RequestContext,
3408 478 : ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
3409 478 : {
3410 478 : let partitioning_guard = self.partitioning.lock().unwrap();
3411 478 : let distance = lsn.0 - partitioning_guard.1 .0;
3412 478 : if partitioning_guard.1 != Lsn(0)
3413 308 : && distance <= self.repartition_threshold
3414 308 : && !flags.contains(CompactFlags::ForceRepartition)
3415 : {
3416 0 : debug!(
3417 0 : distance,
3418 0 : threshold = self.repartition_threshold,
3419 0 : "no repartitioning needed"
3420 0 : );
3421 308 : return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
3422 170 : }
3423 : }
3424 13252 : let keyspace = self.collect_keyspace(lsn, ctx).await?;
3425 170 : let partitioning = keyspace.partition(partition_size);
3426 170 :
3427 170 : let mut partitioning_guard = self.partitioning.lock().unwrap();
3428 170 : if lsn > partitioning_guard.1 {
3429 170 : *partitioning_guard = (partitioning, lsn);
3430 170 : } else {
3431 0 : warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
3432 : }
3433 170 : Ok((partitioning_guard.0.clone(), partitioning_guard.1))
3434 478 : }
3435 :
3436 : // Is it time to create a new image layer for the given partition?
3437 408 : async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
3438 408 : let threshold = self.get_image_creation_threshold();
3439 :
3440 408 : let guard = self.layers.read().await;
3441 408 : let layers = guard.layer_map();
3442 408 :
3443 408 : let mut max_deltas = 0;
3444 408 : {
3445 408 : let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
3446 408 : if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
3447 294 : let img_range =
3448 294 : partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
3449 294 : if wanted.overlaps(&img_range) {
3450 : //
3451 : // gc_timeline only pays attention to image layers that are older than the GC cutoff,
3452 : // but create_image_layers creates image layers at last-record-lsn.
3453 : // So it's possible that gc_timeline wants a new image layer to be created for a key range,
3454 : // but the range is already covered by image layers at more recent LSNs. Before we
3455 : // create a new image layer, check if the range is already covered at more recent LSNs.
3456 0 : if !layers
3457 0 : .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
3458 : {
3459 0 : debug!(
3460 0 : "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
3461 0 : img_range.start, img_range.end, cutoff_lsn, lsn
3462 0 : );
3463 0 : return true;
3464 0 : }
3465 294 : }
3466 114 : }
3467 : }
3468 :
3469 2856 : for part_range in &partition.ranges {
3470 2448 : let image_coverage = layers.image_coverage(part_range, lsn);
3471 5204 : for (img_range, last_img) in image_coverage {
3472 2756 : let img_lsn = if let Some(last_img) = last_img {
3473 2156 : last_img.get_lsn_range().end
3474 : } else {
3475 600 : Lsn(0)
3476 : };
3477 : // Let's consider an example:
3478 : //
3479 : // delta layer with LSN range 71-81
3480 : // delta layer with LSN range 81-91
3481 : // delta layer with LSN range 91-101
3482 : // image layer at LSN 100
3483 : //
3484 : // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
3485 : // there's no need to create a new one. We check this case explicitly, to avoid passing
3486 : // a bogus range to count_deltas below, with start > end. It's even possible that there
3487 : // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
3488 : // after we read last_record_lsn, which is passed here in the 'lsn' argument.
3489 2756 : if img_lsn < lsn {
3490 600 : let num_deltas =
3491 600 : layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
3492 600 :
3493 600 : max_deltas = max_deltas.max(num_deltas);
3494 600 : if num_deltas >= threshold {
3495 0 : debug!(
3496 0 : "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
3497 0 : img_range.start, img_range.end, num_deltas, img_lsn, lsn
3498 0 : );
3499 0 : return true;
3500 600 : }
3501 2156 : }
3502 : }
3503 : }
3504 :
3505 0 : debug!(
3506 0 : max_deltas,
3507 0 : "none of the partitioned ranges had >= {threshold} deltas"
3508 0 : );
3509 408 : false
3510 408 : }
3511 :
3512 956 : #[tracing::instrument(skip_all, fields(%lsn, %force))]
3513 : async fn create_image_layers(
3514 : self: &Arc<Timeline>,
3515 : partitioning: &KeyPartitioning,
3516 : lsn: Lsn,
3517 : force: bool,
3518 : ctx: &RequestContext,
3519 : ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
3520 : let timer = self.metrics.create_images_time_histo.start_timer();
3521 : let mut image_layers = Vec::new();
3522 :
3523 : // We need to avoid holes between generated image layers.
3524 : // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
3525 : // image layer with hole between them. In this case such layer can not be utilized by GC.
3526 : //
3527 : // How such hole between partitions can appear?
3528 : // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
3529 : // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
3530 : // If there is delta layer <100000000..300000000> then it never be garbage collected because
3531 : // image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
3532 : let mut start = Key::MIN;
3533 :
3534 : for partition in partitioning.parts.iter() {
3535 : let img_range = start..partition.ranges.last().unwrap().end;
3536 : if !force && !self.time_for_new_image_layer(partition, lsn).await {
3537 : start = img_range.end;
3538 : continue;
3539 : }
3540 :
3541 : let mut image_layer_writer = ImageLayerWriter::new(
3542 : self.conf,
3543 : self.timeline_id,
3544 : self.tenant_shard_id,
3545 : &img_range,
3546 : lsn,
3547 : )
3548 : .await?;
3549 :
3550 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3551 0 : Err(CreateImageLayersError::Other(anyhow::anyhow!(
3552 0 : "failpoint image-layer-writer-fail-before-finish"
3553 0 : )))
3554 0 : });
3555 :
3556 : let mut wrote_keys = false;
3557 :
3558 : let mut key_request_accum = KeySpaceAccum::new();
3559 : for range in &partition.ranges {
3560 : let mut key = range.start;
3561 : while key < range.end {
3562 : // Decide whether to retain this key: usually we do, but sharded tenants may
3563 : // need to drop keys that don't belong to them. If we retain the key, add it
3564 : // to `key_request_accum` for later issuing a vectored get
3565 : if self.shard_identity.is_key_disposable(&key) {
3566 0 : debug!(
3567 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3568 0 : key,
3569 0 : self.shard_identity.get_shard_number(&key)
3570 0 : );
3571 : } else {
3572 : key_request_accum.add_key(key);
3573 : }
3574 :
3575 : let last_key_in_range = key.next() == range.end;
3576 : key = key.next();
3577 :
3578 : // Maybe flush `key_rest_accum`
3579 : if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
3580 : || last_key_in_range
3581 : {
3582 : let results = self
3583 : .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
3584 : .await?;
3585 :
3586 : for (img_key, img) in results {
3587 : let img = match img {
3588 : Ok(img) => img,
3589 : Err(err) => {
3590 : // If we fail to reconstruct a VM or FSM page, we can zero the
3591 : // page without losing any actual user data. That seems better
3592 : // than failing repeatedly and getting stuck.
3593 : //
3594 : // We had a bug at one point, where we truncated the FSM and VM
3595 : // in the pageserver, but the Postgres didn't know about that
3596 : // and continued to generate incremental WAL records for pages
3597 : // that didn't exist in the pageserver. Trying to replay those
3598 : // WAL records failed to find the previous image of the page.
3599 : // This special case allows us to recover from that situation.
3600 : // See https://github.com/neondatabase/neon/issues/2601.
3601 : //
3602 : // Unfortunately we cannot do this for the main fork, or for
3603 : // any metadata keys, keys, as that would lead to actual data
3604 : // loss.
3605 : if is_rel_fsm_block_key(img_key) || is_rel_vm_block_key(img_key)
3606 : {
3607 0 : warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
3608 : ZERO_PAGE.clone()
3609 : } else {
3610 : return Err(CreateImageLayersError::PageReconstructError(
3611 : err,
3612 : ));
3613 : }
3614 : }
3615 : };
3616 :
3617 : // Write all the keys we just read into our new image layer.
3618 : image_layer_writer.put_image(img_key, img).await?;
3619 : wrote_keys = true;
3620 : }
3621 : }
3622 : }
3623 : }
3624 :
3625 : if wrote_keys {
3626 : // Normal path: we have written some data into the new image layer for this
3627 : // partition, so flush it to disk.
3628 : start = img_range.end;
3629 : let image_layer = image_layer_writer.finish(self).await?;
3630 : image_layers.push(image_layer);
3631 : } else {
3632 : // Special case: the image layer may be empty if this is a sharded tenant and the
3633 : // partition does not cover any keys owned by this shard. In this case, to ensure
3634 : // we don't leave gaps between image layers, leave `start` where it is, so that the next
3635 : // layer we write will cover the key range that we just scanned.
3636 0 : tracing::debug!("no data in range {}-{}", img_range.start, img_range.end);
3637 : }
3638 : }
3639 : // All layers that the GC wanted us to create have now been created.
3640 : //
3641 : // It's possible that another GC cycle happened while we were compacting, and added
3642 : // something new to wanted_image_layers, and we now clear that before processing it.
3643 : // That's OK, because the next GC iteration will put it back in.
3644 : *self.wanted_image_layers.lock().unwrap() = None;
3645 :
3646 : // Sync the new layer to disk before adding it to the layer map, to make sure
3647 : // we don't garbage collect something based on the new layer, before it has
3648 : // reached the disk.
3649 : //
3650 : // We must also fsync the timeline dir to ensure the directory entries for
3651 : // new layer files are durable
3652 : //
3653 : // Compaction creates multiple image layers. It would be better to create them all
3654 : // and fsync them all in parallel.
3655 : let all_paths = image_layers
3656 : .iter()
3657 70 : .map(|layer| layer.local_path().to_owned())
3658 : .collect::<Vec<_>>();
3659 :
3660 : par_fsync::par_fsync_async(&all_paths)
3661 : .await
3662 : .context("fsync of newly created layer files")?;
3663 :
3664 : if !all_paths.is_empty() {
3665 : par_fsync::par_fsync_async(&[self
3666 : .conf
3667 : .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
3668 : .await
3669 : .context("fsync of timeline dir")?;
3670 : }
3671 :
3672 : let mut guard = self.layers.write().await;
3673 :
3674 : // FIXME: we could add the images to be uploaded *before* returning from here, but right
3675 : // now they are being scheduled outside of write lock
3676 : guard.track_new_image_layers(&image_layers, &self.metrics);
3677 : drop_wlock(guard);
3678 : timer.stop_and_record();
3679 :
3680 : Ok(image_layers)
3681 : }
3682 :
3683 : /// Wait until the background initial logical size calculation is complete, or
3684 : /// this Timeline is shut down. Calling this function will cause the initial
3685 : /// logical size calculation to skip waiting for the background jobs barrier.
3686 0 : pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
3687 0 : if let Some(await_bg_cancel) = self
3688 0 : .current_logical_size
3689 0 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
3690 0 : .get()
3691 0 : {
3692 0 : await_bg_cancel.cancel();
3693 0 : } else {
3694 : // We should not wait if we were not able to explicitly instruct
3695 : // the logical size cancellation to skip the concurrency limit semaphore.
3696 : // TODO: this is an unexpected case. We should restructure so that it
3697 : // can't happen.
3698 0 : tracing::info!(
3699 0 : "await_initial_logical_size: can't get semaphore cancel token, skipping"
3700 0 : );
3701 : }
3702 :
3703 0 : tokio::select!(
3704 0 : _ = self.current_logical_size.initialized.acquire() => {},
3705 0 : _ = self.cancel.cancelled() => {}
3706 0 : )
3707 0 : }
3708 : }
3709 :
3710 378 : #[derive(Default)]
3711 : struct CompactLevel0Phase1Result {
3712 : new_layers: Vec<ResidentLayer>,
3713 : deltas_to_compact: Vec<Layer>,
3714 : }
3715 :
3716 : /// Top-level failure to compact.
3717 0 : #[derive(Debug, thiserror::Error)]
3718 : pub(crate) enum CompactionError {
3719 : #[error("The timeline or pageserver is shutting down")]
3720 : ShuttingDown,
3721 : /// Compaction cannot be done right now; page reconstruction and so on.
3722 : #[error(transparent)]
3723 : Other(#[from] anyhow::Error),
3724 : }
3725 :
3726 : #[serde_as]
3727 420 : #[derive(serde::Serialize)]
3728 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
3729 :
3730 2856 : #[derive(Default)]
3731 : enum DurationRecorder {
3732 : #[default]
3733 : NotStarted,
3734 : Recorded(RecordedDuration, tokio::time::Instant),
3735 : }
3736 :
3737 : impl DurationRecorder {
3738 558 : fn till_now(&self) -> DurationRecorder {
3739 558 : match self {
3740 : DurationRecorder::NotStarted => {
3741 0 : panic!("must only call on recorded measurements")
3742 : }
3743 558 : DurationRecorder::Recorded(_, ended) => {
3744 558 : let now = tokio::time::Instant::now();
3745 558 : DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
3746 558 : }
3747 558 : }
3748 558 : }
3749 210 : fn into_recorded(self) -> Option<RecordedDuration> {
3750 210 : match self {
3751 0 : DurationRecorder::NotStarted => None,
3752 210 : DurationRecorder::Recorded(recorded, _) => Some(recorded),
3753 : }
3754 210 : }
3755 : }
3756 :
3757 408 : #[derive(Default)]
3758 : struct CompactLevel0Phase1StatsBuilder {
3759 : version: Option<u64>,
3760 : tenant_id: Option<TenantShardId>,
3761 : timeline_id: Option<TimelineId>,
3762 : read_lock_acquisition_micros: DurationRecorder,
3763 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
3764 : read_lock_held_key_sort_micros: DurationRecorder,
3765 : read_lock_held_prerequisites_micros: DurationRecorder,
3766 : read_lock_held_compute_holes_micros: DurationRecorder,
3767 : read_lock_drop_micros: DurationRecorder,
3768 : write_layer_files_micros: DurationRecorder,
3769 : level0_deltas_count: Option<usize>,
3770 : new_deltas_count: Option<usize>,
3771 : new_deltas_size: Option<u64>,
3772 : }
3773 :
3774 30 : #[derive(serde::Serialize)]
3775 : struct CompactLevel0Phase1Stats {
3776 : version: u64,
3777 : tenant_id: TenantShardId,
3778 : timeline_id: TimelineId,
3779 : read_lock_acquisition_micros: RecordedDuration,
3780 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
3781 : read_lock_held_key_sort_micros: RecordedDuration,
3782 : read_lock_held_prerequisites_micros: RecordedDuration,
3783 : read_lock_held_compute_holes_micros: RecordedDuration,
3784 : read_lock_drop_micros: RecordedDuration,
3785 : write_layer_files_micros: RecordedDuration,
3786 : level0_deltas_count: usize,
3787 : new_deltas_count: usize,
3788 : new_deltas_size: u64,
3789 : }
3790 :
3791 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
3792 : type Error = anyhow::Error;
3793 :
3794 30 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
3795 30 : Ok(Self {
3796 30 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
3797 30 : tenant_id: value
3798 30 : .tenant_id
3799 30 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
3800 30 : timeline_id: value
3801 30 : .timeline_id
3802 30 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
3803 30 : read_lock_acquisition_micros: value
3804 30 : .read_lock_acquisition_micros
3805 30 : .into_recorded()
3806 30 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
3807 30 : read_lock_held_spawn_blocking_startup_micros: value
3808 30 : .read_lock_held_spawn_blocking_startup_micros
3809 30 : .into_recorded()
3810 30 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
3811 30 : read_lock_held_key_sort_micros: value
3812 30 : .read_lock_held_key_sort_micros
3813 30 : .into_recorded()
3814 30 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
3815 30 : read_lock_held_prerequisites_micros: value
3816 30 : .read_lock_held_prerequisites_micros
3817 30 : .into_recorded()
3818 30 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
3819 30 : read_lock_held_compute_holes_micros: value
3820 30 : .read_lock_held_compute_holes_micros
3821 30 : .into_recorded()
3822 30 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
3823 30 : read_lock_drop_micros: value
3824 30 : .read_lock_drop_micros
3825 30 : .into_recorded()
3826 30 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
3827 30 : write_layer_files_micros: value
3828 30 : .write_layer_files_micros
3829 30 : .into_recorded()
3830 30 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
3831 30 : level0_deltas_count: value
3832 30 : .level0_deltas_count
3833 30 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
3834 30 : new_deltas_count: value
3835 30 : .new_deltas_count
3836 30 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
3837 30 : new_deltas_size: value
3838 30 : .new_deltas_size
3839 30 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
3840 : })
3841 30 : }
3842 : }
3843 :
3844 : impl Timeline {
3845 : /// Level0 files first phase of compaction, explained in the [`Self::compact`] comment.
3846 408 : async fn compact_level0_phase1(
3847 408 : self: &Arc<Self>,
3848 408 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
3849 408 : mut stats: CompactLevel0Phase1StatsBuilder,
3850 408 : target_file_size: u64,
3851 408 : ctx: &RequestContext,
3852 408 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
3853 408 : stats.read_lock_held_spawn_blocking_startup_micros =
3854 408 : stats.read_lock_acquisition_micros.till_now(); // set by caller
3855 408 : let layers = guard.layer_map();
3856 408 : let level0_deltas = layers.get_level0_deltas()?;
3857 408 : let mut level0_deltas = level0_deltas
3858 408 : .into_iter()
3859 1770 : .map(|x| guard.get_from_desc(&x))
3860 408 : .collect_vec();
3861 408 : stats.level0_deltas_count = Some(level0_deltas.len());
3862 408 : // Only compact if enough layers have accumulated.
3863 408 : let threshold = self.get_compaction_threshold();
3864 408 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
3865 0 : debug!(
3866 0 : level0_deltas = level0_deltas.len(),
3867 0 : threshold, "too few deltas to compact"
3868 0 : );
3869 378 : return Ok(CompactLevel0Phase1Result::default());
3870 30 : }
3871 30 :
3872 30 : // This failpoint is used together with `test_duplicate_layers` integration test.
3873 30 : // It returns the compaction result exactly the same layers as input to compaction.
3874 30 : // We want to ensure that this will not cause any problem when updating the layer map
3875 30 : // after the compaction is finished.
3876 30 : //
3877 30 : // Currently, there are two rare edge cases that will cause duplicated layers being
3878 30 : // inserted.
3879 30 : // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
3880 30 : // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
3881 30 : // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
3882 30 : // point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
3883 30 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
3884 30 : // layer replace instead of the normal remove / upload process.
3885 30 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
3886 30 : // size length. Compaction will likely create the same set of n files afterwards.
3887 30 : //
3888 30 : // This failpoint is a superset of both of the cases.
3889 30 : if cfg!(feature = "testing") {
3890 30 : let active = (|| {
3891 30 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
3892 30 : false
3893 30 : })();
3894 30 :
3895 30 : if active {
3896 0 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
3897 0 : for delta in &level0_deltas {
3898 : // we are just faking these layers as being produced again for this failpoint
3899 0 : new_layers.push(
3900 0 : delta
3901 0 : .download_and_keep_resident()
3902 0 : .await
3903 0 : .context("download layer for failpoint")?,
3904 : );
3905 : }
3906 0 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
3907 0 : return Ok(CompactLevel0Phase1Result {
3908 0 : new_layers,
3909 0 : deltas_to_compact: level0_deltas,
3910 0 : });
3911 30 : }
3912 0 : }
3913 :
3914 : // Gather the files to compact in this iteration.
3915 : //
3916 : // Start with the oldest Level 0 delta file, and collect any other
3917 : // level 0 files that form a contiguous sequence, such that the end
3918 : // LSN of previous file matches the start LSN of the next file.
3919 : //
3920 : // Note that if the files don't form such a sequence, we might
3921 : // "compact" just a single file. That's a bit pointless, but it allows
3922 : // us to get rid of the level 0 file, and compact the other files on
3923 : // the next iteration. This could probably made smarter, but such
3924 : // "gaps" in the sequence of level 0 files should only happen in case
3925 : // of a crash, partial download from cloud storage, or something like
3926 : // that, so it's not a big deal in practice.
3927 540 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
3928 30 : let mut level0_deltas_iter = level0_deltas.iter();
3929 30 :
3930 30 : let first_level0_delta = level0_deltas_iter.next().unwrap();
3931 30 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
3932 30 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
3933 30 :
3934 30 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
3935 300 : for l in level0_deltas_iter {
3936 270 : let lsn_range = &l.layer_desc().lsn_range;
3937 270 :
3938 270 : if lsn_range.start != prev_lsn_end {
3939 0 : break;
3940 270 : }
3941 270 : deltas_to_compact.push(l.download_and_keep_resident().await?);
3942 270 : prev_lsn_end = lsn_range.end;
3943 : }
3944 30 : let lsn_range = Range {
3945 30 : start: deltas_to_compact
3946 30 : .first()
3947 30 : .unwrap()
3948 30 : .layer_desc()
3949 30 : .lsn_range
3950 30 : .start,
3951 30 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
3952 30 : };
3953 :
3954 30 : info!(
3955 30 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
3956 30 : lsn_range.start,
3957 30 : lsn_range.end,
3958 30 : deltas_to_compact.len(),
3959 30 : level0_deltas.len()
3960 30 : );
3961 :
3962 300 : for l in deltas_to_compact.iter() {
3963 300 : info!("compact includes {l}");
3964 : }
3965 :
3966 : // We don't need the original list of layers anymore. Drop it so that
3967 : // we don't accidentally use it later in the function.
3968 30 : drop(level0_deltas);
3969 30 :
3970 30 : stats.read_lock_held_prerequisites_micros = stats
3971 30 : .read_lock_held_spawn_blocking_startup_micros
3972 30 : .till_now();
3973 30 :
3974 30 : // Determine N largest holes where N is number of compacted layers.
3975 30 : let max_holes = deltas_to_compact.len();
3976 30 : let last_record_lsn = self.get_last_record_lsn();
3977 30 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
3978 30 : let min_hole_coverage_size = 3; // TODO: something more flexible?
3979 30 :
3980 30 : // min-heap (reserve space for one more element added before eviction)
3981 30 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
3982 30 : let mut prev: Option<Key> = None;
3983 30 :
3984 30 : let mut all_keys = Vec::new();
3985 :
3986 300 : for l in deltas_to_compact.iter() {
3987 2422 : all_keys.extend(l.load_keys(ctx).await?);
3988 : }
3989 :
3990 : // FIXME: should spawn_blocking the rest of this function
3991 :
3992 : // The current stdlib sorting implementation is designed in a way where it is
3993 : // particularly fast where the slice is made up of sorted sub-ranges.
3994 4931318 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3995 30 :
3996 30 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
3997 :
3998 2102000 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
3999 2102000 : if let Some(prev_key) = prev {
4000 : // just first fast filter
4001 2101970 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
4002 0 : let key_range = prev_key..next_key;
4003 0 : // Measuring hole by just subtraction of i128 representation of key range boundaries
4004 0 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
4005 0 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
4006 0 : // That is why it is better to measure size of hole as number of covering image layers.
4007 0 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
4008 0 : if coverage_size >= min_hole_coverage_size {
4009 0 : heap.push(Hole {
4010 0 : key_range,
4011 0 : coverage_size,
4012 0 : });
4013 0 : if heap.len() > max_holes {
4014 0 : heap.pop(); // remove smallest hole
4015 0 : }
4016 0 : }
4017 2101970 : }
4018 30 : }
4019 2102000 : prev = Some(next_key.next());
4020 : }
4021 30 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
4022 30 : drop_rlock(guard);
4023 30 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
4024 30 : let mut holes = heap.into_vec();
4025 30 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
4026 30 : let mut next_hole = 0; // index of next hole in holes vector
4027 30 :
4028 30 : // This iterator walks through all key-value pairs from all the layers
4029 30 : // we're compacting, in key, LSN order.
4030 30 : let all_values_iter = all_keys.iter();
4031 30 :
4032 30 : // This iterator walks through all keys and is needed to calculate size used by each key
4033 30 : let mut all_keys_iter = all_keys
4034 30 : .iter()
4035 2102000 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
4036 2101970 : .coalesce(|mut prev, cur| {
4037 2101970 : // Coalesce keys that belong to the same key pair.
4038 2101970 : // This ensures that compaction doesn't put them
4039 2101970 : // into different layer files.
4040 2101970 : // Still limit this by the target file size,
4041 2101970 : // so that we keep the size of the files in
4042 2101970 : // check.
4043 2101970 : if prev.0 == cur.0 && prev.2 < target_file_size {
4044 92001 : prev.2 += cur.2;
4045 92001 : Ok(prev)
4046 : } else {
4047 2009969 : Err((prev, cur))
4048 : }
4049 2101970 : });
4050 30 :
4051 30 : // Merge the contents of all the input delta layers into a new set
4052 30 : // of delta layers, based on the current partitioning.
4053 30 : //
4054 30 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
4055 30 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
4056 30 : // would be too large. In that case, we also split on the LSN dimension.
4057 30 : //
4058 30 : // LSN
4059 30 : // ^
4060 30 : // |
4061 30 : // | +-----------+ +--+--+--+--+
4062 30 : // | | | | | | | |
4063 30 : // | +-----------+ | | | | |
4064 30 : // | | | | | | | |
4065 30 : // | +-----------+ ==> | | | | |
4066 30 : // | | | | | | | |
4067 30 : // | +-----------+ | | | | |
4068 30 : // | | | | | | | |
4069 30 : // | +-----------+ +--+--+--+--+
4070 30 : // |
4071 30 : // +--------------> key
4072 30 : //
4073 30 : //
4074 30 : // If one key (X) has a lot of page versions:
4075 30 : //
4076 30 : // LSN
4077 30 : // ^
4078 30 : // | (X)
4079 30 : // | +-----------+ +--+--+--+--+
4080 30 : // | | | | | | | |
4081 30 : // | +-----------+ | | +--+ |
4082 30 : // | | | | | | | |
4083 30 : // | +-----------+ ==> | | | | |
4084 30 : // | | | | | +--+ |
4085 30 : // | +-----------+ | | | | |
4086 30 : // | | | | | | | |
4087 30 : // | +-----------+ +--+--+--+--+
4088 30 : // |
4089 30 : // +--------------> key
4090 30 : // TODO: this actually divides the layers into fixed-size chunks, not
4091 30 : // based on the partitioning.
4092 30 : //
4093 30 : // TODO: we should also opportunistically materialize and
4094 30 : // garbage collect what we can.
4095 30 : let mut new_layers = Vec::new();
4096 30 : let mut prev_key: Option<Key> = None;
4097 30 : let mut writer: Option<DeltaLayerWriter> = None;
4098 30 : let mut key_values_total_size = 0u64;
4099 30 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
4100 30 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
4101 :
4102 : for &DeltaEntry {
4103 2102000 : key, lsn, ref val, ..
4104 2102030 : } in all_values_iter
4105 : {
4106 2102000 : let value = val.load(ctx).await?;
4107 2102000 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
4108 2102000 : // We need to check key boundaries once we reach next key or end of layer with the same key
4109 2102000 : if !same_key || lsn == dup_end_lsn {
4110 2009999 : let mut next_key_size = 0u64;
4111 2009999 : let is_dup_layer = dup_end_lsn.is_valid();
4112 2009999 : dup_start_lsn = Lsn::INVALID;
4113 2009999 : if !same_key {
4114 2009999 : dup_end_lsn = Lsn::INVALID;
4115 2009999 : }
4116 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
4117 2009999 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
4118 2009999 : next_key_size = next_size;
4119 2009999 : if key != next_key {
4120 2009969 : if dup_end_lsn.is_valid() {
4121 0 : // We are writting segment with duplicates:
4122 0 : // place all remaining values of this key in separate segment
4123 0 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
4124 0 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
4125 2009969 : }
4126 2009969 : break;
4127 30 : }
4128 30 : key_values_total_size += next_size;
4129 30 : // Check if it is time to split segment: if total keys size is larger than target file size.
4130 30 : // We need to avoid generation of empty segments if next_size > target_file_size.
4131 30 : if key_values_total_size > target_file_size && lsn != next_lsn {
4132 : // Split key between multiple layers: such layer can contain only single key
4133 0 : dup_start_lsn = if dup_end_lsn.is_valid() {
4134 0 : dup_end_lsn // new segment with duplicates starts where old one stops
4135 : } else {
4136 0 : lsn // start with the first LSN for this key
4137 : };
4138 0 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
4139 0 : break;
4140 30 : }
4141 : }
4142 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
4143 2009999 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
4144 0 : dup_start_lsn = dup_end_lsn;
4145 0 : dup_end_lsn = lsn_range.end;
4146 2009999 : }
4147 2009999 : if writer.is_some() {
4148 2009969 : let written_size = writer.as_mut().unwrap().size();
4149 2009969 : let contains_hole =
4150 2009969 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
4151 : // check if key cause layer overflow or contains hole...
4152 2009969 : if is_dup_layer
4153 2009969 : || dup_end_lsn.is_valid()
4154 2009969 : || written_size + key_values_total_size > target_file_size
4155 2009969 : || contains_hole
4156 : {
4157 : // ... if so, flush previous layer and prepare to write new one
4158 0 : new_layers.push(
4159 0 : writer
4160 0 : .take()
4161 0 : .unwrap()
4162 0 : .finish(prev_key.unwrap().next(), self)
4163 0 : .await?,
4164 : );
4165 0 : writer = None;
4166 0 :
4167 0 : if contains_hole {
4168 0 : // skip hole
4169 0 : next_hole += 1;
4170 0 : }
4171 2009969 : }
4172 30 : }
4173 : // Remember size of key value because at next iteration we will access next item
4174 2009999 : key_values_total_size = next_key_size;
4175 92001 : }
4176 2102000 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
4177 0 : Err(CompactionError::Other(anyhow::anyhow!(
4178 0 : "failpoint delta-layer-writer-fail-before-finish"
4179 0 : )))
4180 2102000 : });
4181 :
4182 2102000 : if !self.shard_identity.is_key_disposable(&key) {
4183 2102000 : if writer.is_none() {
4184 30 : // Create writer if not initiaized yet
4185 30 : writer = Some(
4186 : DeltaLayerWriter::new(
4187 30 : self.conf,
4188 30 : self.timeline_id,
4189 30 : self.tenant_shard_id,
4190 30 : key,
4191 30 : if dup_end_lsn.is_valid() {
4192 : // this is a layer containing slice of values of the same key
4193 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
4194 0 : dup_start_lsn..dup_end_lsn
4195 : } else {
4196 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
4197 30 : lsn_range.clone()
4198 : },
4199 : )
4200 15 : .await?,
4201 : );
4202 2101970 : }
4203 :
4204 2102000 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
4205 : } else {
4206 0 : debug!(
4207 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
4208 0 : key,
4209 0 : self.shard_identity.get_shard_number(&key)
4210 0 : );
4211 : }
4212 :
4213 2102000 : if !new_layers.is_empty() {
4214 0 : fail_point!("after-timeline-compacted-first-L1");
4215 2102000 : }
4216 :
4217 2102000 : prev_key = Some(key);
4218 : }
4219 30 : if let Some(writer) = writer {
4220 61 : new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
4221 0 : }
4222 :
4223 : // Sync layers
4224 30 : if !new_layers.is_empty() {
4225 : // Print a warning if the created layer is larger than double the target size
4226 : // Add two pages for potential overhead. This should in theory be already
4227 : // accounted for in the target calculation, but for very small targets,
4228 : // we still might easily hit the limit otherwise.
4229 30 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
4230 30 : for layer in new_layers.iter() {
4231 30 : if layer.layer_desc().file_size > warn_limit {
4232 0 : warn!(
4233 0 : %layer,
4234 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
4235 0 : );
4236 30 : }
4237 : }
4238 :
4239 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
4240 30 : let layer_paths: Vec<Utf8PathBuf> = new_layers
4241 30 : .iter()
4242 30 : .map(|l| l.local_path().to_owned())
4243 30 : .collect();
4244 30 :
4245 30 : // Fsync all the layer files and directory using multiple threads to
4246 30 : // minimize latency.
4247 30 : par_fsync::par_fsync_async(&layer_paths)
4248 30 : .await
4249 30 : .context("fsync all new layers")?;
4250 :
4251 30 : let timeline_dir = self
4252 30 : .conf
4253 30 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
4254 30 :
4255 30 : par_fsync::par_fsync_async(&[timeline_dir])
4256 30 : .await
4257 30 : .context("fsync of timeline dir")?;
4258 0 : }
4259 :
4260 30 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
4261 30 : stats.new_deltas_count = Some(new_layers.len());
4262 30 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
4263 30 :
4264 30 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
4265 30 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
4266 : {
4267 30 : Ok(stats_json) => {
4268 30 : info!(
4269 30 : stats_json = stats_json.as_str(),
4270 30 : "compact_level0_phase1 stats available"
4271 30 : )
4272 : }
4273 0 : Err(e) => {
4274 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
4275 : }
4276 : }
4277 :
4278 30 : Ok(CompactLevel0Phase1Result {
4279 30 : new_layers,
4280 30 : deltas_to_compact: deltas_to_compact
4281 30 : .into_iter()
4282 300 : .map(|x| x.drop_eviction_guard())
4283 30 : .collect::<Vec<_>>(),
4284 30 : })
4285 408 : }
4286 :
4287 : ///
4288 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
4289 : /// as Level 1 files.
4290 : ///
4291 408 : async fn compact_level0(
4292 408 : self: &Arc<Self>,
4293 408 : target_file_size: u64,
4294 408 : ctx: &RequestContext,
4295 408 : ) -> Result<(), CompactionError> {
4296 : let CompactLevel0Phase1Result {
4297 408 : new_layers,
4298 408 : deltas_to_compact,
4299 : } = {
4300 408 : let phase1_span = info_span!("compact_level0_phase1");
4301 408 : let ctx = ctx.attached_child();
4302 408 : let mut stats = CompactLevel0Phase1StatsBuilder {
4303 408 : version: Some(2),
4304 408 : tenant_id: Some(self.tenant_shard_id),
4305 408 : timeline_id: Some(self.timeline_id),
4306 408 : ..Default::default()
4307 408 : };
4308 408 :
4309 408 : let begin = tokio::time::Instant::now();
4310 408 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
4311 408 : let now = tokio::time::Instant::now();
4312 408 : stats.read_lock_acquisition_micros =
4313 408 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
4314 408 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
4315 408 : .instrument(phase1_span)
4316 39551 : .await?
4317 : };
4318 :
4319 408 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
4320 : // nothing to do
4321 378 : return Ok(());
4322 30 : }
4323 :
4324 30 : let mut guard = self.layers.write().await;
4325 :
4326 30 : let mut duplicated_layers = HashSet::new();
4327 30 :
4328 30 : let mut insert_layers = Vec::with_capacity(new_layers.len());
4329 :
4330 60 : for l in &new_layers {
4331 30 : if guard.contains(l.as_ref()) {
4332 : // expected in tests
4333 0 : tracing::error!(layer=%l, "duplicated L1 layer");
4334 :
4335 : // good ways to cause a duplicate: we repeatedly error after taking the writelock
4336 : // `guard` on self.layers. as of writing this, there are no error returns except
4337 : // for compact_level0_phase1 creating an L0, which does not happen in practice
4338 : // because we have not implemented L0 => L0 compaction.
4339 0 : duplicated_layers.insert(l.layer_desc().key());
4340 30 : } else if LayerMap::is_l0(l.layer_desc()) {
4341 0 : return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
4342 30 : } else {
4343 30 : insert_layers.push(l.clone());
4344 30 : }
4345 : }
4346 :
4347 30 : let remove_layers = {
4348 30 : let mut deltas_to_compact = deltas_to_compact;
4349 30 : // only remove those inputs which were not outputs
4350 300 : deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
4351 30 : deltas_to_compact
4352 30 : };
4353 30 :
4354 30 : // deletion will happen later, the layer file manager calls garbage_collect_on_drop
4355 30 : guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
4356 :
4357 30 : if let Some(remote_client) = self.remote_client.as_ref() {
4358 30 : remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
4359 0 : }
4360 :
4361 30 : drop_wlock(guard);
4362 30 :
4363 30 : Ok(())
4364 408 : }
4365 :
4366 : /// Update information about which layer files need to be retained on
4367 : /// garbage collection. This is separate from actually performing the GC,
4368 : /// and is updated more frequently, so that compaction can remove obsolete
4369 : /// page versions more aggressively.
4370 : ///
4371 : /// TODO: that's wishful thinking, compaction doesn't actually do that
4372 : /// currently.
4373 : ///
4374 : /// The caller specifies how much history is needed with the 3 arguments:
4375 : ///
4376 : /// retain_lsns: keep a version of each page at these LSNs
4377 : /// cutoff_horizon: also keep everything newer than this LSN
4378 : /// pitr: the time duration required to keep data for PITR
4379 : ///
4380 : /// The 'retain_lsns' list is currently used to prevent removing files that
4381 : /// are needed by child timelines. In the future, the user might be able to
4382 : /// name additional points in time to retain. The caller is responsible for
4383 : /// collecting that information.
4384 : ///
4385 : /// The 'cutoff_horizon' point is used to retain recent versions that might still be
4386 : /// needed by read-only nodes. (As of this writing, the caller just passes
4387 : /// the latest LSN subtracted by a constant, and doesn't do anything smart
4388 : /// to figure out what read-only nodes might actually need.)
4389 : ///
4390 : /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
4391 : /// whether a record is needed for PITR.
4392 : ///
4393 : /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
4394 : /// field, so that the three values passed as argument are stored
4395 : /// atomically. But the caller is responsible for ensuring that no new
4396 : /// branches are created that would need to be included in 'retain_lsns',
4397 : /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
4398 : /// that.
4399 : ///
4400 816 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
4401 : pub(super) async fn update_gc_info(
4402 : &self,
4403 : retain_lsns: Vec<Lsn>,
4404 : cutoff_horizon: Lsn,
4405 : pitr: Duration,
4406 : cancel: &CancellationToken,
4407 : ctx: &RequestContext,
4408 : ) -> anyhow::Result<()> {
4409 : // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
4410 : //
4411 : // Some unit tests depend on garbage-collection working even when
4412 : // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
4413 : // work, so avoid calling it altogether if time-based retention is not
4414 : // configured. It would be pointless anyway.
4415 : let pitr_cutoff = if pitr != Duration::ZERO {
4416 : let now = SystemTime::now();
4417 : if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
4418 : let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
4419 :
4420 : match self
4421 : .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
4422 : .await?
4423 : {
4424 : LsnForTimestamp::Present(lsn) => lsn,
4425 : LsnForTimestamp::Future(lsn) => {
4426 : // The timestamp is in the future. That sounds impossible,
4427 : // but what it really means is that there hasn't been
4428 : // any commits since the cutoff timestamp.
4429 0 : debug!("future({})", lsn);
4430 : cutoff_horizon
4431 : }
4432 : LsnForTimestamp::Past(lsn) => {
4433 0 : debug!("past({})", lsn);
4434 : // conservative, safe default is to remove nothing, when we
4435 : // have no commit timestamp data available
4436 : *self.get_latest_gc_cutoff_lsn()
4437 : }
4438 : LsnForTimestamp::NoData(lsn) => {
4439 0 : debug!("nodata({})", lsn);
4440 : // conservative, safe default is to remove nothing, when we
4441 : // have no commit timestamp data available
4442 : *self.get_latest_gc_cutoff_lsn()
4443 : }
4444 : }
4445 : } else {
4446 : // If we don't have enough data to convert to LSN,
4447 : // play safe and don't remove any layers.
4448 : *self.get_latest_gc_cutoff_lsn()
4449 : }
4450 : } else {
4451 : // No time-based retention was configured. Set time-based cutoff to
4452 : // same as LSN based.
4453 : cutoff_horizon
4454 : };
4455 :
4456 : // Grab the lock and update the values
4457 : *self.gc_info.write().unwrap() = GcInfo {
4458 : retain_lsns,
4459 : horizon_cutoff: cutoff_horizon,
4460 : pitr_cutoff,
4461 : };
4462 :
4463 : Ok(())
4464 : }
4465 :
4466 : /// Garbage collect layer files on a timeline that are no longer needed.
4467 : ///
4468 : /// Currently, we don't make any attempt at removing unneeded page versions
4469 : /// within a layer file. We can only remove the whole file if it's fully
4470 : /// obsolete.
4471 408 : pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
4472 408 : // this is most likely the background tasks, but it might be the spawned task from
4473 408 : // immediate_gc
4474 408 : let cancel = crate::task_mgr::shutdown_token();
4475 408 : let _g = tokio::select! {
4476 408 : guard = self.gc_lock.lock() => guard,
4477 : _ = self.cancel.cancelled() => return Ok(GcResult::default()),
4478 : _ = cancel.cancelled() => return Ok(GcResult::default()),
4479 : };
4480 408 : let timer = self.metrics.garbage_collect_histo.start_timer();
4481 :
4482 0 : fail_point!("before-timeline-gc");
4483 :
4484 : // Is the timeline being deleted?
4485 408 : if self.is_stopping() {
4486 0 : anyhow::bail!("timeline is Stopping");
4487 408 : }
4488 408 :
4489 408 : let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
4490 408 : let gc_info = self.gc_info.read().unwrap();
4491 408 :
4492 408 : let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
4493 408 : let pitr_cutoff = gc_info.pitr_cutoff;
4494 408 : let retain_lsns = gc_info.retain_lsns.clone();
4495 408 : (horizon_cutoff, pitr_cutoff, retain_lsns)
4496 408 : };
4497 408 :
4498 408 : let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
4499 :
4500 408 : let res = self
4501 408 : .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
4502 408 : .instrument(
4503 408 : info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
4504 : )
4505 1 : .await?;
4506 :
4507 : // only record successes
4508 408 : timer.stop_and_record();
4509 408 :
4510 408 : Ok(res)
4511 408 : }
4512 :
4513 408 : async fn gc_timeline(
4514 408 : &self,
4515 408 : horizon_cutoff: Lsn,
4516 408 : pitr_cutoff: Lsn,
4517 408 : retain_lsns: Vec<Lsn>,
4518 408 : new_gc_cutoff: Lsn,
4519 408 : ) -> anyhow::Result<GcResult> {
4520 408 : let now = SystemTime::now();
4521 408 : let mut result: GcResult = GcResult::default();
4522 408 :
4523 408 : // Nothing to GC. Return early.
4524 408 : let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
4525 408 : if latest_gc_cutoff >= new_gc_cutoff {
4526 0 : info!(
4527 0 : "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
4528 0 : );
4529 0 : return Ok(result);
4530 408 : }
4531 :
4532 : // We need to ensure that no one tries to read page versions or create
4533 : // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
4534 : // for details. This will block until the old value is no longer in use.
4535 : //
4536 : // The GC cutoff should only ever move forwards.
4537 408 : let waitlist = {
4538 408 : let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
4539 408 : ensure!(
4540 408 : *write_guard <= new_gc_cutoff,
4541 0 : "Cannot move GC cutoff LSN backwards (was {}, new {})",
4542 0 : *write_guard,
4543 : new_gc_cutoff
4544 : );
4545 408 : write_guard.store_and_unlock(new_gc_cutoff)
4546 408 : };
4547 408 : waitlist.wait().await;
4548 :
4549 408 : info!("GC starting");
4550 :
4551 0 : debug!("retain_lsns: {:?}", retain_lsns);
4552 :
4553 408 : let mut layers_to_remove = Vec::new();
4554 408 : let mut wanted_image_layers = KeySpaceRandomAccum::default();
4555 :
4556 : // Scan all layers in the timeline (remote or on-disk).
4557 : //
4558 : // Garbage collect the layer if all conditions are satisfied:
4559 : // 1. it is older than cutoff LSN;
4560 : // 2. it is older than PITR interval;
4561 : // 3. it doesn't need to be retained for 'retain_lsns';
4562 : // 4. newer on-disk image layers cover the layer's whole key range
4563 : //
4564 : // TODO holding a write lock is too agressive and avoidable
4565 408 : let mut guard = self.layers.write().await;
4566 408 : let layers = guard.layer_map();
4567 2404 : 'outer: for l in layers.iter_historic_layers() {
4568 2404 : result.layers_total += 1;
4569 2404 :
4570 2404 : // 1. Is it newer than GC horizon cutoff point?
4571 2404 : if l.get_lsn_range().end > horizon_cutoff {
4572 0 : debug!(
4573 0 : "keeping {} because it's newer than horizon_cutoff {}",
4574 0 : l.filename(),
4575 0 : horizon_cutoff,
4576 0 : );
4577 408 : result.layers_needed_by_cutoff += 1;
4578 408 : continue 'outer;
4579 1996 : }
4580 1996 :
4581 1996 : // 2. It is newer than PiTR cutoff point?
4582 1996 : if l.get_lsn_range().end > pitr_cutoff {
4583 0 : debug!(
4584 0 : "keeping {} because it's newer than pitr_cutoff {}",
4585 0 : l.filename(),
4586 0 : pitr_cutoff,
4587 0 : );
4588 0 : result.layers_needed_by_pitr += 1;
4589 0 : continue 'outer;
4590 1996 : }
4591 :
4592 : // 3. Is it needed by a child branch?
4593 : // NOTE With that we would keep data that
4594 : // might be referenced by child branches forever.
4595 : // We can track this in child timeline GC and delete parent layers when
4596 : // they are no longer needed. This might be complicated with long inheritance chains.
4597 : //
4598 : // TODO Vec is not a great choice for `retain_lsns`
4599 1996 : for retain_lsn in &retain_lsns {
4600 : // start_lsn is inclusive
4601 12 : if &l.get_lsn_range().start <= retain_lsn {
4602 0 : debug!(
4603 0 : "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
4604 0 : l.filename(),
4605 0 : retain_lsn,
4606 0 : l.is_incremental(),
4607 0 : );
4608 12 : result.layers_needed_by_branches += 1;
4609 12 : continue 'outer;
4610 0 : }
4611 : }
4612 :
4613 : // 4. Is there a later on-disk layer for this relation?
4614 : //
4615 : // The end-LSN is exclusive, while disk_consistent_lsn is
4616 : // inclusive. For example, if disk_consistent_lsn is 100, it is
4617 : // OK for a delta layer to have end LSN 101, but if the end LSN
4618 : // is 102, then it might not have been fully flushed to disk
4619 : // before crash.
4620 : //
4621 : // For example, imagine that the following layers exist:
4622 : //
4623 : // 1000 - image (A)
4624 : // 1000-2000 - delta (B)
4625 : // 2000 - image (C)
4626 : // 2000-3000 - delta (D)
4627 : // 3000 - image (E)
4628 : //
4629 : // If GC horizon is at 2500, we can remove layers A and B, but
4630 : // we cannot remove C, even though it's older than 2500, because
4631 : // the delta layer 2000-3000 depends on it.
4632 1984 : if !layers
4633 1984 : .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
4634 : {
4635 0 : debug!("keeping {} because it is the latest layer", l.filename());
4636 : // Collect delta key ranges that need image layers to allow garbage
4637 : // collecting the layers.
4638 : // It is not so obvious whether we need to propagate information only about
4639 : // delta layers. Image layers can form "stairs" preventing old image from been deleted.
4640 : // But image layers are in any case less sparse than delta layers. Also we need some
4641 : // protection from replacing recent image layers with new one after each GC iteration.
4642 1984 : if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
4643 0 : wanted_image_layers.add_range(l.get_key_range());
4644 1984 : }
4645 1984 : result.layers_not_updated += 1;
4646 1984 : continue 'outer;
4647 0 : }
4648 :
4649 : // We didn't find any reason to keep this file, so remove it.
4650 0 : debug!(
4651 0 : "garbage collecting {} is_dropped: xx is_incremental: {}",
4652 0 : l.filename(),
4653 0 : l.is_incremental(),
4654 0 : );
4655 0 : layers_to_remove.push(l);
4656 : }
4657 408 : self.wanted_image_layers
4658 408 : .lock()
4659 408 : .unwrap()
4660 408 : .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
4661 408 :
4662 408 : if !layers_to_remove.is_empty() {
4663 : // Persist the new GC cutoff value in the metadata file, before
4664 : // we actually remove anything.
4665 : //
4666 : // This does not in fact have any effect as we no longer consider local metadata unless
4667 : // running without remote storage.
4668 : //
4669 : // This unconditionally schedules also an index_part.json update, even though, we will
4670 : // be doing one a bit later with the unlinked gc'd layers.
4671 : //
4672 : // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
4673 0 : self.update_metadata_file(self.disk_consistent_lsn.load(), None)
4674 0 : .await?;
4675 :
4676 0 : let gc_layers = layers_to_remove
4677 0 : .iter()
4678 0 : .map(|x| guard.get_from_desc(x))
4679 0 : .collect::<Vec<Layer>>();
4680 0 :
4681 0 : result.layers_removed = gc_layers.len() as u64;
4682 :
4683 0 : if let Some(remote_client) = self.remote_client.as_ref() {
4684 0 : remote_client.schedule_gc_update(&gc_layers)?;
4685 0 : }
4686 :
4687 0 : guard.finish_gc_timeline(&gc_layers);
4688 0 :
4689 0 : #[cfg(feature = "testing")]
4690 0 : {
4691 0 : result.doomed_layers = gc_layers;
4692 0 : }
4693 408 : }
4694 :
4695 408 : info!(
4696 408 : "GC completed removing {} layers, cutoff {}",
4697 408 : result.layers_removed, new_gc_cutoff
4698 408 : );
4699 :
4700 408 : result.elapsed = now.elapsed()?;
4701 408 : Ok(result)
4702 408 : }
4703 :
4704 : /// Reconstruct a value, using the given base image and WAL records in 'data'.
4705 502698 : async fn reconstruct_value(
4706 502698 : &self,
4707 502698 : key: Key,
4708 502698 : request_lsn: Lsn,
4709 502698 : mut data: ValueReconstructState,
4710 502698 : ) -> Result<Bytes, PageReconstructError> {
4711 502698 : // Perform WAL redo if needed
4712 502698 : data.records.reverse();
4713 502698 :
4714 502698 : // If we have a page image, and no WAL, we're all set
4715 502698 : if data.records.is_empty() {
4716 502690 : if let Some((img_lsn, img)) = &data.img {
4717 0 : trace!(
4718 0 : "found page image for key {} at {}, no WAL redo required, req LSN {}",
4719 0 : key,
4720 0 : img_lsn,
4721 0 : request_lsn,
4722 0 : );
4723 502690 : Ok(img.clone())
4724 : } else {
4725 0 : Err(PageReconstructError::from(anyhow!(
4726 0 : "base image for {key} at {request_lsn} not found"
4727 0 : )))
4728 : }
4729 : } else {
4730 : // We need to do WAL redo.
4731 : //
4732 : // If we don't have a base image, then the oldest WAL record better initialize
4733 : // the page
4734 8 : if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
4735 0 : Err(PageReconstructError::from(anyhow!(
4736 0 : "Base image for {} at {} not found, but got {} WAL records",
4737 0 : key,
4738 0 : request_lsn,
4739 0 : data.records.len()
4740 0 : )))
4741 : } else {
4742 8 : if data.img.is_some() {
4743 0 : trace!(
4744 0 : "found {} WAL records and a base image for {} at {}, performing WAL redo",
4745 0 : data.records.len(),
4746 0 : key,
4747 0 : request_lsn
4748 0 : );
4749 : } else {
4750 0 : trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
4751 : };
4752 :
4753 8 : let last_rec_lsn = data.records.last().unwrap().0;
4754 :
4755 8 : let img = match self
4756 8 : .walredo_mgr
4757 8 : .as_ref()
4758 8 : .context("timeline has no walredo manager")
4759 8 : .map_err(PageReconstructError::WalRedo)?
4760 8 : .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
4761 0 : .await
4762 8 : .context("reconstruct a page image")
4763 : {
4764 8 : Ok(img) => img,
4765 0 : Err(e) => return Err(PageReconstructError::WalRedo(e)),
4766 : };
4767 :
4768 8 : if img.len() == page_cache::PAGE_SZ {
4769 0 : let cache = page_cache::get();
4770 0 : if let Err(e) = cache
4771 0 : .memorize_materialized_page(
4772 0 : self.tenant_shard_id,
4773 0 : self.timeline_id,
4774 0 : key,
4775 0 : last_rec_lsn,
4776 0 : &img,
4777 0 : )
4778 0 : .await
4779 0 : .context("Materialized page memoization failed")
4780 : {
4781 0 : return Err(PageReconstructError::from(e));
4782 0 : }
4783 8 : }
4784 :
4785 8 : Ok(img)
4786 : }
4787 : }
4788 502698 : }
4789 :
4790 0 : pub(crate) async fn spawn_download_all_remote_layers(
4791 0 : self: Arc<Self>,
4792 0 : request: DownloadRemoteLayersTaskSpawnRequest,
4793 0 : ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
4794 0 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4795 0 :
4796 0 : // this is not really needed anymore; it has tests which really check the return value from
4797 0 : // http api. it would be better not to maintain this anymore.
4798 0 :
4799 0 : let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
4800 0 : if let Some(st) = &*status_guard {
4801 0 : match &st.state {
4802 : DownloadRemoteLayersTaskState::Running => {
4803 0 : return Err(st.clone());
4804 : }
4805 : DownloadRemoteLayersTaskState::ShutDown
4806 0 : | DownloadRemoteLayersTaskState::Completed => {
4807 0 : *status_guard = None;
4808 0 : }
4809 : }
4810 0 : }
4811 :
4812 0 : let self_clone = Arc::clone(&self);
4813 0 : let task_id = task_mgr::spawn(
4814 0 : task_mgr::BACKGROUND_RUNTIME.handle(),
4815 0 : task_mgr::TaskKind::DownloadAllRemoteLayers,
4816 0 : Some(self.tenant_shard_id),
4817 0 : Some(self.timeline_id),
4818 0 : "download all remote layers task",
4819 : false,
4820 0 : async move {
4821 0 : self_clone.download_all_remote_layers(request).await;
4822 0 : let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
4823 0 : match &mut *status_guard {
4824 : None => {
4825 0 : warn!("tasks status is supposed to be Some(), since we are running");
4826 : }
4827 0 : Some(st) => {
4828 0 : let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
4829 0 : if st.task_id != exp_task_id {
4830 0 : warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
4831 0 : } else {
4832 0 : st.state = DownloadRemoteLayersTaskState::Completed;
4833 0 : }
4834 : }
4835 : };
4836 0 : Ok(())
4837 0 : }
4838 0 : .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
4839 : );
4840 :
4841 0 : let initial_info = DownloadRemoteLayersTaskInfo {
4842 0 : task_id: format!("{task_id}"),
4843 0 : state: DownloadRemoteLayersTaskState::Running,
4844 0 : total_layer_count: 0,
4845 0 : successful_download_count: 0,
4846 0 : failed_download_count: 0,
4847 0 : };
4848 0 : *status_guard = Some(initial_info.clone());
4849 0 :
4850 0 : Ok(initial_info)
4851 0 : }
4852 :
4853 0 : async fn download_all_remote_layers(
4854 0 : self: &Arc<Self>,
4855 0 : request: DownloadRemoteLayersTaskSpawnRequest,
4856 0 : ) {
4857 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4858 :
4859 0 : let remaining = {
4860 0 : let guard = self.layers.read().await;
4861 0 : guard
4862 0 : .layer_map()
4863 0 : .iter_historic_layers()
4864 0 : .map(|desc| guard.get_from_desc(&desc))
4865 0 : .collect::<Vec<_>>()
4866 0 : };
4867 0 : let total_layer_count = remaining.len();
4868 0 :
4869 0 : macro_rules! lock_status {
4870 0 : ($st:ident) => {
4871 0 : let mut st = self.download_all_remote_layers_task_info.write().unwrap();
4872 0 : let st = st
4873 0 : .as_mut()
4874 0 : .expect("this function is only called after the task has been spawned");
4875 0 : assert_eq!(
4876 0 : st.task_id,
4877 0 : format!(
4878 0 : "{}",
4879 0 : task_mgr::current_task_id().expect("we run inside a task_mgr task")
4880 0 : )
4881 0 : );
4882 0 : let $st = st;
4883 0 : };
4884 0 : }
4885 0 :
4886 0 : {
4887 0 : lock_status!(st);
4888 0 : st.total_layer_count = total_layer_count as u64;
4889 0 : }
4890 0 :
4891 0 : let mut remaining = remaining.into_iter();
4892 0 : let mut have_remaining = true;
4893 0 : let mut js = tokio::task::JoinSet::new();
4894 0 :
4895 0 : let cancel = task_mgr::shutdown_token();
4896 0 :
4897 0 : let limit = request.max_concurrent_downloads;
4898 :
4899 : loop {
4900 0 : while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
4901 0 : let Some(next) = remaining.next() else {
4902 0 : have_remaining = false;
4903 0 : break;
4904 : };
4905 :
4906 0 : let span = tracing::info_span!("download", layer = %next);
4907 :
4908 0 : js.spawn(
4909 0 : async move {
4910 0 : let res = next.download().await;
4911 0 : (next, res)
4912 0 : }
4913 0 : .instrument(span),
4914 0 : );
4915 : }
4916 :
4917 0 : while let Some(res) = js.join_next().await {
4918 0 : match res {
4919 : Ok((_, Ok(_))) => {
4920 0 : lock_status!(st);
4921 0 : st.successful_download_count += 1;
4922 : }
4923 0 : Ok((layer, Err(e))) => {
4924 0 : tracing::error!(%layer, "download failed: {e:#}");
4925 0 : lock_status!(st);
4926 0 : st.failed_download_count += 1;
4927 : }
4928 0 : Err(je) if je.is_cancelled() => unreachable!("not used here"),
4929 0 : Err(je) if je.is_panic() => {
4930 0 : lock_status!(st);
4931 0 : st.failed_download_count += 1;
4932 : }
4933 0 : Err(je) => tracing::warn!("unknown joinerror: {je:?}"),
4934 : }
4935 : }
4936 :
4937 0 : if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
4938 0 : break;
4939 0 : }
4940 : }
4941 :
4942 : {
4943 0 : lock_status!(st);
4944 0 : st.state = DownloadRemoteLayersTaskState::Completed;
4945 0 : }
4946 0 : }
4947 :
4948 0 : pub(crate) fn get_download_all_remote_layers_task_info(
4949 0 : &self,
4950 0 : ) -> Option<DownloadRemoteLayersTaskInfo> {
4951 0 : self.download_all_remote_layers_task_info
4952 0 : .read()
4953 0 : .unwrap()
4954 0 : .clone()
4955 0 : }
4956 : }
4957 :
4958 : impl Timeline {
4959 : /// Returns non-remote layers for eviction.
4960 0 : pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
4961 0 : let guard = self.layers.read().await;
4962 0 : let mut max_layer_size: Option<u64> = None;
4963 :
4964 0 : let resident_layers = guard
4965 0 : .resident_layers()
4966 0 : .map(|layer| {
4967 0 : let file_size = layer.layer_desc().file_size;
4968 0 : max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
4969 0 :
4970 0 : let last_activity_ts = layer.access_stats().latest_activity_or_now();
4971 0 :
4972 0 : EvictionCandidate {
4973 0 : layer: layer.into(),
4974 0 : last_activity_ts,
4975 0 : relative_last_activity: finite_f32::FiniteF32::ZERO,
4976 0 : }
4977 0 : })
4978 0 : .collect()
4979 0 : .await;
4980 :
4981 0 : DiskUsageEvictionInfo {
4982 0 : max_layer_size,
4983 0 : resident_layers,
4984 0 : }
4985 0 : }
4986 :
4987 556 : pub(crate) fn get_shard_index(&self) -> ShardIndex {
4988 556 : ShardIndex {
4989 556 : shard_number: self.tenant_shard_id.shard_number,
4990 556 : shard_count: self.tenant_shard_id.shard_count,
4991 556 : }
4992 556 : }
4993 : }
4994 :
4995 : type TraversalPathItem = (
4996 : ValueReconstructResult,
4997 : Lsn,
4998 : Box<dyn Send + FnOnce() -> TraversalId>,
4999 : );
5000 :
5001 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
5002 : /// to an error, as anyhow context information.
5003 106 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
5004 106 : // We want the original 'msg' to be the outermost context. The outermost context
5005 106 : // is the most high-level information, which also gets propagated to the client.
5006 106 : let mut msg_iter = path
5007 106 : .into_iter()
5008 108 : .map(|(r, c, l)| {
5009 108 : format!(
5010 108 : "layer traversal: result {:?}, cont_lsn {}, layer: {}",
5011 108 : r,
5012 108 : c,
5013 108 : l(),
5014 108 : )
5015 108 : })
5016 106 : .chain(std::iter::once(msg));
5017 106 : // Construct initial message from the first traversed layer
5018 106 : let err = anyhow!(msg_iter.next().unwrap());
5019 106 :
5020 106 : // Append all subsequent traversals, and the error message 'msg', as contexts.
5021 108 : let msg = msg_iter.fold(err, |err, msg| err.context(msg));
5022 106 : PageReconstructError::from(msg)
5023 106 : }
5024 :
5025 : struct TimelineWriterState {
5026 : open_layer: Arc<InMemoryLayer>,
5027 : current_size: u64,
5028 : // Previous Lsn which passed through
5029 : prev_lsn: Option<Lsn>,
5030 : // Largest Lsn which passed through the current writer
5031 : max_lsn: Option<Lsn>,
5032 : // Cached details of the last freeze. Avoids going trough the atomic/lock on every put.
5033 : cached_last_freeze_at: Lsn,
5034 : cached_last_freeze_ts: Instant,
5035 : }
5036 :
5037 : impl TimelineWriterState {
5038 2627992 : fn new(
5039 2627992 : open_layer: Arc<InMemoryLayer>,
5040 2627992 : current_size: u64,
5041 2627992 : last_freeze_at: Lsn,
5042 2627992 : last_freeze_ts: Instant,
5043 2627992 : ) -> Self {
5044 2627992 : Self {
5045 2627992 : open_layer,
5046 2627992 : current_size,
5047 2627992 : prev_lsn: None,
5048 2627992 : max_lsn: None,
5049 2627992 : cached_last_freeze_at: last_freeze_at,
5050 2627992 : cached_last_freeze_ts: last_freeze_ts,
5051 2627992 : }
5052 2627992 : }
5053 : }
5054 :
5055 : /// Various functions to mutate the timeline.
5056 : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
5057 : // This is probably considered a bad practice in Rust and should be fixed eventually,
5058 : // but will cause large code changes.
5059 : pub(crate) struct TimelineWriter<'a> {
5060 : tl: &'a Timeline,
5061 : write_guard: tokio::sync::MutexGuard<'a, Option<TimelineWriterState>>,
5062 : }
5063 :
5064 : impl Deref for TimelineWriter<'_> {
5065 : type Target = Timeline;
5066 :
5067 5258576 : fn deref(&self) -> &Self::Target {
5068 5258576 : self.tl
5069 5258576 : }
5070 : }
5071 :
5072 : impl Drop for TimelineWriter<'_> {
5073 2957008 : fn drop(&mut self) {
5074 2957008 : self.write_guard.take();
5075 2957008 : }
5076 : }
5077 :
5078 : enum OpenLayerAction {
5079 : Roll,
5080 : Open,
5081 : None,
5082 : }
5083 :
5084 : impl<'a> TimelineWriter<'a> {
5085 : /// Put a new page version that can be constructed from a WAL record
5086 : ///
5087 : /// This will implicitly extend the relation, if the page is beyond the
5088 : /// current end-of-file.
5089 2913766 : pub(crate) async fn put(
5090 2913766 : &mut self,
5091 2913766 : key: Key,
5092 2913766 : lsn: Lsn,
5093 2913766 : value: &Value,
5094 2913766 : ctx: &RequestContext,
5095 2913766 : ) -> anyhow::Result<()> {
5096 2913766 : // Avoid doing allocations for "small" values.
5097 2913766 : // In the regression test suite, the limit of 256 avoided allocations in 95% of cases:
5098 2913766 : // https://github.com/neondatabase/neon/pull/5056#discussion_r1301975061
5099 2913766 : let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
5100 2913766 : buf.clear();
5101 2913766 : value.ser_into(&mut buf)?;
5102 2913766 : let buf_size: u64 = buf.len().try_into().expect("oversized value buf");
5103 2913766 :
5104 2913766 : let action = self.get_open_layer_action(lsn, buf_size);
5105 2913766 : let layer = self.handle_open_layer_action(lsn, action).await?;
5106 2913766 : let res = layer.put_value(key, lsn, &buf, ctx).await;
5107 :
5108 2913766 : if res.is_ok() {
5109 2913766 : // Update the current size only when the entire write was ok.
5110 2913766 : // In case of failures, we may have had partial writes which
5111 2913766 : // render the size tracking out of sync. That's ok because
5112 2913766 : // the checkpoint distance should be significantly smaller
5113 2913766 : // than the S3 single shot upload limit of 5GiB.
5114 2913766 : let state = self.write_guard.as_mut().unwrap();
5115 2913766 :
5116 2913766 : state.current_size += buf_size;
5117 2913766 : state.prev_lsn = Some(lsn);
5118 2913766 : state.max_lsn = std::cmp::max(state.max_lsn, Some(lsn));
5119 2913766 : }
5120 :
5121 2913766 : res
5122 2913766 : }
5123 :
5124 2913768 : async fn handle_open_layer_action(
5125 2913768 : &mut self,
5126 2913768 : at: Lsn,
5127 2913768 : action: OpenLayerAction,
5128 2913768 : ) -> anyhow::Result<&Arc<InMemoryLayer>> {
5129 2913768 : match action {
5130 : OpenLayerAction::Roll => {
5131 0 : let max_lsn = self.write_guard.as_ref().unwrap().max_lsn.unwrap();
5132 0 : self.tl.freeze_inmem_layer_at(max_lsn).await;
5133 :
5134 0 : let now = Instant::now();
5135 0 : *(self.last_freeze_ts.write().unwrap()) = now;
5136 0 :
5137 0 : self.tl.flush_frozen_layers();
5138 0 :
5139 0 : let current_size = self.write_guard.as_ref().unwrap().current_size;
5140 0 : if current_size > self.get_checkpoint_distance() {
5141 0 : warn!("Flushed oversized open layer with size {}", current_size)
5142 0 : }
5143 :
5144 0 : assert!(self.write_guard.is_some());
5145 :
5146 0 : let layer = self.tl.get_layer_for_write(at).await?;
5147 0 : let initial_size = layer.size().await?;
5148 0 : self.write_guard.replace(TimelineWriterState::new(
5149 0 : layer,
5150 0 : initial_size,
5151 0 : Lsn(max_lsn.0 + 1),
5152 0 : now,
5153 0 : ));
5154 : }
5155 : OpenLayerAction::Open => {
5156 2627992 : assert!(self.write_guard.is_none());
5157 :
5158 2627992 : let layer = self.tl.get_layer_for_write(at).await?;
5159 2627992 : let initial_size = layer.size().await?;
5160 :
5161 2627992 : let last_freeze_at = self.last_freeze_at.load();
5162 2627992 : let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
5163 2627992 : self.write_guard.replace(TimelineWriterState::new(
5164 2627992 : layer,
5165 2627992 : initial_size,
5166 2627992 : last_freeze_at,
5167 2627992 : last_freeze_ts,
5168 2627992 : ));
5169 : }
5170 : OpenLayerAction::None => {
5171 285776 : assert!(self.write_guard.is_some());
5172 : }
5173 : }
5174 :
5175 2913768 : Ok(&self.write_guard.as_ref().unwrap().open_layer)
5176 2913768 : }
5177 :
5178 2913768 : fn get_open_layer_action(&self, lsn: Lsn, new_value_size: u64) -> OpenLayerAction {
5179 2913768 : let state = &*self.write_guard;
5180 2913768 : let Some(state) = &state else {
5181 2627992 : return OpenLayerAction::Open;
5182 : };
5183 :
5184 285776 : if state.prev_lsn == Some(lsn) {
5185 : // Rolling mid LSN is not supported by downstream code.
5186 : // Hence, only roll at LSN boundaries.
5187 285728 : return OpenLayerAction::None;
5188 48 : }
5189 48 :
5190 48 : let distance = lsn.widening_sub(state.cached_last_freeze_at);
5191 48 : let proposed_open_layer_size = state.current_size + new_value_size;
5192 48 :
5193 48 : // Rolling the open layer can be triggered by:
5194 48 : // 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
5195 48 : // the safekeepers need to store. For sharded tenants, we multiply by shard count to
5196 48 : // account for how writes are distributed across shards: we expect each node to consume
5197 48 : // 1/count of the LSN on average.
5198 48 : // 2. The size of the currently open layer.
5199 48 : // 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
5200 48 : // up and suspend activity.
5201 48 : if distance
5202 48 : >= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128
5203 : {
5204 0 : info!(
5205 0 : "Will roll layer at {} with layer size {} due to LSN distance ({})",
5206 0 : lsn, state.current_size, distance
5207 0 : );
5208 :
5209 0 : OpenLayerAction::Roll
5210 48 : } else if state.current_size > 0
5211 48 : && proposed_open_layer_size >= self.get_checkpoint_distance()
5212 : {
5213 0 : info!(
5214 0 : "Will roll layer at {} with layer size {} due to layer size ({})",
5215 0 : lsn, state.current_size, proposed_open_layer_size
5216 0 : );
5217 :
5218 0 : OpenLayerAction::Roll
5219 48 : } else if distance > 0
5220 48 : && state.cached_last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()
5221 : {
5222 0 : info!(
5223 0 : "Will roll layer at {} with layer size {} due to time since last flush ({:?})",
5224 0 : lsn,
5225 0 : state.current_size,
5226 0 : state.cached_last_freeze_ts.elapsed()
5227 0 : );
5228 :
5229 0 : OpenLayerAction::Roll
5230 : } else {
5231 48 : OpenLayerAction::None
5232 : }
5233 2913768 : }
5234 :
5235 : /// Put a batch keys at the specified Lsns.
5236 : ///
5237 : /// The batch should be sorted by Lsn such that it's safe
5238 : /// to roll the open layer mid batch.
5239 413936 : pub(crate) async fn put_batch(
5240 413936 : &mut self,
5241 413936 : batch: Vec<(Key, Lsn, Value)>,
5242 413936 : ctx: &RequestContext,
5243 413936 : ) -> anyhow::Result<()> {
5244 1113600 : for (key, lsn, val) in batch {
5245 699664 : self.put(key, lsn, &val, ctx).await?
5246 : }
5247 :
5248 413936 : Ok(())
5249 413936 : }
5250 :
5251 2 : pub(crate) async fn delete_batch(&mut self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
5252 2 : if let Some((_, lsn)) = batch.first() {
5253 2 : let action = self.get_open_layer_action(*lsn, 0);
5254 2 : let layer = self.handle_open_layer_action(*lsn, action).await?;
5255 2 : layer.put_tombstones(batch).await?;
5256 0 : }
5257 :
5258 2 : Ok(())
5259 2 : }
5260 :
5261 : /// Track the end of the latest digested WAL record.
5262 : /// Remember the (end of) last valid WAL record remembered in the timeline.
5263 : ///
5264 : /// Call this after you have finished writing all the WAL up to 'lsn'.
5265 : ///
5266 : /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
5267 : /// the 'lsn' or anything older. The previous last record LSN is stored alongside
5268 : /// the latest and can be read.
5269 3102908 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
5270 3102908 : self.tl.finish_write(new_lsn);
5271 3102908 : }
5272 :
5273 270570 : pub(crate) fn update_current_logical_size(&self, delta: i64) {
5274 270570 : self.tl.update_current_logical_size(delta)
5275 270570 : }
5276 : }
5277 :
5278 : // We need TimelineWriter to be send in upcoming conversion of
5279 : // Timeline::layers to tokio::sync::RwLock.
5280 2 : #[test]
5281 2 : fn is_send() {
5282 2 : fn _assert_send<T: Send>() {}
5283 2 : _assert_send::<TimelineWriter<'_>>();
5284 2 : }
5285 :
5286 : /// Add a suffix to a layer file's name: .{num}.old
5287 : /// Uses the first available num (starts at 0)
5288 0 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
5289 0 : let filename = path
5290 0 : .file_name()
5291 0 : .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
5292 0 : let mut new_path = path.to_owned();
5293 :
5294 0 : for i in 0u32.. {
5295 0 : new_path.set_file_name(format!("{filename}.{i}.old"));
5296 0 : if !new_path.exists() {
5297 0 : std::fs::rename(path, &new_path)
5298 0 : .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
5299 0 : return Ok(());
5300 0 : }
5301 : }
5302 :
5303 0 : bail!("couldn't find an unused backup number for {:?}", path)
5304 0 : }
5305 :
5306 : #[cfg(test)]
5307 : mod tests {
5308 : use utils::{id::TimelineId, lsn::Lsn};
5309 :
5310 : use crate::tenant::{
5311 : harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
5312 : };
5313 :
5314 2 : #[tokio::test]
5315 2 : async fn two_layer_eviction_attempts_at_the_same_time() {
5316 2 : let harness =
5317 2 : TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
5318 2 :
5319 2 : let ctx = any_context();
5320 2 : let tenant = harness.do_try_load(&ctx).await.unwrap();
5321 2 : let timeline = tenant
5322 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
5323 6 : .await
5324 2 : .unwrap();
5325 2 :
5326 2 : let layer = find_some_layer(&timeline).await;
5327 2 : let layer = layer
5328 2 : .keep_resident()
5329 2 : .await
5330 2 : .expect("no download => no downloading errors")
5331 2 : .expect("should had been resident")
5332 2 : .drop_eviction_guard();
5333 2 :
5334 2 : let first = async { layer.evict_and_wait().await };
5335 2 : let second = async { layer.evict_and_wait().await };
5336 2 :
5337 4 : let (first, second) = tokio::join!(first, second);
5338 2 :
5339 2 : let res = layer.keep_resident().await;
5340 2 : assert!(matches!(res, Ok(None)), "{res:?}");
5341 2 :
5342 2 : match (first, second) {
5343 2 : (Ok(()), Ok(())) => {
5344 0 : // because there are no more timeline locks being taken on eviction path, we can
5345 0 : // witness all three outcomes here.
5346 0 : }
5347 2 : (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
5348 2 : // if one completes before the other, this is fine just as well.
5349 2 : }
5350 2 : other => unreachable!("unexpected {:?}", other),
5351 2 : }
5352 2 : }
5353 :
5354 2 : fn any_context() -> crate::context::RequestContext {
5355 2 : use crate::context::*;
5356 2 : use crate::task_mgr::*;
5357 2 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
5358 2 : }
5359 :
5360 2 : async fn find_some_layer(timeline: &Timeline) -> Layer {
5361 2 : let layers = timeline.layers.read().await;
5362 2 : let desc = layers
5363 2 : .layer_map()
5364 2 : .iter_historic_layers()
5365 2 : .next()
5366 2 : .expect("must find one layer to evict");
5367 2 :
5368 2 : layers.get_from_desc(&desc)
5369 2 : }
5370 : }
|