TLA Line data Source code
1 : pub mod delete;
2 : mod eviction_task;
3 : mod init;
4 : pub mod layer_manager;
5 : mod logical_size;
6 : pub mod span;
7 : pub mod uninit;
8 : mod walreceiver;
9 :
10 : use anyhow::{anyhow, bail, ensure, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use fail::fail_point;
14 : use futures::StreamExt;
15 : use itertools::Itertools;
16 : use pageserver_api::models::{
17 : DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
18 : DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
19 : TimelineState,
20 : };
21 : use remote_storage::GenericRemoteStorage;
22 : use serde_with::serde_as;
23 : use storage_broker::BrokerClientChannel;
24 : use tokio::runtime::Handle;
25 : use tokio::sync::{oneshot, watch, TryAcquireError};
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::*;
28 : use utils::id::TenantTimelineId;
29 :
30 : use std::cmp::{max, min, Ordering};
31 : use std::collections::{BinaryHeap, HashMap, HashSet};
32 : use std::ops::{Deref, Range};
33 : use std::pin::pin;
34 : use std::sync::atomic::Ordering as AtomicOrdering;
35 : use std::sync::{Arc, Mutex, RwLock, Weak};
36 : use std::time::{Duration, Instant, SystemTime};
37 :
38 : use crate::context::{
39 : AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
40 : };
41 : use crate::deletion_queue::DeletionQueueClient;
42 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
43 : use crate::tenant::storage_layer::delta_layer::DeltaEntry;
44 : use crate::tenant::storage_layer::{
45 : DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer,
46 : };
47 : use crate::tenant::tasks::{BackgroundLoopKind, RateLimitError};
48 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
49 : use crate::tenant::{
50 : layer_map::{LayerMap, SearchResult},
51 : metadata::{save_metadata, TimelineMetadata},
52 : par_fsync,
53 : storage_layer::{PersistentLayer, ValueReconstructResult, ValueReconstructState},
54 : };
55 :
56 : use crate::config::PageServerConf;
57 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
58 : use crate::metrics::{
59 : TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
60 : UNEXPECTED_ONDEMAND_DOWNLOADS,
61 : };
62 : use crate::pgdatadir_mapping::LsnForTimestamp;
63 : use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
64 : use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
65 : use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
66 : use pageserver_api::reltag::RelTag;
67 :
68 : use postgres_connection::PgConnectionConfig;
69 : use postgres_ffi::to_pg_timestamp;
70 : use utils::{
71 : completion,
72 : generation::Generation,
73 : id::{TenantId, TimelineId},
74 : lsn::{AtomicLsn, Lsn, RecordLsn},
75 : seqwait::SeqWait,
76 : simple_rcu::{Rcu, RcuReadGuard},
77 : };
78 :
79 : use crate::page_cache;
80 : use crate::repository::GcResult;
81 : use crate::repository::{Key, Value};
82 : use crate::task_mgr;
83 : use crate::task_mgr::TaskKind;
84 : use crate::ZERO_PAGE;
85 :
86 : use self::delete::DeleteTimelineFlow;
87 : pub(super) use self::eviction_task::EvictionTaskTenantState;
88 : use self::eviction_task::EvictionTaskTimelineState;
89 : use self::layer_manager::LayerManager;
90 : use self::logical_size::LogicalSize;
91 : use self::walreceiver::{WalReceiver, WalReceiverConf};
92 :
93 : use super::config::TenantConf;
94 : use super::remote_timeline_client::index::IndexPart;
95 : use super::remote_timeline_client::RemoteTimelineClient;
96 : use super::storage_layer::{
97 : AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc,
98 : };
99 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
100 :
101 CBC 59 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
102 : pub(super) enum FlushLoopState {
103 : NotStarted,
104 : Running {
105 : #[cfg(test)]
106 : expect_initdb_optimization: bool,
107 : #[cfg(test)]
108 : initdb_optimization_count: usize,
109 : },
110 : Exited,
111 : }
112 :
113 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
114 UBC 0 : #[derive(Debug, Clone, PartialEq, Eq)]
115 : pub struct Hole {
116 : key_range: Range<Key>,
117 : coverage_size: usize,
118 : }
119 :
120 : impl Ord for Hole {
121 CBC 31 : fn cmp(&self, other: &Self) -> Ordering {
122 31 : other.coverage_size.cmp(&self.coverage_size) // inverse order
123 31 : }
124 : }
125 :
126 : impl PartialOrd for Hole {
127 31 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
128 31 : Some(self.cmp(other))
129 31 : }
130 : }
131 :
132 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
133 : /// Can be removed after all refactors are done.
134 306 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
135 306 : drop(rlock)
136 306 : }
137 :
138 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
139 : /// Can be removed after all refactors are done.
140 3418 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
141 3418 : drop(rlock)
142 3418 : }
143 :
144 : /// The outward-facing resources required to build a Timeline
145 : pub struct TimelineResources {
146 : pub remote_client: Option<RemoteTimelineClient>,
147 : pub deletion_queue_client: DeletionQueueClient,
148 : }
149 :
150 : pub struct Timeline {
151 : conf: &'static PageServerConf,
152 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
153 :
154 : myself: Weak<Self>,
155 :
156 : pub tenant_id: TenantId,
157 : pub timeline_id: TimelineId,
158 :
159 : /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
160 : /// Never changes for the lifetime of this [`Timeline`] object.
161 : ///
162 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
163 : /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
164 : generation: Generation,
165 :
166 : pub pg_version: u32,
167 :
168 : /// The tuple has two elements.
169 : /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
170 : /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
171 : ///
172 : /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
173 : /// We describe these rectangles through the `PersistentLayerDesc` struct.
174 : ///
175 : /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
176 : /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
177 : /// `PersistentLayerDesc`'s.
178 : ///
179 : /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
180 : /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
181 : /// runtime, e.g., during page reconstruction.
182 : ///
183 : /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
184 : /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
185 : pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
186 :
187 : /// Set of key ranges which should be covered by image layers to
188 : /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
189 : /// It is used by compaction task when it checks if new image layer should be created.
190 : /// Newly created image layer doesn't help to remove the delta layer, until the
191 : /// newly created image layer falls off the PITR horizon. So on next GC cycle,
192 : /// gc_timeline may still want the new image layer to be created. To avoid redundant
193 : /// image layers creation we should check if image layer exists but beyond PITR horizon.
194 : /// This is why we need remember GC cutoff LSN.
195 : ///
196 : wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
197 :
198 : last_freeze_at: AtomicLsn,
199 : // Atomic would be more appropriate here.
200 : last_freeze_ts: RwLock<Instant>,
201 :
202 : // WAL redo manager
203 : walredo_mgr: Arc<super::WalRedoManager>,
204 :
205 : /// Remote storage client.
206 : /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
207 : pub remote_client: Option<Arc<RemoteTimelineClient>>,
208 :
209 : // What page versions do we hold in the repository? If we get a
210 : // request > last_record_lsn, we need to wait until we receive all
211 : // the WAL up to the request. The SeqWait provides functions for
212 : // that. TODO: If we get a request for an old LSN, such that the
213 : // versions have already been garbage collected away, we should
214 : // throw an error, but we don't track that currently.
215 : //
216 : // last_record_lsn.load().last points to the end of last processed WAL record.
217 : //
218 : // We also remember the starting point of the previous record in
219 : // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
220 : // first WAL record when the node is started up. But here, we just
221 : // keep track of it.
222 : last_record_lsn: SeqWait<RecordLsn, Lsn>,
223 :
224 : // All WAL records have been processed and stored durably on files on
225 : // local disk, up to this LSN. On crash and restart, we need to re-process
226 : // the WAL starting from this point.
227 : //
228 : // Some later WAL records might have been processed and also flushed to disk
229 : // already, so don't be surprised to see some, but there's no guarantee on
230 : // them yet.
231 : disk_consistent_lsn: AtomicLsn,
232 :
233 : // Parent timeline that this timeline was branched from, and the LSN
234 : // of the branch point.
235 : ancestor_timeline: Option<Arc<Timeline>>,
236 : ancestor_lsn: Lsn,
237 :
238 : pub(super) metrics: TimelineMetrics,
239 :
240 : /// Ensures layers aren't frozen by checkpointer between
241 : /// [`Timeline::get_layer_for_write`] and layer reads.
242 : /// Locked automatically by [`TimelineWriter`] and checkpointer.
243 : /// Must always be acquired before the layer map/individual layer lock
244 : /// to avoid deadlock.
245 : write_lock: tokio::sync::Mutex<()>,
246 :
247 : /// Used to avoid multiple `flush_loop` tasks running
248 : pub(super) flush_loop_state: Mutex<FlushLoopState>,
249 :
250 : /// layer_flush_start_tx can be used to wake up the layer-flushing task.
251 : /// The value is a counter, incremented every time a new flush cycle is requested.
252 : /// The flush cycle counter is sent back on the layer_flush_done channel when
253 : /// the flush finishes. You can use that to wait for the flush to finish.
254 : layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
255 : /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
256 : layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
257 :
258 : /// Layer removal lock.
259 : /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
260 : /// This lock is acquired in [`Timeline::gc`] and [`Timeline::compact`].
261 : /// This is an `Arc<Mutex>` lock because we need an owned
262 : /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
263 : /// Note that [`DeleteTimelineFlow`] uses `delete_progress` field.
264 : pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
265 :
266 : // Needed to ensure that we can't create a branch at a point that was already garbage collected
267 : pub latest_gc_cutoff_lsn: Rcu<Lsn>,
268 :
269 : // List of child timelines and their branch points. This is needed to avoid
270 : // garbage collecting data that is still needed by the child timelines.
271 : pub gc_info: std::sync::RwLock<GcInfo>,
272 :
273 : // It may change across major versions so for simplicity
274 : // keep it after running initdb for a timeline.
275 : // It is needed in checks when we want to error on some operations
276 : // when they are requested for pre-initdb lsn.
277 : // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
278 : // though let's keep them both for better error visibility.
279 : pub initdb_lsn: Lsn,
280 :
281 : /// When did we last calculate the partitioning?
282 : partitioning: Mutex<(KeyPartitioning, Lsn)>,
283 :
284 : /// Configuration: how often should the partitioning be recalculated.
285 : repartition_threshold: u64,
286 :
287 : /// Current logical size of the "datadir", at the last LSN.
288 : current_logical_size: LogicalSize,
289 :
290 : /// Information about the last processed message by the WAL receiver,
291 : /// or None if WAL receiver has not received anything for this timeline
292 : /// yet.
293 : pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
294 : pub walreceiver: Mutex<Option<WalReceiver>>,
295 :
296 : /// Relation size cache
297 : pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
298 :
299 : download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
300 :
301 : state: watch::Sender<TimelineState>,
302 :
303 : /// Prevent two tasks from deleting the timeline at the same time. If held, the
304 : /// timeline is being deleted. If 'true', the timeline has already been deleted.
305 : pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
306 :
307 : eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
308 :
309 : /// Barrier to wait before doing initial logical size calculation. Used only during startup.
310 : initial_logical_size_can_start: Option<completion::Barrier>,
311 :
312 : /// Completion shared between all timelines loaded during startup; used to delay heavier
313 : /// background tasks until some logical sizes have been calculated.
314 : initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
315 :
316 : /// Load or creation time information about the disk_consistent_lsn and when the loading
317 : /// happened. Used for consumption metrics.
318 : pub(crate) loaded_at: (Lsn, SystemTime),
319 : }
320 :
321 : pub struct WalReceiverInfo {
322 : pub wal_source_connconf: PgConnectionConfig,
323 : pub last_received_msg_lsn: Lsn,
324 : pub last_received_msg_ts: u128,
325 : }
326 :
327 : ///
328 : /// Information about how much history needs to be retained, needed by
329 : /// Garbage Collection.
330 : ///
331 : pub struct GcInfo {
332 : /// Specific LSNs that are needed.
333 : ///
334 : /// Currently, this includes all points where child branches have
335 : /// been forked off from. In the future, could also include
336 : /// explicit user-defined snapshot points.
337 : pub retain_lsns: Vec<Lsn>,
338 :
339 : /// In addition to 'retain_lsns', keep everything newer than this
340 : /// point.
341 : ///
342 : /// This is calculated by subtracting 'gc_horizon' setting from
343 : /// last-record LSN
344 : ///
345 : /// FIXME: is this inclusive or exclusive?
346 : pub horizon_cutoff: Lsn,
347 :
348 : /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
349 : /// point.
350 : ///
351 : /// This is calculated by finding a number such that a record is needed for PITR
352 : /// if only if its LSN is larger than 'pitr_cutoff'.
353 : pub pitr_cutoff: Lsn,
354 : }
355 :
356 : /// An error happened in a get() operation.
357 26 : #[derive(thiserror::Error)]
358 : pub enum PageReconstructError {
359 : #[error(transparent)]
360 : Other(#[from] anyhow::Error),
361 :
362 : /// The operation would require downloading a layer that is missing locally.
363 : NeedsDownload(TenantTimelineId, LayerFileName),
364 :
365 : /// The operation was cancelled
366 : Cancelled,
367 :
368 : /// The ancestor of this is being stopped
369 : AncestorStopping(TimelineId),
370 :
371 : /// An error happened replaying WAL records
372 : #[error(transparent)]
373 : WalRedo(anyhow::Error),
374 : }
375 :
376 : impl std::fmt::Debug for PageReconstructError {
377 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
378 0 : match self {
379 0 : Self::Other(err) => err.fmt(f),
380 0 : Self::NeedsDownload(tenant_timeline_id, layer_file_name) => {
381 0 : write!(
382 0 : f,
383 0 : "layer {}/{} needs download",
384 0 : tenant_timeline_id,
385 0 : layer_file_name.file_name()
386 0 : )
387 : }
388 0 : Self::Cancelled => write!(f, "cancelled"),
389 0 : Self::AncestorStopping(timeline_id) => {
390 0 : write!(f, "ancestor timeline {timeline_id} is being stopped")
391 : }
392 0 : Self::WalRedo(err) => err.fmt(f),
393 : }
394 0 : }
395 : }
396 :
397 : impl std::fmt::Display for PageReconstructError {
398 CBC 20 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
399 20 : match self {
400 20 : Self::Other(err) => err.fmt(f),
401 UBC 0 : Self::NeedsDownload(tenant_timeline_id, layer_file_name) => {
402 0 : write!(
403 0 : f,
404 0 : "layer {}/{} needs download",
405 0 : tenant_timeline_id,
406 0 : layer_file_name.file_name()
407 0 : )
408 : }
409 0 : Self::Cancelled => write!(f, "cancelled"),
410 0 : Self::AncestorStopping(timeline_id) => {
411 0 : write!(f, "ancestor timeline {timeline_id} is being stopped")
412 : }
413 0 : Self::WalRedo(err) => err.fmt(f),
414 : }
415 CBC 20 : }
416 : }
417 :
418 UBC 0 : #[derive(Clone, Copy)]
419 : pub enum LogicalSizeCalculationCause {
420 : Initial,
421 : ConsumptionMetricsSyntheticSize,
422 : EvictionTaskImitation,
423 : TenantSizeHandler,
424 : }
425 :
426 : /// Public interface functions
427 : impl Timeline {
428 : /// Get the LSN where this branch was created
429 CBC 3151 : pub fn get_ancestor_lsn(&self) -> Lsn {
430 3151 : self.ancestor_lsn
431 3151 : }
432 :
433 : /// Get the ancestor's timeline id
434 4946 : pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
435 4946 : self.ancestor_timeline
436 4946 : .as_ref()
437 4946 : .map(|ancestor| ancestor.timeline_id)
438 4946 : }
439 :
440 : /// Lock and get timeline's GC cuttof
441 3783797 : pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
442 3783797 : self.latest_gc_cutoff_lsn.read()
443 3783797 : }
444 :
445 : /// Look up given page version.
446 : ///
447 : /// If a remote layer file is needed, it is downloaded as part of this
448 : /// call.
449 : ///
450 : /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
451 : /// abstraction above this needs to store suitable metadata to track what
452 : /// data exists with what keys, in separate metadata entries. If a
453 : /// non-existent key is requested, we may incorrectly return a value from
454 : /// an ancestor branch, for example, or waste a lot of cycles chasing the
455 : /// non-existing key.
456 : ///
457 6372322 : pub async fn get(
458 6372322 : &self,
459 6372322 : key: Key,
460 6372322 : lsn: Lsn,
461 6372322 : ctx: &RequestContext,
462 6372323 : ) -> Result<Bytes, PageReconstructError> {
463 6372323 : if !lsn.is_valid() {
464 GBC 1 : return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
465 CBC 6372322 : }
466 :
467 : // XXX: structured stats collection for layer eviction here.
468 UBC 0 : trace!(
469 0 : "get page request for {}@{} from task kind {:?}",
470 0 : key,
471 0 : lsn,
472 0 : ctx.task_kind()
473 0 : );
474 :
475 : // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
476 : // The cached image can be returned directly if there is no WAL between the cached image
477 : // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
478 : // for redo.
479 CBC 6372322 : let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
480 1297801 : Some((cached_lsn, cached_img)) => {
481 1297801 : match cached_lsn.cmp(&lsn) {
482 1297785 : Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
483 : Ordering::Equal => {
484 16 : MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
485 16 : return Ok(cached_img); // exact LSN match, return the image
486 : }
487 : Ordering::Greater => {
488 UBC 0 : unreachable!("the returned lsn should never be after the requested lsn")
489 : }
490 : }
491 CBC 1297785 : Some((cached_lsn, cached_img))
492 : }
493 5074521 : None => None,
494 : };
495 :
496 6372306 : let mut reconstruct_state = ValueReconstructState {
497 6372306 : records: Vec::new(),
498 6372306 : img: cached_page_img,
499 6372306 : };
500 6372306 :
501 6372306 : let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
502 6372306 : let path = self
503 6372306 : .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
504 6964680 : .await?;
505 6372249 : timer.stop_and_record();
506 6372249 :
507 6372249 : let start = Instant::now();
508 6372249 : let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
509 6372247 : let elapsed = start.elapsed();
510 6372247 : crate::metrics::RECONSTRUCT_TIME
511 6372247 : .for_result(&res)
512 6372247 : .observe(elapsed.as_secs_f64());
513 :
514 6372247 : if cfg!(feature = "testing") && res.is_err() {
515 : // it can only be walredo issue
516 : use std::fmt::Write;
517 :
518 UBC 0 : let mut msg = String::new();
519 0 :
520 0 : path.into_iter().for_each(|(res, cont_lsn, layer)| {
521 0 : writeln!(
522 0 : msg,
523 0 : "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
524 0 : layer(),
525 0 : )
526 0 : .expect("string grows")
527 0 : });
528 :
529 : // this is to rule out or provide evidence that we could in some cases read a duplicate
530 : // walrecord
531 0 : tracing::info!("walredo failed, path:\n{msg}");
532 CBC 6372247 : }
533 :
534 6372247 : res
535 6372286 : }
536 :
537 : /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
538 82259045 : pub fn get_last_record_lsn(&self) -> Lsn {
539 82259045 : self.last_record_lsn.load().last
540 82259045 : }
541 :
542 2671 : pub fn get_prev_record_lsn(&self) -> Lsn {
543 2671 : self.last_record_lsn.load().prev
544 2671 : }
545 :
546 : /// Atomically get both last and prev.
547 999 : pub fn get_last_record_rlsn(&self) -> RecordLsn {
548 999 : self.last_record_lsn.load()
549 999 : }
550 :
551 781547 : pub fn get_disk_consistent_lsn(&self) -> Lsn {
552 781547 : self.disk_consistent_lsn.load()
553 781547 : }
554 :
555 : /// remote_consistent_lsn from the perspective of the tenant's current generation,
556 : /// not validated with control plane yet.
557 : /// See [`Self::get_remote_consistent_lsn_visible`].
558 : pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
559 2671 : if let Some(remote_client) = &self.remote_client {
560 2671 : remote_client.remote_consistent_lsn_projected()
561 : } else {
562 UBC 0 : None
563 : }
564 CBC 2671 : }
565 :
566 : /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
567 : /// i.e. a value of remote_consistent_lsn_projected which has undergone
568 : /// generation validation in the deletion queue.
569 : pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
570 779573 : if let Some(remote_client) = &self.remote_client {
571 779573 : remote_client.remote_consistent_lsn_visible()
572 : } else {
573 UBC 0 : None
574 : }
575 CBC 779573 : }
576 :
577 : /// The sum of the file size of all historic layers in the layer map.
578 : /// This method makes no distinction between local and remote layers.
579 : /// Hence, the result **does not represent local filesystem usage**.
580 3137 : pub async fn layer_size_sum(&self) -> u64 {
581 3137 : let guard = self.layers.read().await;
582 3137 : let layer_map = guard.layer_map();
583 3137 : let mut size = 0;
584 94649 : for l in layer_map.iter_historic_layers() {
585 94649 : size += l.file_size();
586 94649 : }
587 3137 : size
588 3137 : }
589 :
590 12 : pub fn resident_physical_size(&self) -> u64 {
591 12 : self.metrics.resident_physical_size_get()
592 12 : }
593 :
594 : ///
595 : /// Wait until WAL has been received and processed up to this LSN.
596 : ///
597 : /// You should call this before any of the other get_* or list_* functions. Calling
598 : /// those functions with an LSN that has been processed yet is an error.
599 : ///
600 1278620 : pub async fn wait_lsn(
601 1278620 : &self,
602 1278620 : lsn: Lsn,
603 1278620 : _ctx: &RequestContext, /* Prepare for use by cancellation */
604 1278620 : ) -> anyhow::Result<()> {
605 1278620 : anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
606 :
607 : // This should never be called from the WAL receiver, because that could lead
608 : // to a deadlock.
609 1278620 : anyhow::ensure!(
610 1278620 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
611 UBC 0 : "wait_lsn cannot be called in WAL receiver"
612 : );
613 CBC 1278620 : anyhow::ensure!(
614 1278620 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
615 UBC 0 : "wait_lsn cannot be called in WAL receiver"
616 : );
617 CBC 1278620 : anyhow::ensure!(
618 1278620 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
619 UBC 0 : "wait_lsn cannot be called in WAL receiver"
620 : );
621 :
622 CBC 1278620 : let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
623 1278620 :
624 1278620 : match self
625 1278620 : .last_record_lsn
626 1278620 : .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
627 145151 : .await
628 : {
629 1278615 : Ok(()) => Ok(()),
630 4 : Err(e) => {
631 4 : // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
632 4 : drop(_timer);
633 4 : let walreceiver_status = self.walreceiver_status();
634 4 : Err(anyhow::Error::new(e).context({
635 4 : format!(
636 4 : "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
637 4 : lsn,
638 4 : self.get_last_record_lsn(),
639 4 : self.get_disk_consistent_lsn(),
640 4 : walreceiver_status,
641 4 : )
642 4 : }))
643 : }
644 : }
645 1278619 : }
646 :
647 2675 : pub(crate) fn walreceiver_status(&self) -> String {
648 2675 : match &*self.walreceiver.lock().unwrap() {
649 162 : None => "stopping or stopped".to_string(),
650 2513 : Some(walreceiver) => match walreceiver.status() {
651 1316 : Some(status) => status.to_human_readable_string(),
652 1197 : None => "Not active".to_string(),
653 : },
654 : }
655 2675 : }
656 :
657 : /// Check that it is valid to request operations with that lsn.
658 591 : pub fn check_lsn_is_in_scope(
659 591 : &self,
660 591 : lsn: Lsn,
661 591 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
662 591 : ) -> anyhow::Result<()> {
663 591 : ensure!(
664 591 : lsn >= **latest_gc_cutoff_lsn,
665 9 : "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
666 9 : lsn,
667 9 : **latest_gc_cutoff_lsn,
668 : );
669 582 : Ok(())
670 591 : }
671 :
672 : /// Flush to disk all data that was written with the put_* functions
673 6368 : #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
674 : pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
675 : self.freeze_inmem_layer(false).await;
676 : self.flush_frozen_layers_and_wait().await
677 : }
678 :
679 : /// Outermost timeline compaction operation; downloads needed layers.
680 1262 : pub async fn compact(
681 1262 : self: &Arc<Self>,
682 1262 : cancel: &CancellationToken,
683 1262 : ctx: &RequestContext,
684 1262 : ) -> anyhow::Result<()> {
685 : const ROUNDS: usize = 2;
686 :
687 : // this wait probably never needs any "long time spent" logging, because we already nag if
688 : // compaction task goes over it's period (20s) which is quite often in production.
689 1262 : let _permit = match super::tasks::concurrent_background_tasks_rate_limit(
690 1262 : BackgroundLoopKind::Compaction,
691 1262 : ctx,
692 1262 : cancel,
693 1262 : )
694 GBC 1 : .await
695 : {
696 CBC 1262 : Ok(permit) => permit,
697 UBC 0 : Err(RateLimitError::Cancelled) => return Ok(()),
698 : };
699 :
700 CBC 1262 : let last_record_lsn = self.get_last_record_lsn();
701 1262 :
702 1262 : // Last record Lsn could be zero in case the timeline was just created
703 1262 : if !last_record_lsn.is_valid() {
704 UBC 0 : warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
705 0 : return Ok(());
706 CBC 1262 : }
707 :
708 : // retry two times to allow first round to find layers which need to be downloaded, then
709 : // download them, then retry compaction
710 1263 : for round in 0..ROUNDS {
711 : // should we error out with the most specific error?
712 1263 : let last_round = round == ROUNDS - 1;
713 :
714 1498331 : let res = self.compact_inner(ctx).await;
715 :
716 : // If `create_image_layers' or `compact_level0` scheduled any
717 : // uploads or deletions, but didn't update the index file yet,
718 : // do it now.
719 : //
720 : // This isn't necessary for correctness, the remote state is
721 : // consistent without the uploads and deletions, and we would
722 : // update the index file on next flush iteration too. But it
723 : // could take a while until that happens.
724 : //
725 : // Additionally, only do this once before we return from this function.
726 1254 : if last_round || res.is_ok() {
727 1252 : if let Some(remote_client) = &self.remote_client {
728 1252 : remote_client.schedule_index_upload_for_file_changes()?;
729 UBC 0 : }
730 CBC 2 : }
731 :
732 1 : let rls = match res {
733 1252 : Ok(()) => return Ok(()),
734 1 : Err(CompactionError::DownloadRequired(rls)) if !last_round => {
735 1 : // this can be done at most one time before exiting, waiting
736 1 : rls
737 : }
738 UBC 0 : Err(CompactionError::DownloadRequired(rls)) => {
739 0 : anyhow::bail!("Compaction requires downloading multiple times (last was {} layers), possibly battling against eviction", rls.len())
740 : }
741 : Err(CompactionError::ShuttingDown) => {
742 0 : return Ok(());
743 : }
744 CBC 1 : Err(CompactionError::Other(e)) => {
745 1 : return Err(e);
746 : }
747 : };
748 :
749 : // this path can be visited in the second round of retrying, if first one found that we
750 : // must first download some remote layers
751 1 : let total = rls.len();
752 1 :
753 1 : let mut downloads = rls
754 1 : .into_iter()
755 3 : .map(|rl| self.download_remote_layer(rl))
756 1 : .collect::<futures::stream::FuturesUnordered<_>>();
757 1 :
758 1 : let mut failed = 0;
759 :
760 : loop {
761 4 : tokio::select! {
762 : _ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
763 4 : res = downloads.next() => {
764 : match res {
765 : Some(Ok(())) => {},
766 : Some(Err(e)) => {
767 UBC 0 : warn!("Downloading remote layer for compaction failed: {e:#}");
768 : failed += 1;
769 : }
770 : None => break,
771 : }
772 : }
773 : }
774 : }
775 :
776 CBC 1 : if failed != 0 {
777 UBC 0 : anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
778 CBC 1 : }
779 :
780 : // if everything downloaded fine, lets try again
781 : }
782 :
783 UBC 0 : unreachable!("retry loop exits")
784 CBC 1253 : }
785 :
786 : /// Compaction which might need to be retried after downloading remote layers.
787 1263 : async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
788 : //
789 : // High level strategy for compaction / image creation:
790 : //
791 : // 1. First, calculate the desired "partitioning" of the
792 : // currently in-use key space. The goal is to partition the
793 : // key space into roughly fixed-size chunks, but also take into
794 : // account any existing image layers, and try to align the
795 : // chunk boundaries with the existing image layers to avoid
796 : // too much churn. Also try to align chunk boundaries with
797 : // relation boundaries. In principle, we don't know about
798 : // relation boundaries here, we just deal with key-value
799 : // pairs, and the code in pgdatadir_mapping.rs knows how to
800 : // map relations into key-value pairs. But in practice we know
801 : // that 'field6' is the block number, and the fields 1-5
802 : // identify a relation. This is just an optimization,
803 : // though.
804 : //
805 : // 2. Once we know the partitioning, for each partition,
806 : // decide if it's time to create a new image layer. The
807 : // criteria is: there has been too much "churn" since the last
808 : // image layer? The "churn" is fuzzy concept, it's a
809 : // combination of too many delta files, or too much WAL in
810 : // total in the delta file. Or perhaps: if creating an image
811 : // file would allow to delete some older files.
812 : //
813 : // 3. After that, we compact all level0 delta files if there
814 : // are too many of them. While compacting, we also garbage
815 : // collect any page versions that are no longer needed because
816 : // of the new image layers we created in step 2.
817 : //
818 : // TODO: This high level strategy hasn't been implemented yet.
819 : // Below are functions compact_level0() and create_image_layers()
820 : // but they are a bit ad hoc and don't quite work like it's explained
821 : // above. Rewrite it.
822 1263 : let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
823 : // Is the timeline being deleted?
824 1263 : if self.is_stopping() {
825 UBC 0 : trace!("Dropping out of compaction on timeline shutdown");
826 0 : return Err(CompactionError::ShuttingDown);
827 CBC 1263 : }
828 1263 :
829 1263 : let target_file_size = self.get_checkpoint_distance();
830 1263 :
831 1263 : // Define partitioning schema if needed
832 1263 :
833 1263 : match self
834 1263 : .repartition(
835 1263 : self.get_last_record_lsn(),
836 1263 : self.get_compaction_target_size(),
837 1263 : ctx,
838 1263 : )
839 279450 : .await
840 : {
841 1262 : Ok((partitioning, lsn)) => {
842 1262 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
843 1262 : let image_ctx = RequestContextBuilder::extend(ctx)
844 1262 : .access_stats_behavior(AccessStatsBehavior::Skip)
845 1262 : .build();
846 :
847 : // 2. Create new image layers for partitions that have been modified
848 : // "enough".
849 1262 : let layer_paths_to_upload = self
850 1262 : .create_image_layers(&partitioning, lsn, false, &image_ctx)
851 741358 : .await
852 1257 : .map_err(anyhow::Error::from)?;
853 1257 : if let Some(remote_client) = &self.remote_client {
854 4620 : for (path, layer_metadata) in layer_paths_to_upload {
855 3363 : remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
856 : }
857 UBC 0 : }
858 :
859 : // 3. Compact
860 CBC 1257 : let timer = self.metrics.compact_time_histo.start_timer();
861 1257 : self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
862 477519 : .await?;
863 1252 : timer.stop_and_record();
864 : }
865 UBC 0 : Err(err) => {
866 : // no partitioning? This is normal, if the timeline was just created
867 : // as an empty timeline. Also in unit tests, when we use the timeline
868 : // as a simple key-value store, ignoring the datadir layout. Log the
869 : // error but continue.
870 0 : error!("could not compact, repartitioning keyspace failed: {err:?}");
871 : }
872 : };
873 :
874 CBC 1252 : Ok(())
875 1254 : }
876 :
877 : /// Mutate the timeline with a [`TimelineWriter`].
878 69330484 : pub async fn writer(&self) -> TimelineWriter<'_> {
879 : TimelineWriter {
880 69330519 : tl: self,
881 69330519 : _write_guard: self.write_lock.lock().await,
882 : }
883 69330519 : }
884 :
885 : /// Retrieve current logical size of the timeline.
886 : ///
887 : /// The size could be lagging behind the actual number, in case
888 : /// the initial size calculation has not been run (gets triggered on the first size access).
889 : ///
890 : /// return size and boolean flag that shows if the size is exact
891 779585 : pub fn get_current_logical_size(
892 779585 : self: &Arc<Self>,
893 779585 : ctx: &RequestContext,
894 779585 : ) -> anyhow::Result<(u64, bool)> {
895 779585 : let current_size = self.current_logical_size.current_size()?;
896 779585 : debug!("Current size: {current_size:?}");
897 :
898 779585 : let mut is_exact = true;
899 779585 : let size = current_size.size();
900 717 : if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
901 779585 : (current_size, self.current_logical_size.initial_part_end)
902 717 : {
903 717 : is_exact = false;
904 717 : self.try_spawn_size_init_task(initial_part_end, ctx);
905 778868 : }
906 :
907 779585 : Ok((size, is_exact))
908 779585 : }
909 :
910 : /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
911 : /// the in-memory layer, and initiate flushing it if so.
912 : ///
913 : /// Also flush after a period of time without new data -- it helps
914 : /// safekeepers to regard pageserver as caught up and suspend activity.
915 776902 : pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
916 776902 : let last_lsn = self.get_last_record_lsn();
917 776346 : let open_layer_size = {
918 776902 : let guard = self.layers.read().await;
919 776902 : let layers = guard.layer_map();
920 776902 : let Some(open_layer) = layers.open_layer.as_ref() else {
921 556 : return Ok(());
922 : };
923 776346 : open_layer.size().await?
924 : };
925 776346 : let last_freeze_at = self.last_freeze_at.load();
926 776346 : let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
927 776346 : let distance = last_lsn.widening_sub(last_freeze_at);
928 776346 : // Checkpointing the open layer can be triggered by layer size or LSN range.
929 776346 : // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
930 776346 : // we want to stay below that with a big margin. The LSN distance determines how
931 776346 : // much WAL the safekeepers need to store.
932 776346 : if distance >= self.get_checkpoint_distance().into()
933 774724 : || open_layer_size > self.get_checkpoint_distance()
934 772363 : || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
935 : {
936 3983 : info!(
937 3983 : "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
938 3983 : distance,
939 3983 : open_layer_size,
940 3983 : last_freeze_ts.elapsed()
941 3983 : );
942 :
943 3983 : self.freeze_inmem_layer(true).await;
944 3983 : self.last_freeze_at.store(last_lsn);
945 3983 : *(self.last_freeze_ts.write().unwrap()) = Instant::now();
946 3983 :
947 3983 : // Wake up the layer flusher
948 3983 : self.flush_frozen_layers();
949 772363 : }
950 776346 : Ok(())
951 776902 : }
952 :
953 1103 : pub fn activate(
954 1103 : self: &Arc<Self>,
955 1103 : broker_client: BrokerClientChannel,
956 1103 : background_jobs_can_start: Option<&completion::Barrier>,
957 1103 : ctx: &RequestContext,
958 1103 : ) {
959 1103 : self.launch_wal_receiver(ctx, broker_client);
960 1103 : self.set_state(TimelineState::Active);
961 1103 : self.launch_eviction_task(background_jobs_can_start);
962 1103 : }
963 :
964 864 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
965 : pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
966 : debug_assert_current_span_has_tenant_and_timeline_id();
967 :
968 : // prevent writes to the InMemoryLayer
969 : task_mgr::shutdown_tasks(
970 : Some(TaskKind::WalReceiverManager),
971 : Some(self.tenant_id),
972 : Some(self.timeline_id),
973 : )
974 : .await;
975 :
976 : // now all writers to InMemory layer are gone, do the final flush if requested
977 : if freeze_and_flush {
978 : match self.freeze_and_flush().await {
979 : Ok(()) => {}
980 : Err(e) => {
981 59 : warn!("failed to freeze and flush: {e:#}");
982 : return; // TODO: should probably drain remote timeline client anyways?
983 : }
984 : }
985 :
986 : // drain the upload queue
987 : let res = if let Some(client) = self.remote_client.as_ref() {
988 : // if we did not wait for completion here, it might be our shutdown process
989 : // didn't wait for remote uploads to complete at all, as new tasks can forever
990 : // be spawned.
991 : //
992 : // what is problematic is the shutting down of RemoteTimelineClient, because
993 : // obviously it does not make sense to stop while we wait for it, but what
994 : // about corner cases like s3 suddenly hanging up?
995 : client.wait_completion().await
996 : } else {
997 : Ok(())
998 : };
999 :
1000 : if let Err(e) = res {
1001 UBC 0 : warn!("failed to await for frozen and flushed uploads: {e:#}");
1002 : }
1003 : }
1004 : }
1005 :
1006 : pub fn set_state(&self, new_state: TimelineState) {
1007 CBC 1853 : match (self.current_state(), new_state) {
1008 1853 : (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
1009 117 : info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
1010 : }
1011 UBC 0 : (st, TimelineState::Loading) => {
1012 0 : error!("ignoring transition from {st:?} into Loading state");
1013 : }
1014 CBC 14 : (TimelineState::Broken { .. }, new_state) => {
1015 14 : error!("Ignoring state update {new_state:?} for broken timeline");
1016 : }
1017 : (TimelineState::Stopping, TimelineState::Active) => {
1018 UBC 0 : error!("Not activating a Stopping timeline");
1019 : }
1020 CBC 1722 : (_, new_state) => {
1021 1246 : if matches!(
1022 1722 : new_state,
1023 : TimelineState::Stopping | TimelineState::Broken { .. }
1024 476 : ) {
1025 476 : // drop the completion guard, if any; it might be holding off the completion
1026 476 : // forever needlessly
1027 476 : self.initial_logical_size_attempt
1028 476 : .lock()
1029 476 : .unwrap_or_else(|e| e.into_inner())
1030 476 : .take();
1031 1246 : }
1032 1722 : self.state.send_replace(new_state);
1033 : }
1034 : }
1035 1853 : }
1036 :
1037 34 : pub fn set_broken(&self, reason: String) {
1038 34 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
1039 34 : let broken_state = TimelineState::Broken {
1040 34 : reason,
1041 34 : backtrace: backtrace_str,
1042 34 : };
1043 34 : self.set_state(broken_state)
1044 34 : }
1045 :
1046 2042347 : pub fn current_state(&self) -> TimelineState {
1047 2042347 : self.state.borrow().clone()
1048 2042347 : }
1049 :
1050 666 : pub fn is_broken(&self) -> bool {
1051 666 : matches!(&*self.state.borrow(), TimelineState::Broken { .. })
1052 666 : }
1053 :
1054 1285867 : pub fn is_active(&self) -> bool {
1055 1285867 : self.current_state() == TimelineState::Active
1056 1285867 : }
1057 :
1058 2259 : pub fn is_stopping(&self) -> bool {
1059 2259 : self.current_state() == TimelineState::Stopping
1060 2259 : }
1061 :
1062 1514 : pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
1063 1514 : self.state.subscribe()
1064 1514 : }
1065 :
1066 1063161 : pub async fn wait_to_become_active(
1067 1063161 : &self,
1068 1063161 : _ctx: &RequestContext, // Prepare for use by cancellation
1069 1063161 : ) -> Result<(), TimelineState> {
1070 1063161 : let mut receiver = self.state.subscribe();
1071 1063167 : loop {
1072 1063167 : let current_state = receiver.borrow().clone();
1073 1063167 : match current_state {
1074 : TimelineState::Loading => {
1075 6 : receiver
1076 6 : .changed()
1077 6 : .await
1078 6 : .expect("holding a reference to self");
1079 : }
1080 : TimelineState::Active { .. } => {
1081 1063159 : return Ok(());
1082 : }
1083 : TimelineState::Broken { .. } | TimelineState::Stopping => {
1084 : // There's no chance the timeline can transition back into ::Active
1085 2 : return Err(current_state);
1086 : }
1087 : }
1088 : }
1089 1063161 : }
1090 :
1091 85 : pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
1092 85 : let guard = self.layers.read().await;
1093 85 : let layer_map = guard.layer_map();
1094 85 : let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
1095 85 : if let Some(open_layer) = &layer_map.open_layer {
1096 21 : in_memory_layers.push(open_layer.info());
1097 64 : }
1098 85 : for frozen_layer in &layer_map.frozen_layers {
1099 UBC 0 : in_memory_layers.push(frozen_layer.info());
1100 0 : }
1101 :
1102 CBC 85 : let mut historic_layers = Vec::new();
1103 1775 : for historic_layer in layer_map.iter_historic_layers() {
1104 1775 : let historic_layer = guard.get_from_desc(&historic_layer);
1105 1775 : historic_layers.push(historic_layer.info(reset));
1106 1775 : }
1107 :
1108 85 : LayerMapInfo {
1109 85 : in_memory_layers,
1110 85 : historic_layers,
1111 85 : }
1112 85 : }
1113 :
1114 24 : #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
1115 : pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1116 : let Some(layer) = self.find_layer(layer_file_name).await else {
1117 : return Ok(None);
1118 : };
1119 : let Some(remote_layer) = layer.downcast_remote_layer() else {
1120 : return Ok(Some(false));
1121 : };
1122 : if self.remote_client.is_none() {
1123 : return Ok(Some(false));
1124 : }
1125 :
1126 : self.download_remote_layer(remote_layer).await?;
1127 : Ok(Some(true))
1128 : }
1129 :
1130 : /// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer.
1131 : /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
1132 407 : pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1133 407 : let Some(local_layer) = self.find_layer(layer_file_name).await else {
1134 UBC 0 : return Ok(None);
1135 : };
1136 CBC 407 : let remote_client = self
1137 407 : .remote_client
1138 407 : .as_ref()
1139 407 : .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
1140 :
1141 407 : let cancel = CancellationToken::new();
1142 407 : let results = self
1143 407 : .evict_layer_batch(remote_client, &[local_layer], cancel)
1144 6 : .await?;
1145 407 : assert_eq!(results.len(), 1);
1146 407 : let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
1147 407 : match result {
1148 UBC 0 : None => anyhow::bail!("task_mgr shutdown requested"),
1149 CBC 407 : Some(Ok(())) => Ok(Some(true)),
1150 UBC 0 : Some(Err(e)) => Err(anyhow::Error::new(e)),
1151 : }
1152 CBC 407 : }
1153 :
1154 : /// Evict a batch of layers.
1155 : ///
1156 : /// GenericRemoteStorage reference is required as a (witness)[witness_article] for "remote storage is configured."
1157 : ///
1158 : /// [witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
1159 11 : pub(crate) async fn evict_layers(
1160 11 : &self,
1161 11 : _: &GenericRemoteStorage,
1162 11 : layers_to_evict: &[Arc<dyn PersistentLayer>],
1163 11 : cancel: CancellationToken,
1164 11 : ) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
1165 11 : let remote_client = self.remote_client.clone().expect(
1166 11 : "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient",
1167 11 : );
1168 11 :
1169 11 : self.evict_layer_batch(&remote_client, layers_to_evict, cancel)
1170 UBC 0 : .await
1171 CBC 11 : }
1172 :
1173 : /// Evict multiple layers at once, continuing through errors.
1174 : ///
1175 : /// Try to evict the given `layers_to_evict` by
1176 : ///
1177 : /// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object.
1178 : /// 2. Deleting the now unreferenced layer file from disk.
1179 : ///
1180 : /// The `remote_client` should be this timeline's `self.remote_client`.
1181 : /// We make the caller provide it so that they are responsible for handling the case
1182 : /// where someone wants to evict the layer but no remote storage is configured.
1183 : ///
1184 : /// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`.
1185 : /// If `Err()` is returned, no eviction was attempted.
1186 : /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`.
1187 : /// Meaning of each `result[i]`:
1188 : /// - `Some(Err(...))` if layer replacement failed for an unexpected reason
1189 : /// - `Some(Ok(true))` if everything went well.
1190 : /// - `Some(Ok(false))` if there was an expected reason why the layer could not be replaced, e.g.:
1191 : /// - evictee was not yet downloaded
1192 : /// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks)
1193 : /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`.
1194 448 : async fn evict_layer_batch(
1195 448 : &self,
1196 448 : remote_client: &Arc<RemoteTimelineClient>,
1197 448 : layers_to_evict: &[Arc<dyn PersistentLayer>],
1198 448 : cancel: CancellationToken,
1199 448 : ) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
1200 448 : // ensure that the layers have finished uploading
1201 448 : // (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
1202 448 : remote_client
1203 448 : .wait_completion()
1204 4 : .await
1205 448 : .context("wait for layer upload ops to complete")?;
1206 :
1207 : // now lock out layer removal (compaction, gc, timeline deletion)
1208 448 : let layer_removal_guard = self.layer_removal_cs.lock().await;
1209 :
1210 : {
1211 : // to avoid racing with detach and delete_timeline
1212 448 : let state = self.current_state();
1213 448 : anyhow::ensure!(
1214 448 : state == TimelineState::Active,
1215 UBC 0 : "timeline is not active but {state:?}"
1216 : );
1217 : }
1218 :
1219 : // start the batch update
1220 CBC 448 : let mut guard = self.layers.write().await;
1221 448 : let mut results = Vec::with_capacity(layers_to_evict.len());
1222 :
1223 592 : for l in layers_to_evict.iter() {
1224 592 : let res = if cancel.is_cancelled() {
1225 UBC 0 : None
1226 : } else {
1227 CBC 592 : Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut guard))
1228 : };
1229 592 : results.push(res);
1230 : }
1231 :
1232 : // commit the updates & release locks
1233 448 : drop_wlock(guard);
1234 448 : drop(layer_removal_guard);
1235 448 :
1236 448 : assert_eq!(results.len(), layers_to_evict.len());
1237 448 : Ok(results)
1238 448 : }
1239 :
1240 592 : fn evict_layer_batch_impl(
1241 592 : &self,
1242 592 : _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
1243 592 : local_layer: &Arc<dyn PersistentLayer>,
1244 592 : layer_mgr: &mut LayerManager,
1245 592 : ) -> Result<(), EvictionError> {
1246 592 : if local_layer.is_remote_layer() {
1247 UBC 0 : return Err(EvictionError::CannotEvictRemoteLayer);
1248 CBC 592 : }
1249 592 :
1250 592 : let layer_file_size = local_layer.layer_desc().file_size;
1251 :
1252 592 : let local_layer_mtime = local_layer
1253 592 : .local_path()
1254 592 : .expect("local layer should have a local path")
1255 592 : .metadata()
1256 592 : // when the eviction fails because we have already deleted the layer in compaction for
1257 592 : // example, a NotFound error bubbles up from here.
1258 592 : .map_err(|e| {
1259 1 : if e.kind() == std::io::ErrorKind::NotFound {
1260 1 : EvictionError::FileNotFound
1261 : } else {
1262 UBC 0 : EvictionError::StatFailed(e)
1263 : }
1264 CBC 592 : })?
1265 591 : .modified()
1266 591 : .map_err(EvictionError::StatFailed)?;
1267 :
1268 591 : let local_layer_residence_duration =
1269 591 : match SystemTime::now().duration_since(local_layer_mtime) {
1270 UBC 0 : Err(e) => {
1271 0 : warn!(layer = %local_layer, "layer mtime is in the future: {}", e);
1272 0 : None
1273 : }
1274 CBC 591 : Ok(delta) => Some(delta),
1275 : };
1276 :
1277 : // RemoteTimelineClient holds the metadata on layers' remote generations, so
1278 : // query it to construct a RemoteLayer.
1279 591 : let layer_metadata = self
1280 591 : .remote_client
1281 591 : .as_ref()
1282 591 : .expect("Eviction is not called without remote storage")
1283 591 : .get_layer_metadata(&local_layer.filename())
1284 591 : .map_err(EvictionError::LayerNotFound)?
1285 591 : .ok_or_else(|| {
1286 UBC 0 : EvictionError::LayerNotFound(anyhow::anyhow!("Layer not in remote metadata"))
1287 CBC 591 : })?;
1288 591 : if layer_metadata.file_size() != layer_file_size {
1289 UBC 0 : return Err(EvictionError::MetadataInconsistency(format!(
1290 0 : "Layer size {layer_file_size} doesn't match remote metadata file size {}",
1291 0 : layer_metadata.file_size()
1292 0 : )));
1293 CBC 591 : }
1294 :
1295 591 : let new_remote_layer = Arc::new(match local_layer.filename() {
1296 381 : LayerFileName::Image(image_name) => RemoteLayer::new_img(
1297 381 : self.tenant_id,
1298 381 : self.timeline_id,
1299 381 : &image_name,
1300 381 : &layer_metadata,
1301 381 : local_layer
1302 381 : .access_stats()
1303 381 : .clone_for_residence_change(LayerResidenceStatus::Evicted),
1304 381 : ),
1305 210 : LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
1306 210 : self.tenant_id,
1307 210 : self.timeline_id,
1308 210 : &delta_name,
1309 210 : &layer_metadata,
1310 210 : local_layer
1311 210 : .access_stats()
1312 210 : .clone_for_residence_change(LayerResidenceStatus::Evicted),
1313 210 : ),
1314 : });
1315 :
1316 591 : assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
1317 :
1318 591 : layer_mgr
1319 591 : .replace_and_verify(local_layer.clone(), new_remote_layer)
1320 591 : .map_err(EvictionError::LayerNotFound)?;
1321 :
1322 590 : if let Err(e) = local_layer.delete_resident_layer_file() {
1323 : // this should never happen, because of layer_removal_cs usage and above stat
1324 : // access for mtime
1325 UBC 0 : error!("failed to remove layer file on evict after replacement: {e:#?}");
1326 CBC 590 : }
1327 : // Always decrement the physical size gauge, even if we failed to delete the file.
1328 : // Rationale: we already replaced the layer with a remote layer in the layer map,
1329 : // and any subsequent download_remote_layer will
1330 : // 1. overwrite the file on disk and
1331 : // 2. add the downloaded size to the resident size gauge.
1332 : //
1333 : // If there is no re-download, and we restart the pageserver, then load_layer_map
1334 : // will treat the file as a local layer again, count it towards resident size,
1335 : // and it'll be like the layer removal never happened.
1336 : // The bump in resident size is perhaps unexpected but overall a robust behavior.
1337 590 : self.metrics.resident_physical_size_sub(layer_file_size);
1338 590 : self.metrics.evictions.inc();
1339 :
1340 590 : if let Some(delta) = local_layer_residence_duration {
1341 590 : self.metrics
1342 590 : .evictions_with_low_residence_duration
1343 590 : .read()
1344 590 : .unwrap()
1345 590 : .observe(delta);
1346 590 : info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
1347 : } else {
1348 UBC 0 : info!(layer=%local_layer, "evicted layer after unknown residence period");
1349 : }
1350 :
1351 CBC 590 : Ok(())
1352 592 : }
1353 : }
1354 :
1355 UBC 0 : #[derive(Debug, thiserror::Error)]
1356 : pub(crate) enum EvictionError {
1357 : #[error("cannot evict a remote layer")]
1358 : CannotEvictRemoteLayer,
1359 : /// Most likely the to-be evicted layer has been deleted by compaction or gc which use the same
1360 : /// locks, so they got to execute before the eviction.
1361 : #[error("file backing the layer has been removed already")]
1362 : FileNotFound,
1363 : #[error("stat failed")]
1364 : StatFailed(#[source] std::io::Error),
1365 : /// In practice, this can be a number of things, but lets assume it means only this.
1366 : ///
1367 : /// This case includes situations such as the Layer was evicted and redownloaded in between,
1368 : /// because the file existed before an replacement attempt was made but now the Layers are
1369 : /// different objects in memory.
1370 : #[error("layer was no longer part of LayerMap")]
1371 : LayerNotFound(#[source] anyhow::Error),
1372 :
1373 : /// This should never happen
1374 : #[error("Metadata inconsistency")]
1375 : MetadataInconsistency(String),
1376 : }
1377 :
1378 : /// Number of times we will compute partition within a checkpoint distance.
1379 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
1380 :
1381 : // Private functions
1382 : impl Timeline {
1383 CBC 1553635 : fn get_checkpoint_distance(&self) -> u64 {
1384 1553635 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1385 1553635 : tenant_conf
1386 1553635 : .checkpoint_distance
1387 1553635 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
1388 1553635 : }
1389 :
1390 772363 : fn get_checkpoint_timeout(&self) -> Duration {
1391 772363 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1392 772363 : tenant_conf
1393 772363 : .checkpoint_timeout
1394 772363 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
1395 772363 : }
1396 :
1397 1302 : fn get_compaction_target_size(&self) -> u64 {
1398 1302 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1399 1302 : tenant_conf
1400 1302 : .compaction_target_size
1401 1302 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
1402 1302 : }
1403 :
1404 1257 : fn get_compaction_threshold(&self) -> usize {
1405 1257 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1406 1257 : tenant_conf
1407 1257 : .compaction_threshold
1408 1257 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
1409 1257 : }
1410 :
1411 23824 : fn get_image_creation_threshold(&self) -> usize {
1412 23824 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1413 23824 : tenant_conf
1414 23824 : .image_creation_threshold
1415 23824 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
1416 23824 : }
1417 :
1418 1958 : fn get_eviction_policy(&self) -> EvictionPolicy {
1419 1958 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1420 1958 : tenant_conf
1421 1958 : .eviction_policy
1422 1958 : .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
1423 1958 : }
1424 :
1425 1331 : fn get_evictions_low_residence_duration_metric_threshold(
1426 1331 : tenant_conf: &TenantConfOpt,
1427 1331 : default_tenant_conf: &TenantConf,
1428 1331 : ) -> Duration {
1429 1331 : tenant_conf
1430 1331 : .evictions_low_residence_duration_metric_threshold
1431 1331 : .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
1432 1331 : }
1433 :
1434 11289 : fn get_gc_feedback(&self) -> bool {
1435 11289 : let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
1436 11289 : tenant_conf
1437 11289 : .gc_feedback
1438 11289 : .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
1439 11289 : }
1440 :
1441 29 : pub(super) fn tenant_conf_updated(&self) {
1442 29 : // NB: Most tenant conf options are read by background loops, so,
1443 29 : // changes will automatically be picked up.
1444 29 :
1445 29 : // The threshold is embedded in the metric. So, we need to update it.
1446 29 : {
1447 29 : let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
1448 29 : &self.tenant_conf.read().unwrap().tenant_conf,
1449 29 : &self.conf.default_tenant_conf,
1450 29 : );
1451 29 : let tenant_id_str = self.tenant_id.to_string();
1452 29 : let timeline_id_str = self.timeline_id.to_string();
1453 29 : self.metrics
1454 29 : .evictions_with_low_residence_duration
1455 29 : .write()
1456 29 : .unwrap()
1457 29 : .change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
1458 29 : }
1459 29 : }
1460 :
1461 : /// Open a Timeline handle.
1462 : ///
1463 : /// Loads the metadata for the timeline into memory, but not the layer map.
1464 : #[allow(clippy::too_many_arguments)]
1465 1302 : pub(super) fn new(
1466 1302 : conf: &'static PageServerConf,
1467 1302 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
1468 1302 : metadata: &TimelineMetadata,
1469 1302 : ancestor: Option<Arc<Timeline>>,
1470 1302 : timeline_id: TimelineId,
1471 1302 : tenant_id: TenantId,
1472 1302 : generation: Generation,
1473 1302 : walredo_mgr: Arc<super::WalRedoManager>,
1474 1302 : resources: TimelineResources,
1475 1302 : pg_version: u32,
1476 1302 : initial_logical_size_can_start: Option<completion::Barrier>,
1477 1302 : initial_logical_size_attempt: Option<completion::Completion>,
1478 1302 : state: TimelineState,
1479 1302 : ) -> Arc<Self> {
1480 1302 : let disk_consistent_lsn = metadata.disk_consistent_lsn();
1481 1302 : let (state, _) = watch::channel(state);
1482 1302 :
1483 1302 : let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
1484 1302 : let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
1485 1302 :
1486 1302 : let tenant_conf_guard = tenant_conf.read().unwrap();
1487 1302 :
1488 1302 : let evictions_low_residence_duration_metric_threshold =
1489 1302 : Self::get_evictions_low_residence_duration_metric_threshold(
1490 1302 : &tenant_conf_guard.tenant_conf,
1491 1302 : &conf.default_tenant_conf,
1492 1302 : );
1493 1302 : drop(tenant_conf_guard);
1494 1302 :
1495 1302 : Arc::new_cyclic(|myself| {
1496 1302 : let mut result = Timeline {
1497 1302 : conf,
1498 1302 : tenant_conf,
1499 1302 : myself: myself.clone(),
1500 1302 : timeline_id,
1501 1302 : tenant_id,
1502 1302 : generation,
1503 1302 : pg_version,
1504 1302 : layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
1505 1302 : wanted_image_layers: Mutex::new(None),
1506 1302 :
1507 1302 : walredo_mgr,
1508 1302 : walreceiver: Mutex::new(None),
1509 1302 :
1510 1302 : remote_client: resources.remote_client.map(Arc::new),
1511 1302 :
1512 1302 : // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
1513 1302 : last_record_lsn: SeqWait::new(RecordLsn {
1514 1302 : last: disk_consistent_lsn,
1515 1302 : prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
1516 1302 : }),
1517 1302 : disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
1518 1302 :
1519 1302 : last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
1520 1302 : last_freeze_ts: RwLock::new(Instant::now()),
1521 1302 :
1522 1302 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
1523 1302 :
1524 1302 : ancestor_timeline: ancestor,
1525 1302 : ancestor_lsn: metadata.ancestor_lsn(),
1526 1302 :
1527 1302 : metrics: TimelineMetrics::new(
1528 1302 : &tenant_id,
1529 1302 : &timeline_id,
1530 1302 : crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
1531 1302 : "mtime",
1532 1302 : evictions_low_residence_duration_metric_threshold,
1533 1302 : ),
1534 1302 : ),
1535 1302 :
1536 1302 : flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
1537 1302 :
1538 1302 : layer_flush_start_tx,
1539 1302 : layer_flush_done_tx,
1540 1302 :
1541 1302 : write_lock: tokio::sync::Mutex::new(()),
1542 1302 : layer_removal_cs: Default::default(),
1543 1302 :
1544 1302 : gc_info: std::sync::RwLock::new(GcInfo {
1545 1302 : retain_lsns: Vec::new(),
1546 1302 : horizon_cutoff: Lsn(0),
1547 1302 : pitr_cutoff: Lsn(0),
1548 1302 : }),
1549 1302 :
1550 1302 : latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
1551 1302 : initdb_lsn: metadata.initdb_lsn(),
1552 1302 :
1553 1302 : current_logical_size: if disk_consistent_lsn.is_valid() {
1554 : // we're creating timeline data with some layer files existing locally,
1555 : // need to recalculate timeline's logical size based on data in the layers.
1556 685 : LogicalSize::deferred_initial(disk_consistent_lsn)
1557 : } else {
1558 : // we're creating timeline data without any layers existing locally,
1559 : // initial logical size is 0.
1560 617 : LogicalSize::empty_initial()
1561 : },
1562 1302 : partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
1563 1302 : repartition_threshold: 0,
1564 1302 :
1565 1302 : last_received_wal: Mutex::new(None),
1566 1302 : rel_size_cache: RwLock::new(HashMap::new()),
1567 1302 :
1568 1302 : download_all_remote_layers_task_info: RwLock::new(None),
1569 1302 :
1570 1302 : state,
1571 1302 :
1572 1302 : eviction_task_timeline_state: tokio::sync::Mutex::new(
1573 1302 : EvictionTaskTimelineState::default(),
1574 1302 : ),
1575 1302 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
1576 1302 :
1577 1302 : initial_logical_size_can_start,
1578 1302 : initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
1579 1302 : };
1580 1302 : result.repartition_threshold =
1581 1302 : result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
1582 1302 : result
1583 1302 : .metrics
1584 1302 : .last_record_gauge
1585 1302 : .set(disk_consistent_lsn.0 as i64);
1586 1302 : result
1587 1302 : })
1588 1302 : }
1589 :
1590 1888 : pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
1591 1888 : let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
1592 1888 : match *flush_loop_state {
1593 1277 : FlushLoopState::NotStarted => (),
1594 : FlushLoopState::Running { .. } => {
1595 611 : info!(
1596 611 : "skipping attempt to start flush_loop twice {}/{}",
1597 611 : self.tenant_id, self.timeline_id
1598 611 : );
1599 611 : return;
1600 : }
1601 : FlushLoopState::Exited => {
1602 UBC 0 : warn!(
1603 0 : "ignoring attempt to restart exited flush_loop {}/{}",
1604 0 : self.tenant_id, self.timeline_id
1605 0 : );
1606 0 : return;
1607 : }
1608 : }
1609 :
1610 CBC 1277 : let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
1611 1277 : let self_clone = Arc::clone(self);
1612 1277 :
1613 1277 : debug!("spawning flush loop");
1614 1277 : *flush_loop_state = FlushLoopState::Running {
1615 1277 : #[cfg(test)]
1616 1277 : expect_initdb_optimization: false,
1617 1277 : #[cfg(test)]
1618 1277 : initdb_optimization_count: 0,
1619 1277 : };
1620 1277 : task_mgr::spawn(
1621 1277 : task_mgr::BACKGROUND_RUNTIME.handle(),
1622 1277 : task_mgr::TaskKind::LayerFlushTask,
1623 1277 : Some(self.tenant_id),
1624 1277 : Some(self.timeline_id),
1625 1277 : "layer flush task",
1626 : false,
1627 1277 : async move {
1628 1277 : let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
1629 19473 : self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
1630 443 : let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
1631 443 : assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
1632 443 : *flush_loop_state = FlushLoopState::Exited;
1633 443 : Ok(())
1634 443 : }
1635 1277 : .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))
1636 : );
1637 1888 : }
1638 :
1639 : /// Creates and starts the wal receiver.
1640 : ///
1641 : /// This function is expected to be called at most once per Timeline's lifecycle
1642 : /// when the timeline is activated.
1643 1103 : fn launch_wal_receiver(
1644 1103 : self: &Arc<Self>,
1645 1103 : ctx: &RequestContext,
1646 1103 : broker_client: BrokerClientChannel,
1647 1103 : ) {
1648 1103 : info!(
1649 1103 : "launching WAL receiver for timeline {} of tenant {}",
1650 1103 : self.timeline_id, self.tenant_id
1651 1103 : );
1652 :
1653 1103 : let tenant_conf_guard = self.tenant_conf.read().unwrap();
1654 1103 : let wal_connect_timeout = tenant_conf_guard
1655 1103 : .tenant_conf
1656 1103 : .walreceiver_connect_timeout
1657 1103 : .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
1658 1103 : let lagging_wal_timeout = tenant_conf_guard
1659 1103 : .tenant_conf
1660 1103 : .lagging_wal_timeout
1661 1103 : .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
1662 1103 : let max_lsn_wal_lag = tenant_conf_guard
1663 1103 : .tenant_conf
1664 1103 : .max_lsn_wal_lag
1665 1103 : .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
1666 1103 : drop(tenant_conf_guard);
1667 1103 :
1668 1103 : let mut guard = self.walreceiver.lock().unwrap();
1669 1103 : assert!(
1670 1103 : guard.is_none(),
1671 UBC 0 : "multiple launches / re-launches of WAL receiver are not supported"
1672 : );
1673 CBC 1103 : *guard = Some(WalReceiver::start(
1674 1103 : Arc::clone(self),
1675 1103 : WalReceiverConf {
1676 1103 : wal_connect_timeout,
1677 1103 : lagging_wal_timeout,
1678 1103 : max_lsn_wal_lag,
1679 1103 : auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
1680 1103 : availability_zone: self.conf.availability_zone.clone(),
1681 1103 : },
1682 1103 : broker_client,
1683 1103 : ctx,
1684 1103 : ));
1685 1103 : }
1686 :
1687 : /// Initialize with an empty layer map. Used when creating a new timeline.
1688 967 : pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
1689 967 : let mut layers = self.layers.try_write().expect(
1690 967 : "in the context where we call this function, no other task has access to the object",
1691 967 : );
1692 967 : layers.initialize_empty(Lsn(start_lsn.0));
1693 967 : }
1694 :
1695 : /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
1696 : /// files.
1697 314 : pub(super) async fn load_layer_map(
1698 314 : &self,
1699 314 : disk_consistent_lsn: Lsn,
1700 314 : index_part: Option<IndexPart>,
1701 314 : ) -> anyhow::Result<()> {
1702 : use init::{Decision::*, Discovered, DismissedLayer};
1703 : use LayerFileName::*;
1704 :
1705 314 : let mut guard = self.layers.write().await;
1706 :
1707 314 : let timer = self.metrics.load_layer_map_histo.start_timer();
1708 314 :
1709 314 : // Scan timeline directory and create ImageFileName and DeltaFilename
1710 314 : // structs representing all files on disk
1711 314 : let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
1712 314 : let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
1713 314 : let span = tracing::Span::current();
1714 314 :
1715 314 : // Copy to move into the task we're about to spawn
1716 314 : let generation = self.generation;
1717 :
1718 314 : let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
1719 314 : move || {
1720 314 : let _g = span.entered();
1721 314 : let discovered = init::scan_timeline_dir(&timeline_path)?;
1722 314 : let mut discovered_layers = Vec::with_capacity(discovered.len());
1723 314 : let mut unrecognized_files = Vec::new();
1724 314 :
1725 314 : let mut path = timeline_path;
1726 :
1727 5328 : for discovered in discovered {
1728 5014 : let (name, kind) = match discovered {
1729 4615 : Discovered::Layer(file_name, file_size) => {
1730 4615 : discovered_layers.push((file_name, file_size));
1731 4615 : continue;
1732 : }
1733 : Discovered::Metadata | Discovered::IgnoredBackup => {
1734 314 : continue;
1735 : }
1736 UBC 0 : Discovered::Unknown(file_name) => {
1737 0 : // we will later error if there are any
1738 0 : unrecognized_files.push(file_name);
1739 0 : continue;
1740 : }
1741 CBC 77 : Discovered::Ephemeral(name) => (name, "old ephemeral file"),
1742 8 : Discovered::Temporary(name) => (name, "temporary timeline file"),
1743 UBC 0 : Discovered::TemporaryDownload(name) => (name, "temporary download"),
1744 : };
1745 CBC 85 : path.push(Utf8Path::new(&name));
1746 85 : init::cleanup(&path, kind)?;
1747 85 : path.pop();
1748 : }
1749 :
1750 314 : if !unrecognized_files.is_empty() {
1751 : // assume that if there are any there are many many.
1752 UBC 0 : let n = unrecognized_files.len();
1753 0 : let first = &unrecognized_files[..n.min(10)];
1754 0 : anyhow::bail!(
1755 0 : "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
1756 0 : );
1757 CBC 314 : }
1758 314 :
1759 314 : let decided = init::reconcile(
1760 314 : discovered_layers,
1761 314 : index_part.as_ref(),
1762 314 : disk_consistent_lsn,
1763 314 : generation,
1764 314 : );
1765 314 :
1766 314 : let mut loaded_layers = Vec::new();
1767 314 : let mut needs_cleanup = Vec::new();
1768 314 : let mut total_physical_size = 0;
1769 :
1770 8660 : for (name, decision) in decided {
1771 8346 : let decision = match decision {
1772 1768 : Ok(UseRemote { local, remote }) => {
1773 1768 : // Remote is authoritative, but we may still choose to retain
1774 1768 : // the local file if the contents appear to match
1775 1768 : if local.file_size() == remote.file_size() {
1776 : // Use the local file, but take the remote metadata so that we pick up
1777 : // the correct generation.
1778 1767 : UseLocal(remote)
1779 : } else {
1780 1 : path.push(name.file_name());
1781 1 : init::cleanup_local_file_for_remote(&path, &local, &remote)?;
1782 1 : path.pop();
1783 1 : UseRemote { local, remote }
1784 : }
1785 : }
1786 6310 : Ok(decision) => decision,
1787 7 : Err(DismissedLayer::Future { local }) => {
1788 7 : if local.is_some() {
1789 7 : path.push(name.file_name());
1790 7 : init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
1791 7 : path.pop();
1792 LBC (20) : }
1793 CBC 7 : needs_cleanup.push(name);
1794 7 : continue;
1795 : }
1796 261 : Err(DismissedLayer::LocalOnly(local)) => {
1797 261 : path.push(name.file_name());
1798 261 : init::cleanup_local_only_file(&path, &name, &local)?;
1799 261 : path.pop();
1800 261 : // this file never existed remotely, we will have to do rework
1801 261 : continue;
1802 : }
1803 : };
1804 :
1805 8078 : match &name {
1806 4853 : Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
1807 3225 : Image(i) => assert!(i.lsn <= disk_consistent_lsn),
1808 : }
1809 :
1810 8078 : let status = match &decision {
1811 4346 : UseLocal(_) => LayerResidenceStatus::Resident,
1812 3732 : Evicted(_) | UseRemote { .. } => LayerResidenceStatus::Evicted,
1813 : };
1814 :
1815 8078 : tracing::debug!(layer=%name, ?decision, ?status, "applied");
1816 :
1817 8078 : let stats = LayerAccessStats::for_loading_layer(status);
1818 :
1819 8078 : let layer: Arc<dyn PersistentLayer> = match (name, &decision) {
1820 2321 : (Delta(d), UseLocal(m)) => {
1821 2321 : total_physical_size += m.file_size();
1822 2321 : Arc::new(DeltaLayer::new(
1823 2321 : conf,
1824 2321 : timeline_id,
1825 2321 : tenant_id,
1826 2321 : &d,
1827 2321 : m.file_size(),
1828 2321 : stats,
1829 2321 : ))
1830 : }
1831 2025 : (Image(i), UseLocal(m)) => {
1832 2025 : total_physical_size += m.file_size();
1833 2025 : Arc::new(ImageLayer::new(
1834 2025 : conf,
1835 2025 : timeline_id,
1836 2025 : tenant_id,
1837 2025 : &i,
1838 2025 : m.file_size(),
1839 2025 : stats,
1840 2025 : ))
1841 : }
1842 2532 : (Delta(d), Evicted(remote) | UseRemote { remote, .. }) => Arc::new(
1843 2532 : RemoteLayer::new_delta(tenant_id, timeline_id, &d, remote, stats),
1844 2532 : ),
1845 1200 : (Image(i), Evicted(remote) | UseRemote { remote, .. }) => Arc::new(
1846 1200 : RemoteLayer::new_img(tenant_id, timeline_id, &i, remote, stats),
1847 1200 : ),
1848 : };
1849 :
1850 8078 : loaded_layers.push(layer);
1851 : }
1852 314 : Ok((loaded_layers, needs_cleanup, total_physical_size))
1853 314 : }
1854 314 : })
1855 314 : .await
1856 314 : .map_err(anyhow::Error::new)
1857 314 : .and_then(|x| x)?;
1858 :
1859 314 : let num_layers = loaded_layers.len();
1860 314 :
1861 314 : guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
1862 :
1863 314 : if let Some(rtc) = self.remote_client.as_ref() {
1864 314 : rtc.schedule_layer_file_deletion(needs_cleanup)?;
1865 314 : rtc.schedule_index_upload_for_file_changes()?;
1866 : // Tenant::create_timeline will wait for these uploads to happen before returning, or
1867 : // on retry.
1868 UBC 0 : }
1869 :
1870 CBC 314 : info!(
1871 314 : "loaded layer map with {} layers at {}, total physical size: {}",
1872 314 : num_layers, disk_consistent_lsn, total_physical_size
1873 314 : );
1874 314 : self.metrics.resident_physical_size_set(total_physical_size);
1875 314 :
1876 314 : timer.stop_and_record();
1877 314 : Ok(())
1878 314 : }
1879 :
1880 717 : fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
1881 717 : let state = self.current_state();
1882 691 : if matches!(
1883 717 : state,
1884 : TimelineState::Broken { .. } | TimelineState::Stopping
1885 : ) {
1886 : // Can happen when timeline detail endpoint is used when deletion is ongoing (or its broken).
1887 26 : return;
1888 691 : }
1889 :
1890 691 : let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
1891 691 : .try_acquire_owned()
1892 : {
1893 373 : Ok(permit) => permit,
1894 : Err(TryAcquireError::NoPermits) => {
1895 : // computation already ongoing or finished with success
1896 318 : return;
1897 : }
1898 UBC 0 : Err(TryAcquireError::Closed) => unreachable!("we never call close"),
1899 : };
1900 CBC 373 : debug_assert!(self
1901 373 : .current_logical_size
1902 373 : .initial_logical_size
1903 373 : .get()
1904 373 : .is_none());
1905 :
1906 373 : info!(
1907 373 : "spawning logical size computation from context of task kind {:?}",
1908 373 : ctx.task_kind()
1909 373 : );
1910 : // We need to start the computation task.
1911 : // It gets a separate context since it will outlive the request that called this function.
1912 373 : let self_clone = Arc::clone(self);
1913 373 : let background_ctx = ctx.detached_child(
1914 373 : TaskKind::InitialLogicalSizeCalculation,
1915 373 : DownloadBehavior::Download,
1916 373 : );
1917 373 : task_mgr::spawn(
1918 373 : task_mgr::BACKGROUND_RUNTIME.handle(),
1919 373 : task_mgr::TaskKind::InitialLogicalSizeCalculation,
1920 373 : Some(self.tenant_id),
1921 373 : Some(self.timeline_id),
1922 373 : "initial size calculation",
1923 373 : false,
1924 373 : // NB: don't log errors here, task_mgr will do that.
1925 373 : async move {
1926 373 :
1927 373 : let cancel = task_mgr::shutdown_token();
1928 :
1929 : // in case we were created during pageserver initialization, wait for
1930 : // initialization to complete before proceeding. startup time init runs on the same
1931 : // runtime.
1932 373 : tokio::select! {
1933 373 : _ = cancel.cancelled() => { return Ok(()); },
1934 373 : _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
1935 373 : };
1936 :
1937 : // hold off background tasks from starting until all timelines get to try at least
1938 : // once initial logical size calculation; though retry will rarely be useful.
1939 : // holding off is done because heavier tasks execute blockingly on the same
1940 : // runtime.
1941 : //
1942 : // dropping this at every outcome is probably better than trying to cling on to it,
1943 : // delay will be terminated by a timeout regardless.
1944 373 : let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
1945 373 :
1946 373 : // no extra cancellation here, because nothing really waits for this to complete compared
1947 373 : // to spawn_ondemand_logical_size_calculation.
1948 373 : let cancel = CancellationToken::new();
1949 :
1950 373 : let calculated_size = match self_clone
1951 373 : .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
1952 193882 : .await
1953 : {
1954 368 : Ok(s) => s,
1955 : Err(CalculateLogicalSizeError::Cancelled) => {
1956 : // Don't make noise, this is a common task.
1957 : // In the unlikely case that there is another call to this function, we'll retry
1958 : // because initial_logical_size is still None.
1959 LBC (2) : info!("initial size calculation cancelled, likely timeline delete / tenant detach");
1960 (2) : return Ok(());
1961 : }
1962 CBC 3 : Err(CalculateLogicalSizeError::Other(err)) => {
1963 UBC 0 : if let Some(e @ PageReconstructError::AncestorStopping(_)) =
1964 CBC 3 : err.root_cause().downcast_ref()
1965 : {
1966 : // This can happen if the timeline parent timeline switches to
1967 : // Stopping state while we're still calculating the initial
1968 : // timeline size for the child, for example if the tenant is
1969 : // being detached or the pageserver is shut down. Like with
1970 : // CalculateLogicalSizeError::Cancelled, don't make noise.
1971 UBC 0 : info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
1972 0 : return Ok(());
1973 CBC 3 : }
1974 3 : return Err(err.context("Failed to calculate logical size"));
1975 : }
1976 : };
1977 :
1978 : // we cannot query current_logical_size.current_size() to know the current
1979 : // *negative* value, only truncated to u64.
1980 368 : let added = self_clone
1981 368 : .current_logical_size
1982 368 : .size_added_after_initial
1983 368 : .load(AtomicOrdering::Relaxed);
1984 368 :
1985 368 : let sum = calculated_size.saturating_add_signed(added);
1986 368 :
1987 368 : // set the gauge value before it can be set in `update_current_logical_size`.
1988 368 : self_clone.metrics.current_logical_size_gauge.set(sum);
1989 368 :
1990 368 : match self_clone
1991 368 : .current_logical_size
1992 368 : .initial_logical_size
1993 368 : .set(calculated_size)
1994 : {
1995 368 : Ok(()) => (),
1996 UBC 0 : Err(_what_we_just_attempted_to_set) => {
1997 0 : let existing_size = self_clone
1998 0 : .current_logical_size
1999 0 : .initial_logical_size
2000 0 : .get()
2001 0 : .expect("once_cell set was lost, then get failed, impossible.");
2002 0 : // This shouldn't happen because the semaphore is initialized with 1.
2003 0 : // But if it happens, just complain & report success so there are no further retries.
2004 0 : error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
2005 : }
2006 : }
2007 : // now that `initial_logical_size.is_some()`, reduce permit count to 0
2008 : // so that we prevent future callers from spawning this task
2009 CBC 368 : permit.forget();
2010 368 : Ok(())
2011 373 : }.in_current_span(),
2012 373 : );
2013 717 : }
2014 :
2015 39 : pub fn spawn_ondemand_logical_size_calculation(
2016 39 : self: &Arc<Self>,
2017 39 : lsn: Lsn,
2018 39 : cause: LogicalSizeCalculationCause,
2019 39 : ctx: RequestContext,
2020 39 : cancel: CancellationToken,
2021 39 : ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
2022 39 : let (sender, receiver) = oneshot::channel();
2023 39 : let self_clone = Arc::clone(self);
2024 39 : // XXX if our caller loses interest, i.e., ctx is cancelled,
2025 39 : // we should stop the size calculation work and return an error.
2026 39 : // That would require restructuring this function's API to
2027 39 : // return the result directly, instead of a Receiver for the result.
2028 39 : let ctx = ctx.detached_child(
2029 39 : TaskKind::OndemandLogicalSizeCalculation,
2030 39 : DownloadBehavior::Download,
2031 39 : );
2032 39 : task_mgr::spawn(
2033 39 : task_mgr::BACKGROUND_RUNTIME.handle(),
2034 39 : task_mgr::TaskKind::OndemandLogicalSizeCalculation,
2035 39 : Some(self.tenant_id),
2036 39 : Some(self.timeline_id),
2037 39 : "ondemand logical size calculation",
2038 39 : false,
2039 39 : async move {
2040 39 : let res = self_clone
2041 39 : .logical_size_calculation_task(lsn, cause, &ctx, cancel)
2042 2589 : .await;
2043 39 : let _ = sender.send(res).ok();
2044 39 : Ok(()) // Receiver is responsible for handling errors
2045 39 : }
2046 39 : .in_current_span(),
2047 39 : );
2048 39 : receiver
2049 39 : }
2050 :
2051 1648 : #[instrument(skip_all)]
2052 : async fn logical_size_calculation_task(
2053 : self: &Arc<Self>,
2054 : lsn: Lsn,
2055 : cause: LogicalSizeCalculationCause,
2056 : ctx: &RequestContext,
2057 : cancel: CancellationToken,
2058 : ) -> Result<u64, CalculateLogicalSizeError> {
2059 : span::debug_assert_current_span_has_tenant_and_timeline_id();
2060 :
2061 : let mut timeline_state_updates = self.subscribe_for_state_updates();
2062 : let self_calculation = Arc::clone(self);
2063 :
2064 412 : let mut calculation = pin!(async {
2065 412 : let cancel = cancel.child_token();
2066 412 : let ctx = ctx.attached_child();
2067 412 : self_calculation
2068 412 : .calculate_logical_size(lsn, cause, cancel, &ctx)
2069 196471 : .await
2070 410 : });
2071 407 : let timeline_state_cancellation = async {
2072 : loop {
2073 196207 : match timeline_state_updates.changed().await {
2074 : Ok(()) => {
2075 LBC (2) : let new_state = timeline_state_updates.borrow().clone();
2076 (2) : match new_state {
2077 : // we're running this job for active timelines only
2078 UBC 0 : TimelineState::Active => continue,
2079 : TimelineState::Broken { .. }
2080 : | TimelineState::Stopping
2081 : | TimelineState::Loading => {
2082 LBC (2) : break format!("aborted because timeline became inactive (new state: {new_state:?})")
2083 : }
2084 : }
2085 : }
2086 UBC 0 : Err(_sender_dropped_error) => {
2087 0 : // can't happen, the sender is not dropped as long as the Timeline exists
2088 0 : break "aborted because state watch was dropped".to_string();
2089 : }
2090 : }
2091 : }
2092 LBC (2) : };
2093 :
2094 CBC 407 : let taskmgr_shutdown_cancellation = async {
2095 196336 : task_mgr::shutdown_watcher().await;
2096 UBC 0 : "aborted because task_mgr shutdown requested".to_string()
2097 0 : };
2098 :
2099 : loop {
2100 CBC 196883 : tokio::select! {
2101 410 : res = &mut calculation => { return res }
2102 LBC (2) : reason = timeline_state_cancellation => {
2103 UBC 0 : debug!(reason = reason, "cancelling calculation");
2104 : cancel.cancel();
2105 : return calculation.await;
2106 : }
2107 0 : reason = taskmgr_shutdown_cancellation => {
2108 0 : debug!(reason = reason, "cancelling calculation");
2109 : cancel.cancel();
2110 : return calculation.await;
2111 : }
2112 : }
2113 : }
2114 : }
2115 :
2116 : /// Calculate the logical size of the database at the latest LSN.
2117 : ///
2118 : /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
2119 : /// especially if we need to download remote layers.
2120 CBC 418 : pub async fn calculate_logical_size(
2121 418 : &self,
2122 418 : up_to_lsn: Lsn,
2123 418 : cause: LogicalSizeCalculationCause,
2124 418 : cancel: CancellationToken,
2125 418 : ctx: &RequestContext,
2126 418 : ) -> Result<u64, CalculateLogicalSizeError> {
2127 418 : info!(
2128 418 : "Calculating logical size for timeline {} at {}",
2129 418 : self.timeline_id, up_to_lsn
2130 418 : );
2131 : // These failpoints are used by python tests to ensure that we don't delete
2132 : // the timeline while the logical size computation is ongoing.
2133 : // The first failpoint is used to make this function pause.
2134 : // Then the python test initiates timeline delete operation in a thread.
2135 : // It waits for a few seconds, then arms the second failpoint and disables
2136 : // the first failpoint. The second failpoint prints an error if the timeline
2137 : // delete code has deleted the on-disk state while we're still running here.
2138 : // It shouldn't do that. If it does it anyway, the error will be caught
2139 : // by the test suite, highlighting the problem.
2140 418 : fail::fail_point!("timeline-calculate-logical-size-pause");
2141 418 : fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
2142 2 : if !self
2143 2 : .conf
2144 2 : .metadata_path(&self.tenant_id, &self.timeline_id)
2145 2 : .exists()
2146 : {
2147 UBC 0 : error!("timeline-calculate-logical-size-pre metadata file does not exist")
2148 CBC 2 : }
2149 : // need to return something
2150 2 : Ok(0)
2151 418 : });
2152 : // See if we've already done the work for initial size calculation.
2153 : // This is a short-cut for timelines that are mostly unused.
2154 416 : if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
2155 4 : return Ok(size);
2156 412 : }
2157 412 : let storage_time_metrics = match cause {
2158 : LogicalSizeCalculationCause::Initial
2159 : | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
2160 400 : | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
2161 : LogicalSizeCalculationCause::EvictionTaskImitation => {
2162 12 : &self.metrics.imitate_logical_size_histo
2163 : }
2164 : };
2165 412 : let timer = storage_time_metrics.start_timer();
2166 412 : let logical_size = self
2167 412 : .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
2168 197786 : .await?;
2169 UBC 0 : debug!("calculated logical size: {logical_size}");
2170 CBC 407 : timer.stop_and_record();
2171 407 : Ok(logical_size)
2172 416 : }
2173 :
2174 : /// Update current logical size, adding `delta' to the old value.
2175 1029042 : fn update_current_logical_size(&self, delta: i64) {
2176 1029042 : let logical_size = &self.current_logical_size;
2177 1029042 : logical_size.increment_size(delta);
2178 1029042 :
2179 1029042 : // Also set the value in the prometheus gauge. Note that
2180 1029042 : // there is a race condition here: if this is is called by two
2181 1029042 : // threads concurrently, the prometheus gauge might be set to
2182 1029042 : // one value while current_logical_size is set to the
2183 1029042 : // other.
2184 1029042 : match logical_size.current_size() {
2185 1027550 : Ok(CurrentLogicalSize::Exact(new_current_size)) => self
2186 1027550 : .metrics
2187 1027550 : .current_logical_size_gauge
2188 1027550 : .set(new_current_size),
2189 1492 : Ok(CurrentLogicalSize::Approximate(_)) => {
2190 1492 : // don't update the gauge yet, this allows us not to update the gauge back and
2191 1492 : // forth between the initial size calculation task.
2192 1492 : }
2193 : // this is overflow
2194 UBC 0 : Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
2195 : }
2196 CBC 1029042 : }
2197 :
2198 413 : async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
2199 413 : let guard = self.layers.read().await;
2200 54386 : for historic_layer in guard.layer_map().iter_historic_layers() {
2201 54386 : let historic_layer_name = historic_layer.filename().file_name();
2202 54386 : if layer_file_name == historic_layer_name {
2203 413 : return Some(guard.get_from_desc(&historic_layer));
2204 53973 : }
2205 : }
2206 :
2207 UBC 0 : None
2208 CBC 413 : }
2209 : }
2210 :
2211 : type TraversalId = String;
2212 :
2213 : trait TraversalLayerExt {
2214 : fn traversal_id(&self) -> TraversalId;
2215 : }
2216 :
2217 : impl TraversalLayerExt for Arc<dyn PersistentLayer> {
2218 1434 : fn traversal_id(&self) -> TraversalId {
2219 1434 : let timeline_id = self.layer_desc().timeline_id;
2220 1434 : match self.local_path() {
2221 10 : Some(local_path) => {
2222 10 : debug_assert!(local_path.to_string().contains(&format!("{}", timeline_id)),
2223 UBC 0 : "need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary",
2224 : );
2225 CBC 10 : format!("{local_path}")
2226 : }
2227 : None => {
2228 1424 : format!("remote {}/{self}", timeline_id)
2229 : }
2230 : }
2231 1434 : }
2232 : }
2233 :
2234 : impl TraversalLayerExt for Arc<InMemoryLayer> {
2235 2 : fn traversal_id(&self) -> TraversalId {
2236 2 : format!("timeline {} in-memory {self}", self.get_timeline_id())
2237 2 : }
2238 : }
2239 :
2240 : impl Timeline {
2241 : ///
2242 : /// Get a handle to a Layer for reading.
2243 : ///
2244 : /// The returned Layer might be from an ancestor timeline, if the
2245 : /// segment hasn't been updated on this timeline yet.
2246 : ///
2247 : /// This function takes the current timeline's locked LayerMap as an argument,
2248 : /// so callers can avoid potential race conditions.
2249 6372305 : async fn get_reconstruct_data(
2250 6372305 : &self,
2251 6372305 : key: Key,
2252 6372305 : request_lsn: Lsn,
2253 6372305 : reconstruct_state: &mut ValueReconstructState,
2254 6372305 : ctx: &RequestContext,
2255 6372307 : ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
2256 6372307 : // Start from the current timeline.
2257 6372307 : let mut timeline_owned;
2258 6372307 : let mut timeline = self;
2259 6372307 :
2260 6372307 : let mut read_count = scopeguard::guard(0, |cnt| {
2261 6372271 : crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
2262 6372307 : });
2263 6372307 :
2264 6372307 : // For debugging purposes, collect the path of layers that we traversed
2265 6372307 : // through. It's included in the error message if we fail to find the key.
2266 6372307 : let mut traversal_path = Vec::<TraversalPathItem>::new();
2267 :
2268 6372307 : let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
2269 1297785 : *cached_lsn
2270 : } else {
2271 5074522 : Lsn(0)
2272 : };
2273 :
2274 : // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
2275 : // to check that each iteration make some progress, to break infinite
2276 : // looping if something goes wrong.
2277 6372307 : let mut prev_lsn = Lsn(u64::MAX);
2278 6372307 :
2279 6372307 : let mut result = ValueReconstructResult::Continue;
2280 6372307 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2281 :
2282 36845087 : 'outer: loop {
2283 36845087 : // The function should have updated 'state'
2284 36845087 : //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
2285 36845087 : match result {
2286 5181611 : ValueReconstructResult::Complete => return Ok(traversal_path),
2287 : ValueReconstructResult::Continue => {
2288 : // If we reached an earlier cached page image, we're done.
2289 31663469 : if cont_lsn == cached_lsn + 1 {
2290 1190638 : MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
2291 1190638 : return Ok(traversal_path);
2292 30472831 : }
2293 30472831 : if prev_lsn <= cont_lsn {
2294 : // Didn't make any progress in last iteration. Error out to avoid
2295 : // getting stuck in the loop.
2296 UBC 0 : return Err(layer_traversal_error(format!(
2297 0 : "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
2298 0 : key,
2299 0 : Lsn(cont_lsn.0 - 1),
2300 0 : request_lsn,
2301 0 : timeline.ancestor_lsn
2302 0 : ), traversal_path));
2303 CBC 30472831 : }
2304 30472831 : prev_lsn = cont_lsn;
2305 : }
2306 : ValueReconstructResult::Missing => {
2307 : return Err(layer_traversal_error(
2308 7 : if cfg!(test) {
2309 2 : format!(
2310 2 : "could not find data for key {} at LSN {}, for request at LSN {}\n{}",
2311 2 : key, cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
2312 2 : )
2313 : } else {
2314 5 : format!(
2315 5 : "could not find data for key {} at LSN {}, for request at LSN {}",
2316 5 : key, cont_lsn, request_lsn
2317 5 : )
2318 : },
2319 7 : traversal_path,
2320 : ));
2321 : }
2322 : }
2323 :
2324 : // Recurse into ancestor if needed
2325 30472831 : if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
2326 UBC 0 : trace!(
2327 0 : "going into ancestor {}, cont_lsn is {}",
2328 0 : timeline.ancestor_lsn,
2329 0 : cont_lsn
2330 0 : );
2331 CBC 1062058 : let ancestor = match timeline.get_ancestor_timeline() {
2332 1062058 : Ok(timeline) => timeline,
2333 UBC 0 : Err(e) => return Err(PageReconstructError::from(e)),
2334 : };
2335 :
2336 : // It's possible that the ancestor timeline isn't active yet, or
2337 : // is active but hasn't yet caught up to the branch point. Wait
2338 : // for it.
2339 : //
2340 : // This cannot happen while the pageserver is running normally,
2341 : // because you cannot create a branch from a point that isn't
2342 : // present in the pageserver yet. However, we don't wait for the
2343 : // branch point to be uploaded to cloud storage before creating
2344 : // a branch. I.e., the branch LSN need not be remote consistent
2345 : // for the branching operation to succeed.
2346 : //
2347 : // Hence, if we try to load a tenant in such a state where
2348 : // 1. the existence of the branch was persisted (in IndexPart and/or locally)
2349 : // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
2350 : // then we will need to wait for the ancestor timeline to
2351 : // re-stream WAL up to branch_lsn before we access it.
2352 : //
2353 : // How can a tenant get in such a state?
2354 : // - ungraceful pageserver process exit
2355 : // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
2356 : //
2357 : // NB: this could be avoided by requiring
2358 : // branch_lsn >= remote_consistent_lsn
2359 : // during branch creation.
2360 CBC 1062058 : match ancestor.wait_to_become_active(ctx).await {
2361 1062057 : Ok(()) => {}
2362 : Err(TimelineState::Stopping) => {
2363 UBC 0 : return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
2364 : }
2365 CBC 1 : Err(state) => {
2366 1 : return Err(PageReconstructError::Other(anyhow::anyhow!(
2367 1 : "Timeline {} will not become active. Current state: {:?}",
2368 1 : ancestor.timeline_id,
2369 1 : &state,
2370 1 : )));
2371 : }
2372 : }
2373 1062057 : ancestor
2374 1062057 : .wait_lsn(timeline.ancestor_lsn, ctx)
2375 10 : .await
2376 1062057 : .with_context(|| {
2377 UBC 0 : format!(
2378 0 : "wait for lsn {} on ancestor timeline_id={}",
2379 0 : timeline.ancestor_lsn, ancestor.timeline_id
2380 0 : )
2381 CBC 1062057 : })?;
2382 :
2383 1062057 : timeline_owned = ancestor;
2384 1062057 : timeline = &*timeline_owned;
2385 1062057 : prev_lsn = Lsn(u64::MAX);
2386 1062057 : continue 'outer;
2387 29410773 : }
2388 :
2389 : #[allow(clippy::never_loop)] // see comment at bottom of this loop
2390 : 'layer_map_search: loop {
2391 1425 : let remote_layer = {
2392 29412190 : let guard = timeline.layers.read().await;
2393 29412164 : let layers = guard.layer_map();
2394 :
2395 : // Check the open and frozen in-memory layers first, in order from newest
2396 : // to oldest.
2397 29412164 : if let Some(open_layer) = &layers.open_layer {
2398 24833727 : let start_lsn = open_layer.get_lsn_range().start;
2399 24833727 : if cont_lsn > start_lsn {
2400 : //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
2401 : // Get all the data needed to reconstruct the page version from this layer.
2402 : // But if we have an older cached page image, no need to go past that.
2403 5076087 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2404 5076087 : result = match open_layer
2405 5076087 : .get_value_reconstruct_data(
2406 5076087 : key,
2407 5076087 : lsn_floor..cont_lsn,
2408 5076087 : reconstruct_state,
2409 5076087 : ctx,
2410 5076087 : )
2411 479207 : .await
2412 : {
2413 5076086 : Ok(result) => result,
2414 UBC 0 : Err(e) => return Err(PageReconstructError::from(e)),
2415 : };
2416 CBC 5076086 : cont_lsn = lsn_floor;
2417 5076086 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2418 5076086 : traversal_path.push((
2419 5076086 : result,
2420 5076086 : cont_lsn,
2421 5076086 : Box::new({
2422 5076086 : let open_layer = Arc::clone(open_layer);
2423 5076086 : move || open_layer.traversal_id()
2424 5076086 : }),
2425 5076086 : ));
2426 5076086 : continue 'outer;
2427 19757640 : }
2428 4578437 : }
2429 24336077 : for frozen_layer in layers.frozen_layers.iter().rev() {
2430 3105084 : let start_lsn = frozen_layer.get_lsn_range().start;
2431 3105084 : if cont_lsn > start_lsn {
2432 : //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
2433 387246 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2434 387246 : result = match frozen_layer
2435 387246 : .get_value_reconstruct_data(
2436 387246 : key,
2437 387246 : lsn_floor..cont_lsn,
2438 387246 : reconstruct_state,
2439 387246 : ctx,
2440 387246 : )
2441 27403 : .await
2442 : {
2443 387246 : Ok(result) => result,
2444 UBC 0 : Err(e) => return Err(PageReconstructError::from(e)),
2445 : };
2446 CBC 387246 : cont_lsn = lsn_floor;
2447 387246 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2448 387246 : traversal_path.push((
2449 387246 : result,
2450 387246 : cont_lsn,
2451 387246 : Box::new({
2452 387246 : let frozen_layer = Arc::clone(frozen_layer);
2453 387246 : move || frozen_layer.traversal_id()
2454 387246 : }),
2455 387246 : ));
2456 387246 : continue 'outer;
2457 2717838 : }
2458 : }
2459 :
2460 23948831 : if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
2461 23851667 : let layer = guard.get_from_desc(&layer);
2462 : // If it's a remote layer, download it and retry.
2463 1425 : if let Some(remote_layer) =
2464 23851667 : super::storage_layer::downcast_remote_layer(&layer)
2465 : {
2466 : // TODO: push a breadcrumb to 'traversal_path' to record the fact that
2467 : // we downloaded / would need to download this layer.
2468 1425 : remote_layer // download happens outside the scope of `layers` guard object
2469 : } else {
2470 : // Get all the data needed to reconstruct the page version from this layer.
2471 : // But if we have an older cached page image, no need to go past that.
2472 23850242 : let lsn_floor = max(cached_lsn + 1, lsn_floor);
2473 23850242 : result = match layer
2474 23850242 : .get_value_reconstruct_data(
2475 23850242 : key,
2476 23850242 : lsn_floor..cont_lsn,
2477 23850242 : reconstruct_state,
2478 23850242 : ctx,
2479 23850242 : )
2480 2126171 : .await
2481 : {
2482 23850227 : Ok(result) => result,
2483 10 : Err(e) => return Err(PageReconstructError::from(e)),
2484 : };
2485 23850227 : cont_lsn = lsn_floor;
2486 23850227 : *read_count += 1;
2487 23850227 : traversal_path.push((
2488 23850227 : result,
2489 23850227 : cont_lsn,
2490 23850227 : Box::new({
2491 23850227 : let layer = Arc::clone(&layer);
2492 23850227 : move || layer.traversal_id()
2493 23850227 : }),
2494 23850227 : ));
2495 23850227 : continue 'outer;
2496 : }
2497 97164 : } else if timeline.ancestor_timeline.is_some() {
2498 : // Nothing on this timeline. Traverse to parent
2499 97159 : result = ValueReconstructResult::Continue;
2500 97159 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2501 97159 : continue 'outer;
2502 : } else {
2503 : // Nothing found
2504 5 : result = ValueReconstructResult::Missing;
2505 5 : continue 'outer;
2506 : }
2507 : };
2508 : // Download the remote_layer and replace it in the layer map.
2509 : // For that, we need to release the mutex. Otherwise, we'd deadlock.
2510 : //
2511 : // The control flow is so weird here because `drop(layers)` inside
2512 : // the if stmt above is not enough for current rustc: it requires
2513 : // that the layers lock guard is not in scope across the download
2514 : // await point.
2515 1425 : let remote_layer_as_persistent: Arc<dyn PersistentLayer> =
2516 1425 : Arc::clone(&remote_layer) as Arc<dyn PersistentLayer>;
2517 1425 : let id = remote_layer_as_persistent.traversal_id();
2518 1424 : info!(
2519 1424 : "need remote layer {} for task kind {:?}",
2520 1424 : id,
2521 1424 : ctx.task_kind()
2522 1424 : );
2523 :
2524 : // The next layer doesn't exist locally. Need to download it.
2525 : // (The control flow is a bit complicated here because we must drop the 'layers'
2526 : // lock before awaiting on the Future.)
2527 1424 : match (
2528 1424 : ctx.download_behavior(),
2529 1424 : self.conf.ondemand_download_behavior_treat_error_as_warn,
2530 1424 : ) {
2531 : (DownloadBehavior::Download, _) => {
2532 1424 : info!(
2533 1424 : "on-demand downloading remote layer {id} for task kind {:?}",
2534 1424 : ctx.task_kind()
2535 1424 : );
2536 1903 : timeline.download_remote_layer(remote_layer).await?;
2537 1417 : continue 'layer_map_search;
2538 : }
2539 : (DownloadBehavior::Warn, _) | (DownloadBehavior::Error, true) => {
2540 UBC 0 : warn!(
2541 0 : "unexpectedly on-demand downloading remote layer {} for task kind {:?}",
2542 0 : id,
2543 0 : ctx.task_kind()
2544 0 : );
2545 0 : UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
2546 0 : timeline.download_remote_layer(remote_layer).await?;
2547 0 : continue 'layer_map_search;
2548 : }
2549 : (DownloadBehavior::Error, false) => {
2550 0 : return Err(PageReconstructError::NeedsDownload(
2551 0 : TenantTimelineId::new(self.tenant_id, self.timeline_id),
2552 0 : remote_layer.filename(),
2553 0 : ))
2554 : }
2555 : }
2556 : }
2557 : }
2558 CBC 6372271 : }
2559 :
2560 6372321 : async fn lookup_cached_page(
2561 6372321 : &self,
2562 6372321 : key: &Key,
2563 6372321 : lsn: Lsn,
2564 6372321 : ctx: &RequestContext,
2565 6372323 : ) -> Option<(Lsn, Bytes)> {
2566 6372323 : let cache = page_cache::get();
2567 :
2568 : // FIXME: It's pointless to check the cache for things that are not 8kB pages.
2569 : // We should look at the key to determine if it's a cacheable object
2570 6372323 : let (lsn, read_guard) = cache
2571 6372323 : .lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn, ctx)
2572 5074522 : .await?;
2573 1297801 : let img = Bytes::from(read_guard.to_vec());
2574 1297801 : Some((lsn, img))
2575 6372323 : }
2576 :
2577 1062058 : fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
2578 1062058 : let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
2579 UBC 0 : format!(
2580 0 : "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
2581 0 : self.timeline_id,
2582 0 : self.get_ancestor_timeline_id(),
2583 0 : )
2584 CBC 1062058 : })?;
2585 1062058 : Ok(Arc::clone(ancestor))
2586 1062058 : }
2587 :
2588 : ///
2589 : /// Get a handle to the latest layer for appending.
2590 : ///
2591 76639811 : async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
2592 76639858 : let mut guard = self.layers.write().await;
2593 76639855 : let layer = guard
2594 76639855 : .get_layer_for_write(
2595 76639855 : lsn,
2596 76639855 : self.get_last_record_lsn(),
2597 76639855 : self.conf,
2598 76639855 : self.timeline_id,
2599 76639855 : self.tenant_id,
2600 76639855 : )
2601 UBC 0 : .await?;
2602 CBC 76639855 : Ok(layer)
2603 76639855 : }
2604 :
2605 76622124 : async fn put_value(
2606 76622124 : &self,
2607 76622124 : key: Key,
2608 76622124 : lsn: Lsn,
2609 76622124 : val: &Value,
2610 76622124 : ctx: &RequestContext,
2611 76622124 : ) -> anyhow::Result<()> {
2612 : //info!("PUT: key {} at {}", key, lsn);
2613 76622171 : let layer = self.get_layer_for_write(lsn).await?;
2614 76622168 : layer.put_value(key, lsn, val, ctx).await?;
2615 76622164 : Ok(())
2616 76622164 : }
2617 :
2618 17687 : async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
2619 17687 : let layer = self.get_layer_for_write(lsn).await?;
2620 17687 : layer.put_tombstone(key_range, lsn).await?;
2621 17687 : Ok(())
2622 17687 : }
2623 :
2624 69330499 : fn finish_write(&self, new_lsn: Lsn) {
2625 69330499 : assert!(new_lsn.is_aligned());
2626 :
2627 69330499 : self.metrics.last_record_gauge.set(new_lsn.0 as i64);
2628 69330499 : self.last_record_lsn.advance(new_lsn);
2629 69330499 : }
2630 :
2631 5575 : async fn freeze_inmem_layer(&self, write_lock_held: bool) {
2632 : // Freeze the current open in-memory layer. It will be written to disk on next
2633 : // iteration.
2634 5575 : let _write_guard = if write_lock_held {
2635 3983 : None
2636 : } else {
2637 1592 : Some(self.write_lock.lock().await)
2638 : };
2639 5575 : let mut guard = self.layers.write().await;
2640 5575 : guard
2641 5575 : .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
2642 21 : .await;
2643 5575 : }
2644 :
2645 : /// Layer flusher task's main loop.
2646 1277 : async fn flush_loop(
2647 1277 : self: &Arc<Self>,
2648 1277 : mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
2649 1277 : ctx: &RequestContext,
2650 1277 : ) {
2651 1277 : info!("started flush loop");
2652 : loop {
2653 5932 : tokio::select! {
2654 10608 : _ = task_mgr::shutdown_watcher() => {
2655 10608 : info!("shutting down layer flush task");
2656 10608 : break;
2657 10608 : },
2658 10608 : _ = layer_flush_start_rx.changed() => {}
2659 10608 : }
2660 :
2661 UBC 0 : trace!("waking up");
2662 CBC 4663 : let timer = self.metrics.flush_time_histo.start_timer();
2663 4663 : let flush_counter = *layer_flush_start_rx.borrow();
2664 4655 : let result = loop {
2665 10006 : let layer_to_flush = {
2666 10006 : let guard = self.layers.read().await;
2667 10006 : guard.layer_map().frozen_layers.front().cloned()
2668 : // drop 'layers' lock to allow concurrent reads and writes
2669 : };
2670 10006 : let Some(layer_to_flush) = layer_to_flush else {
2671 4655 : break Ok(());
2672 : };
2673 13069 : if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
2674 UBC 0 : error!("could not flush frozen layer: {err:?}");
2675 0 : break Err(err);
2676 CBC 5343 : }
2677 : };
2678 : // Notify any listeners that we're done
2679 4655 : let _ = self
2680 4655 : .layer_flush_done_tx
2681 4655 : .send_replace((flush_counter, result));
2682 4655 :
2683 4655 : timer.stop_and_record();
2684 : }
2685 443 : }
2686 :
2687 1592 : async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
2688 1592 : let mut rx = self.layer_flush_done_tx.subscribe();
2689 1592 :
2690 1592 : // Increment the flush cycle counter and wake up the flush task.
2691 1592 : // Remember the new value, so that when we listen for the flush
2692 1592 : // to finish, we know when the flush that we initiated has
2693 1592 : // finished, instead of some other flush that was started earlier.
2694 1592 : let mut my_flush_request = 0;
2695 1592 :
2696 1592 : let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
2697 1592 : if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
2698 59 : anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
2699 1533 : }
2700 1533 :
2701 1533 : self.layer_flush_start_tx.send_modify(|counter| {
2702 1533 : my_flush_request = *counter + 1;
2703 1533 : *counter = my_flush_request;
2704 1533 : });
2705 :
2706 3067 : loop {
2707 3067 : {
2708 3067 : let (last_result_counter, last_result) = &*rx.borrow();
2709 3067 : if *last_result_counter >= my_flush_request {
2710 1533 : if let Err(_err) = last_result {
2711 : // We already logged the original error in
2712 : // flush_loop. We cannot propagate it to the caller
2713 : // here, because it might not be Cloneable
2714 UBC 0 : anyhow::bail!(
2715 0 : "Could not flush frozen layer. Request id: {}",
2716 0 : my_flush_request
2717 0 : );
2718 : } else {
2719 CBC 1533 : return Ok(());
2720 : }
2721 1534 : }
2722 : }
2723 UBC 0 : trace!("waiting for flush to complete");
2724 CBC 1535 : rx.changed().await?;
2725 UBC 0 : trace!("done")
2726 : }
2727 CBC 1592 : }
2728 :
2729 3983 : fn flush_frozen_layers(&self) {
2730 3983 : self.layer_flush_start_tx.send_modify(|val| *val += 1);
2731 3983 : }
2732 :
2733 : /// Flush one frozen in-memory layer to disk, as a new delta layer.
2734 5351 : #[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer))]
2735 : async fn flush_frozen_layer(
2736 : self: &Arc<Self>,
2737 : frozen_layer: Arc<InMemoryLayer>,
2738 : ctx: &RequestContext,
2739 : ) -> anyhow::Result<()> {
2740 : // As a special case, when we have just imported an image into the repository,
2741 : // instead of writing out a L0 delta layer, we directly write out image layer
2742 : // files instead. This is possible as long as *all* the data imported into the
2743 : // repository have the same LSN.
2744 : let lsn_range = frozen_layer.get_lsn_range();
2745 : let (layer_paths_to_upload, delta_layer_to_add) =
2746 : if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
2747 : #[cfg(test)]
2748 : match &mut *self.flush_loop_state.lock().unwrap() {
2749 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2750 : panic!("flush loop not running")
2751 : }
2752 : FlushLoopState::Running {
2753 : initdb_optimization_count,
2754 : ..
2755 : } => {
2756 : *initdb_optimization_count += 1;
2757 : }
2758 : }
2759 : // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
2760 : // require downloading anything during initial import.
2761 : let (partitioning, _lsn) = self
2762 : .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
2763 : .await?;
2764 : // For image layers, we add them immediately into the layer map.
2765 : (
2766 : self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
2767 : .await?,
2768 : None,
2769 : )
2770 : } else {
2771 : #[cfg(test)]
2772 : match &mut *self.flush_loop_state.lock().unwrap() {
2773 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2774 : panic!("flush loop not running")
2775 : }
2776 : FlushLoopState::Running {
2777 : expect_initdb_optimization,
2778 : ..
2779 : } => {
2780 : assert!(!*expect_initdb_optimization, "expected initdb optimization");
2781 : }
2782 : }
2783 : // Normal case, write out a L0 delta layer file.
2784 : // `create_delta_layer` will not modify the layer map.
2785 : // We will remove frozen layer and add delta layer in one atomic operation later.
2786 : let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
2787 : (
2788 : HashMap::from([(
2789 : layer.filename(),
2790 : LayerFileMetadata::new(layer.layer_desc().file_size, self.generation),
2791 : )]),
2792 : Some(layer),
2793 : )
2794 : };
2795 :
2796 : // The new on-disk layers are now in the layer map. We can remove the
2797 : // in-memory layer from the map now. The flushed layer is stored in
2798 : // the mapping in `create_delta_layer`.
2799 : {
2800 : let mut guard = self.layers.write().await;
2801 :
2802 : if let Some(ref l) = delta_layer_to_add {
2803 : // TODO: move access stats, metrics update, etc. into layer manager.
2804 : l.access_stats().record_residence_event(
2805 : LayerResidenceStatus::Resident,
2806 : LayerResidenceEventReason::LayerCreate,
2807 : );
2808 :
2809 : // update metrics
2810 : let sz = l.layer_desc().file_size;
2811 : self.metrics.record_new_file_metrics(sz);
2812 : }
2813 :
2814 : guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
2815 : // release lock on 'layers'
2816 : }
2817 :
2818 : // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
2819 : // a compaction can delete the file and then it won't be available for uploads any more.
2820 : // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
2821 : // race situation.
2822 : // See https://github.com/neondatabase/neon/issues/4526
2823 5347 : pausable_failpoint!("flush-frozen-pausable");
2824 :
2825 : // This failpoint is used by another test case `test_pageserver_recovery`.
2826 UBC 0 : fail_point!("flush-frozen-exit");
2827 :
2828 : // Update the metadata file, with new 'disk_consistent_lsn'
2829 : //
2830 : // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
2831 : // *all* the layers, to avoid fsyncing the file multiple times.
2832 : let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
2833 : let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
2834 :
2835 : // If we were able to advance 'disk_consistent_lsn', save it the metadata file.
2836 : // After crash, we will restart WAL streaming and processing from that point.
2837 : if disk_consistent_lsn != old_disk_consistent_lsn {
2838 : assert!(disk_consistent_lsn > old_disk_consistent_lsn);
2839 : self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
2840 : .await
2841 : .context("update_metadata_file")?;
2842 : // Also update the in-memory copy
2843 : self.disk_consistent_lsn.store(disk_consistent_lsn);
2844 : }
2845 : Ok(())
2846 : }
2847 :
2848 : /// Update metadata file
2849 CBC 5368 : async fn update_metadata_file(
2850 5368 : &self,
2851 5368 : disk_consistent_lsn: Lsn,
2852 5368 : layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
2853 5368 : ) -> anyhow::Result<()> {
2854 5368 : // We can only save a valid 'prev_record_lsn' value on disk if we
2855 5368 : // flushed *all* in-memory changes to disk. We only track
2856 5368 : // 'prev_record_lsn' in memory for the latest processed record, so we
2857 5368 : // don't remember what the correct value that corresponds to some old
2858 5368 : // LSN is. But if we flush everything, then the value corresponding
2859 5368 : // current 'last_record_lsn' is correct and we can store it on disk.
2860 5368 : let RecordLsn {
2861 5368 : last: last_record_lsn,
2862 5368 : prev: prev_record_lsn,
2863 5368 : } = self.last_record_lsn.load();
2864 5368 : let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
2865 1214 : Some(prev_record_lsn)
2866 : } else {
2867 4154 : None
2868 : };
2869 :
2870 5368 : let ancestor_timeline_id = self
2871 5368 : .ancestor_timeline
2872 5368 : .as_ref()
2873 5368 : .map(|ancestor| ancestor.timeline_id);
2874 5368 :
2875 5368 : let metadata = TimelineMetadata::new(
2876 5368 : disk_consistent_lsn,
2877 5368 : ondisk_prev_record_lsn,
2878 5368 : ancestor_timeline_id,
2879 5368 : self.ancestor_lsn,
2880 5368 : *self.latest_gc_cutoff_lsn.read(),
2881 5368 : self.initdb_lsn,
2882 5368 : self.pg_version,
2883 5368 : );
2884 5368 :
2885 5368 : fail_point!("checkpoint-before-saving-metadata", |x| bail!(
2886 UBC 0 : "{}",
2887 0 : x.unwrap()
2888 CBC 5368 : ));
2889 :
2890 5368 : save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
2891 UBC 0 : .await
2892 CBC 5368 : .context("save_metadata")?;
2893 :
2894 5368 : if let Some(remote_client) = &self.remote_client {
2895 10711 : for (path, layer_metadata) in layer_paths_to_upload {
2896 5343 : remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
2897 : }
2898 5368 : remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
2899 UBC 0 : }
2900 :
2901 CBC 5368 : Ok(())
2902 5368 : }
2903 :
2904 : // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
2905 : // in layer map immediately. The caller is responsible to put it into the layer map.
2906 5312 : async fn create_delta_layer(
2907 5312 : self: &Arc<Self>,
2908 5312 : frozen_layer: &Arc<InMemoryLayer>,
2909 5312 : ctx: &RequestContext,
2910 5312 : ) -> anyhow::Result<DeltaLayer> {
2911 5312 : let span = tracing::info_span!("blocking");
2912 5312 : let new_delta: DeltaLayer = tokio::task::spawn_blocking({
2913 5312 : let _g = span.entered();
2914 5312 : let self_clone = Arc::clone(self);
2915 5312 : let frozen_layer = Arc::clone(frozen_layer);
2916 5312 : let ctx = ctx.attached_child();
2917 5312 : move || {
2918 : // Write it out
2919 : // Keep this inside `spawn_blocking` and `Handle::current`
2920 : // as long as the write path is still sync and the read impl
2921 : // is still not fully async. Otherwise executor threads would
2922 : // be blocked.
2923 5312 : let new_delta = Handle::current().block_on(frozen_layer.write_to_disk(&ctx))?;
2924 5312 : let new_delta_path = new_delta.path();
2925 5312 :
2926 5312 : // Sync it to disk.
2927 5312 : //
2928 5312 : // We must also fsync the timeline dir to ensure the directory entries for
2929 5312 : // new layer files are durable.
2930 5312 : //
2931 5312 : // NB: timeline dir must be synced _after_ the file contents are durable.
2932 5312 : // So, two separate fsyncs are required, they mustn't be batched.
2933 5312 : //
2934 5312 : // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
2935 5312 : // files to flush, the fsync overhead can be reduces as follows:
2936 5312 : // 1. write them all to temporary file names
2937 5312 : // 2. fsync them
2938 5312 : // 3. rename to the final name
2939 5312 : // 4. fsync the parent directory.
2940 5312 : // Note that (1),(2),(3) today happen inside write_to_disk().
2941 5312 : par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
2942 5312 : par_fsync::par_fsync(&[self_clone
2943 5312 : .conf
2944 5312 : .timeline_path(&self_clone.tenant_id, &self_clone.timeline_id)])
2945 5312 : .context("fsync of timeline dir")?;
2946 :
2947 5308 : anyhow::Ok(new_delta)
2948 5312 : }
2949 5312 : })
2950 5308 : .await
2951 5308 : .context("spawn_blocking")??;
2952 :
2953 5308 : Ok(new_delta)
2954 5308 : }
2955 :
2956 1302 : async fn repartition(
2957 1302 : &self,
2958 1302 : lsn: Lsn,
2959 1302 : partition_size: u64,
2960 1302 : ctx: &RequestContext,
2961 1302 : ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
2962 1302 : {
2963 1302 : let partitioning_guard = self.partitioning.lock().unwrap();
2964 1302 : let distance = lsn.0 - partitioning_guard.1 .0;
2965 1302 : if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
2966 UBC 0 : debug!(
2967 0 : distance,
2968 0 : threshold = self.repartition_threshold,
2969 0 : "no repartitioning needed"
2970 0 : );
2971 CBC 639 : return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
2972 663 : }
2973 : }
2974 279558 : let keyspace = self.collect_keyspace(lsn, ctx).await?;
2975 662 : let partitioning = keyspace.partition(partition_size);
2976 662 :
2977 662 : let mut partitioning_guard = self.partitioning.lock().unwrap();
2978 662 : if lsn > partitioning_guard.1 {
2979 662 : *partitioning_guard = (partitioning, lsn);
2980 662 : } else {
2981 UBC 0 : warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
2982 : }
2983 CBC 662 : Ok((partitioning_guard.0.clone(), partitioning_guard.1))
2984 1301 : }
2985 :
2986 : // Is it time to create a new image layer for the given partition?
2987 23824 : async fn time_for_new_image_layer(
2988 23824 : &self,
2989 23824 : partition: &KeySpace,
2990 23824 : lsn: Lsn,
2991 23824 : ) -> anyhow::Result<bool> {
2992 23824 : let threshold = self.get_image_creation_threshold();
2993 :
2994 23824 : let guard = self.layers.read().await;
2995 23824 : let layers = guard.layer_map();
2996 23824 :
2997 23824 : let mut max_deltas = 0;
2998 23824 : {
2999 23824 : let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
3000 23824 : if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
3001 3653 : let img_range =
3002 3653 : partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
3003 3653 : if wanted.overlaps(&img_range) {
3004 : //
3005 : // gc_timeline only pays attention to image layers that are older than the GC cutoff,
3006 : // but create_image_layers creates image layers at last-record-lsn.
3007 : // So it's possible that gc_timeline wants a new image layer to be created for a key range,
3008 : // but the range is already covered by image layers at more recent LSNs. Before we
3009 : // create a new image layer, check if the range is already covered at more recent LSNs.
3010 UBC 0 : if !layers
3011 0 : .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
3012 : {
3013 0 : debug!(
3014 0 : "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
3015 0 : img_range.start, img_range.end, cutoff_lsn, lsn
3016 0 : );
3017 0 : return Ok(true);
3018 0 : }
3019 CBC 3653 : }
3020 20171 : }
3021 : }
3022 :
3023 1833014 : for part_range in &partition.ranges {
3024 1812557 : let image_coverage = layers.image_coverage(part_range, lsn)?;
3025 3483460 : for (img_range, last_img) in image_coverage {
3026 1674270 : let img_lsn = if let Some(last_img) = last_img {
3027 203479 : last_img.get_lsn_range().end
3028 : } else {
3029 1470791 : Lsn(0)
3030 : };
3031 : // Let's consider an example:
3032 : //
3033 : // delta layer with LSN range 71-81
3034 : // delta layer with LSN range 81-91
3035 : // delta layer with LSN range 91-101
3036 : // image layer at LSN 100
3037 : //
3038 : // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
3039 : // there's no need to create a new one. We check this case explicitly, to avoid passing
3040 : // a bogus range to count_deltas below, with start > end. It's even possible that there
3041 : // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
3042 : // after we read last_record_lsn, which is passed here in the 'lsn' argument.
3043 1674270 : if img_lsn < lsn {
3044 1655411 : let num_deltas =
3045 1655411 : layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
3046 :
3047 1655411 : max_deltas = max_deltas.max(num_deltas);
3048 1655411 : if num_deltas >= threshold {
3049 UBC 0 : debug!(
3050 0 : "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
3051 0 : img_range.start, img_range.end, num_deltas, img_lsn, lsn
3052 0 : );
3053 CBC 3367 : return Ok(true);
3054 1652044 : }
3055 18859 : }
3056 : }
3057 : }
3058 :
3059 UBC 0 : debug!(
3060 0 : max_deltas,
3061 0 : "none of the partitioned ranges had >= {threshold} deltas"
3062 0 : );
3063 CBC 20457 : Ok(false)
3064 23824 : }
3065 :
3066 1301 : async fn create_image_layers(
3067 1301 : &self,
3068 1301 : partitioning: &KeyPartitioning,
3069 1301 : lsn: Lsn,
3070 1301 : force: bool,
3071 1301 : ctx: &RequestContext,
3072 1301 : ) -> Result<HashMap<LayerFileName, LayerFileMetadata>, PageReconstructError> {
3073 1301 : let timer = self.metrics.create_images_time_histo.start_timer();
3074 1301 : let mut image_layers: Vec<ImageLayer> = Vec::new();
3075 1301 :
3076 1301 : // We need to avoid holes between generated image layers.
3077 1301 : // Otherwise LayerMap::image_layer_exists will return false if key range of some layer is covered by more than one
3078 1301 : // image layer with hole between them. In this case such layer can not be utilized by GC.
3079 1301 : //
3080 1301 : // How such hole between partitions can appear?
3081 1301 : // if we have relation with relid=1 and size 100 and relation with relid=2 with size 200 then result of
3082 1301 : // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
3083 1301 : // If there is delta layer <100000000..300000000> then it never be garbage collected because
3084 1301 : // image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
3085 1301 : let mut start = Key::MIN;
3086 :
3087 23863 : for partition in partitioning.parts.iter() {
3088 23863 : let img_range = start..partition.ranges.last().unwrap().end;
3089 23863 : start = img_range.end;
3090 23863 : if force || self.time_for_new_image_layer(partition, lsn).await? {
3091 3406 : let mut image_layer_writer = ImageLayerWriter::new(
3092 3406 : self.conf,
3093 3406 : self.timeline_id,
3094 3406 : self.tenant_id,
3095 3406 : &img_range,
3096 3406 : lsn,
3097 3406 : )
3098 UBC 0 : .await?;
3099 :
3100 CBC 3406 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3101 UBC 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
3102 0 : "failpoint image-layer-writer-fail-before-finish"
3103 0 : )))
3104 CBC 3406 : });
3105 57844 : for range in &partition.ranges {
3106 54442 : let mut key = range.start;
3107 278564 : while key < range.end {
3108 740114 : let img = match self.get(key, lsn, ctx).await {
3109 224122 : Ok(img) => img,
3110 UBC 0 : Err(err) => {
3111 0 : // If we fail to reconstruct a VM or FSM page, we can zero the
3112 0 : // page without losing any actual user data. That seems better
3113 0 : // than failing repeatedly and getting stuck.
3114 0 : //
3115 0 : // We had a bug at one point, where we truncated the FSM and VM
3116 0 : // in the pageserver, but the Postgres didn't know about that
3117 0 : // and continued to generate incremental WAL records for pages
3118 0 : // that didn't exist in the pageserver. Trying to replay those
3119 0 : // WAL records failed to find the previous image of the page.
3120 0 : // This special case allows us to recover from that situation.
3121 0 : // See https://github.com/neondatabase/neon/issues/2601.
3122 0 : //
3123 0 : // Unfortunately we cannot do this for the main fork, or for
3124 0 : // any metadata keys, keys, as that would lead to actual data
3125 0 : // loss.
3126 0 : if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
3127 0 : warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
3128 0 : ZERO_PAGE.clone()
3129 : } else {
3130 0 : return Err(err);
3131 : }
3132 : }
3133 : };
3134 CBC 224122 : image_layer_writer.put_image(key, &img).await?;
3135 224122 : key = key.next();
3136 : }
3137 : }
3138 3402 : let image_layer = image_layer_writer.finish().await?;
3139 3402 : image_layers.push(image_layer);
3140 20457 : }
3141 : }
3142 : // All layers that the GC wanted us to create have now been created.
3143 : //
3144 : // It's possible that another GC cycle happened while we were compacting, and added
3145 : // something new to wanted_image_layers, and we now clear that before processing it.
3146 : // That's OK, because the next GC iteration will put it back in.
3147 1297 : *self.wanted_image_layers.lock().unwrap() = None;
3148 1297 :
3149 1297 : // Sync the new layer to disk before adding it to the layer map, to make sure
3150 1297 : // we don't garbage collect something based on the new layer, before it has
3151 1297 : // reached the disk.
3152 1297 : //
3153 1297 : // We must also fsync the timeline dir to ensure the directory entries for
3154 1297 : // new layer files are durable
3155 1297 : //
3156 1297 : // Compaction creates multiple image layers. It would be better to create them all
3157 1297 : // and fsync them all in parallel.
3158 1297 : let all_paths = image_layers
3159 1297 : .iter()
3160 3402 : .map(|layer| layer.path())
3161 1297 : .collect::<Vec<_>>();
3162 1297 :
3163 1297 : par_fsync::par_fsync_async(&all_paths)
3164 208 : .await
3165 1297 : .context("fsync of newly created layer files")?;
3166 :
3167 1297 : par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
3168 1299 : .await
3169 1296 : .context("fsync of timeline dir")?;
3170 :
3171 1296 : let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
3172 :
3173 1296 : let mut guard = self.layers.write().await;
3174 1296 : let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
3175 :
3176 4698 : for l in &image_layers {
3177 3402 : let path = l.filename();
3178 3402 : let metadata = timeline_path
3179 3402 : .join(path.file_name())
3180 3402 : .metadata()
3181 3402 : .with_context(|| format!("reading metadata of layer file {}", path.file_name()))?;
3182 :
3183 3402 : layer_paths_to_upload.insert(
3184 3402 : path,
3185 3402 : LayerFileMetadata::new(metadata.len(), self.generation),
3186 3402 : );
3187 3402 :
3188 3402 : // update metrics
3189 3402 : self.metrics.record_new_file_metrics(metadata.len());
3190 3402 : let l = Arc::new(l);
3191 3402 : l.access_stats().record_residence_event(
3192 3402 : LayerResidenceStatus::Resident,
3193 3402 : LayerResidenceEventReason::LayerCreate,
3194 3402 : );
3195 : }
3196 1296 : guard.track_new_image_layers(image_layers);
3197 1296 : drop_wlock(guard);
3198 1296 : timer.stop_and_record();
3199 1296 :
3200 1296 : Ok(layer_paths_to_upload)
3201 1296 : }
3202 : }
3203 :
3204 948 : #[derive(Default)]
3205 : struct CompactLevel0Phase1Result {
3206 : new_layers: Vec<Arc<DeltaLayer>>,
3207 : deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
3208 : }
3209 :
3210 : /// Top-level failure to compact.
3211 UBC 0 : #[derive(Debug)]
3212 : enum CompactionError {
3213 : /// L0 compaction requires layers to be downloaded.
3214 : ///
3215 : /// This should not happen repeatedly, but will be retried once by top-level
3216 : /// `Timeline::compact`.
3217 : DownloadRequired(Vec<Arc<RemoteLayer>>),
3218 : /// The timeline or pageserver is shutting down
3219 : ShuttingDown,
3220 : /// Compaction cannot be done right now; page reconstruction and so on.
3221 : Other(anyhow::Error),
3222 : }
3223 :
3224 : impl From<anyhow::Error> for CompactionError {
3225 CBC 1 : fn from(value: anyhow::Error) -> Self {
3226 1 : CompactionError::Other(value)
3227 1 : }
3228 : }
3229 :
3230 : #[serde_as]
3231 4242 : #[derive(serde::Serialize)]
3232 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
3233 :
3234 8799 : #[derive(Default)]
3235 : enum DurationRecorder {
3236 : #[default]
3237 : NotStarted,
3238 : Recorded(RecordedDuration, tokio::time::Instant),
3239 : }
3240 :
3241 : impl DurationRecorder {
3242 2784 : pub fn till_now(&self) -> DurationRecorder {
3243 2784 : match self {
3244 : DurationRecorder::NotStarted => {
3245 UBC 0 : panic!("must only call on recorded measurements")
3246 : }
3247 CBC 2784 : DurationRecorder::Recorded(_, ended) => {
3248 2784 : let now = tokio::time::Instant::now();
3249 2784 : DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
3250 2784 : }
3251 2784 : }
3252 2784 : }
3253 2121 : pub fn into_recorded(self) -> Option<RecordedDuration> {
3254 2121 : match self {
3255 UBC 0 : DurationRecorder::NotStarted => None,
3256 CBC 2121 : DurationRecorder::Recorded(recorded, _) => Some(recorded),
3257 : }
3258 2121 : }
3259 : }
3260 :
3261 1257 : #[derive(Default)]
3262 : struct CompactLevel0Phase1StatsBuilder {
3263 : version: Option<u64>,
3264 : tenant_id: Option<TenantId>,
3265 : timeline_id: Option<TimelineId>,
3266 : read_lock_acquisition_micros: DurationRecorder,
3267 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
3268 : read_lock_held_key_sort_micros: DurationRecorder,
3269 : read_lock_held_prerequisites_micros: DurationRecorder,
3270 : read_lock_held_compute_holes_micros: DurationRecorder,
3271 : read_lock_drop_micros: DurationRecorder,
3272 : write_layer_files_micros: DurationRecorder,
3273 : level0_deltas_count: Option<usize>,
3274 : new_deltas_count: Option<usize>,
3275 : new_deltas_size: Option<u64>,
3276 : }
3277 :
3278 : #[serde_as]
3279 303 : #[derive(serde::Serialize)]
3280 : struct CompactLevel0Phase1Stats {
3281 : version: u64,
3282 : #[serde_as(as = "serde_with::DisplayFromStr")]
3283 : tenant_id: TenantId,
3284 : #[serde_as(as = "serde_with::DisplayFromStr")]
3285 : timeline_id: TimelineId,
3286 : read_lock_acquisition_micros: RecordedDuration,
3287 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
3288 : read_lock_held_key_sort_micros: RecordedDuration,
3289 : read_lock_held_prerequisites_micros: RecordedDuration,
3290 : read_lock_held_compute_holes_micros: RecordedDuration,
3291 : read_lock_drop_micros: RecordedDuration,
3292 : write_layer_files_micros: RecordedDuration,
3293 : level0_deltas_count: usize,
3294 : new_deltas_count: usize,
3295 : new_deltas_size: u64,
3296 : }
3297 :
3298 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
3299 : type Error = anyhow::Error;
3300 :
3301 303 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
3302 303 : Ok(Self {
3303 303 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
3304 303 : tenant_id: value
3305 303 : .tenant_id
3306 303 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
3307 303 : timeline_id: value
3308 303 : .timeline_id
3309 303 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
3310 303 : read_lock_acquisition_micros: value
3311 303 : .read_lock_acquisition_micros
3312 303 : .into_recorded()
3313 303 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
3314 303 : read_lock_held_spawn_blocking_startup_micros: value
3315 303 : .read_lock_held_spawn_blocking_startup_micros
3316 303 : .into_recorded()
3317 303 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
3318 303 : read_lock_held_key_sort_micros: value
3319 303 : .read_lock_held_key_sort_micros
3320 303 : .into_recorded()
3321 303 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
3322 303 : read_lock_held_prerequisites_micros: value
3323 303 : .read_lock_held_prerequisites_micros
3324 303 : .into_recorded()
3325 303 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
3326 303 : read_lock_held_compute_holes_micros: value
3327 303 : .read_lock_held_compute_holes_micros
3328 303 : .into_recorded()
3329 303 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
3330 303 : read_lock_drop_micros: value
3331 303 : .read_lock_drop_micros
3332 303 : .into_recorded()
3333 303 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
3334 303 : write_layer_files_micros: value
3335 303 : .write_layer_files_micros
3336 303 : .into_recorded()
3337 303 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
3338 303 : level0_deltas_count: value
3339 303 : .level0_deltas_count
3340 303 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
3341 303 : new_deltas_count: value
3342 303 : .new_deltas_count
3343 303 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
3344 303 : new_deltas_size: value
3345 303 : .new_deltas_size
3346 303 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
3347 : })
3348 303 : }
3349 : }
3350 :
3351 : impl Timeline {
3352 : /// Level0 files first phase of compaction, explained in the [`compact_inner`] comment.
3353 : ///
3354 : /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
3355 : /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
3356 : /// start of level0 files compaction, the on-demand download should be revisited as well.
3357 : ///
3358 : /// [`compact_inner`]: Self::compact_inner
3359 1257 : async fn compact_level0_phase1(
3360 1257 : self: &Arc<Self>,
3361 1257 : _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
3362 1257 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
3363 1257 : mut stats: CompactLevel0Phase1StatsBuilder,
3364 1257 : target_file_size: u64,
3365 1257 : ctx: &RequestContext,
3366 1257 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
3367 1257 : stats.read_lock_held_spawn_blocking_startup_micros =
3368 1257 : stats.read_lock_acquisition_micros.till_now(); // set by caller
3369 1257 : let layers = guard.layer_map();
3370 1257 : let level0_deltas = layers.get_level0_deltas()?;
3371 1257 : let mut level0_deltas = level0_deltas
3372 1257 : .into_iter()
3373 6666 : .map(|x| guard.get_from_desc(&x))
3374 1257 : .collect_vec();
3375 1257 : stats.level0_deltas_count = Some(level0_deltas.len());
3376 1257 : // Only compact if enough layers have accumulated.
3377 1257 : let threshold = self.get_compaction_threshold();
3378 1257 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
3379 UBC 0 : debug!(
3380 0 : level0_deltas = level0_deltas.len(),
3381 0 : threshold, "too few deltas to compact"
3382 0 : );
3383 CBC 948 : return Ok(CompactLevel0Phase1Result::default());
3384 309 : }
3385 309 :
3386 309 : // This failpoint is used together with `test_duplicate_layers` integration test.
3387 309 : // It returns the compaction result exactly the same layers as input to compaction.
3388 309 : // We want to ensure that this will not cause any problem when updating the layer map
3389 309 : // after the compaction is finished.
3390 309 : //
3391 309 : // Currently, there are two rare edge cases that will cause duplicated layers being
3392 309 : // inserted.
3393 309 : // 1. The compaction job is inturrupted / did not finish successfully. Assume we have file 1, 2, 3, 4, which
3394 309 : // is compacted to 5, but the page server is shut down, next time we start page server we will get a layer
3395 309 : // map containing 1, 2, 3, 4, and 5, whereas 5 has the same content as 4. If we trigger L0 compation at this
3396 309 : // point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
3397 309 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
3398 309 : // layer replace instead of the normal remove / upload process.
3399 309 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping and is of target file
3400 309 : // size length. Compaction will likely create the same set of n files afterwards.
3401 309 : //
3402 309 : // This failpoint is a superset of both of the cases.
3403 309 : fail_point!("compact-level0-phase1-return-same", |_| {
3404 2 : println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
3405 2 : Ok(CompactLevel0Phase1Result {
3406 2 : new_layers: level0_deltas
3407 2 : .iter()
3408 28 : .map(|x| x.clone().downcast_delta_layer().unwrap())
3409 2 : .collect(),
3410 2 : deltas_to_compact: level0_deltas
3411 2 : .iter()
3412 28 : .map(|x| x.layer_desc().clone().into())
3413 2 : .collect(),
3414 2 : })
3415 309 : });
3416 :
3417 : // Gather the files to compact in this iteration.
3418 : //
3419 : // Start with the oldest Level 0 delta file, and collect any other
3420 : // level 0 files that form a contiguous sequence, such that the end
3421 : // LSN of previous file matches the start LSN of the next file.
3422 : //
3423 : // Note that if the files don't form such a sequence, we might
3424 : // "compact" just a single file. That's a bit pointless, but it allows
3425 : // us to get rid of the level 0 file, and compact the other files on
3426 : // the next iteration. This could probably made smarter, but such
3427 : // "gaps" in the sequence of level 0 files should only happen in case
3428 : // of a crash, partial download from cloud storage, or something like
3429 : // that, so it's not a big deal in practice.
3430 9422 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
3431 307 : let mut level0_deltas_iter = level0_deltas.iter();
3432 307 :
3433 307 : let first_level0_delta = level0_deltas_iter.next().unwrap();
3434 307 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
3435 307 : let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
3436 4336 : for l in level0_deltas_iter {
3437 4029 : let lsn_range = &l.layer_desc().lsn_range;
3438 4029 :
3439 4029 : if lsn_range.start != prev_lsn_end {
3440 UBC 0 : break;
3441 CBC 4029 : }
3442 4029 : deltas_to_compact.push(Arc::clone(l));
3443 4029 : prev_lsn_end = lsn_range.end;
3444 : }
3445 307 : let lsn_range = Range {
3446 307 : start: deltas_to_compact
3447 307 : .first()
3448 307 : .unwrap()
3449 307 : .layer_desc()
3450 307 : .lsn_range
3451 307 : .start,
3452 307 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
3453 307 : };
3454 307 :
3455 307 : let remotes = deltas_to_compact
3456 307 : .iter()
3457 4336 : .filter(|l| l.is_remote_layer())
3458 307 : .inspect(|l| info!("compact requires download of {l}"))
3459 307 : .map(|l| {
3460 3 : l.clone()
3461 3 : .downcast_remote_layer()
3462 3 : .expect("just checked it is remote layer")
3463 307 : })
3464 307 : .collect::<Vec<_>>();
3465 307 :
3466 307 : if !remotes.is_empty() {
3467 : // caller is holding the lock to layer_removal_cs, and we don't want to download while
3468 : // holding that; in future download_remote_layer might take it as well. this is
3469 : // regardless of earlier image creation downloading on-demand, while holding the lock.
3470 1 : return Err(CompactionError::DownloadRequired(remotes));
3471 306 : }
3472 :
3473 306 : info!(
3474 306 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
3475 306 : lsn_range.start,
3476 306 : lsn_range.end,
3477 306 : deltas_to_compact.len(),
3478 306 : level0_deltas.len()
3479 306 : );
3480 :
3481 4333 : for l in deltas_to_compact.iter() {
3482 4333 : info!("compact includes {l}");
3483 : }
3484 :
3485 : // We don't need the original list of layers anymore. Drop it so that
3486 : // we don't accidentally use it later in the function.
3487 306 : drop(level0_deltas);
3488 306 :
3489 306 : stats.read_lock_held_prerequisites_micros = stats
3490 306 : .read_lock_held_spawn_blocking_startup_micros
3491 306 : .till_now();
3492 306 :
3493 306 : // Determine N largest holes where N is number of compacted layers.
3494 306 : let max_holes = deltas_to_compact.len();
3495 306 : let last_record_lsn = self.get_last_record_lsn();
3496 306 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
3497 306 : let min_hole_coverage_size = 3; // TODO: something more flexible?
3498 306 :
3499 306 : // min-heap (reserve space for one more element added before eviction)
3500 306 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
3501 306 : let mut prev: Option<Key> = None;
3502 306 :
3503 306 : let mut all_keys = Vec::new();
3504 306 :
3505 306 : let downcast_deltas: Vec<_> = deltas_to_compact
3506 306 : .iter()
3507 4333 : .map(|l| l.clone().downcast_delta_layer().expect("delta layer"))
3508 306 : .collect();
3509 4333 : for dl in downcast_deltas.iter() {
3510 : // TODO: replace this with an await once we fully go async
3511 4333 : all_keys.extend(DeltaLayer::load_keys(dl, ctx).await?);
3512 : }
3513 :
3514 : // The current stdlib sorting implementation is designed in a way where it is
3515 : // particularly fast where the slice is made up of sorted sub-ranges.
3516 303973472 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3517 306 :
3518 306 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
3519 :
3520 30235442 : for DeltaEntry { key: next_key, .. } in all_keys.iter() {
3521 30235442 : let next_key = *next_key;
3522 30235442 : if let Some(prev_key) = prev {
3523 : // just first fast filter
3524 30235136 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
3525 207342 : let key_range = prev_key..next_key;
3526 : // Measuring hole by just subtraction of i128 representation of key range boundaries
3527 : // has not so much sense, because largest holes will corresponds field1/field2 changes.
3528 : // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
3529 : // That is why it is better to measure size of hole as number of covering image layers.
3530 207342 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
3531 207342 : if coverage_size >= min_hole_coverage_size {
3532 68 : heap.push(Hole {
3533 68 : key_range,
3534 68 : coverage_size,
3535 68 : });
3536 68 : if heap.len() > max_holes {
3537 25 : heap.pop(); // remove smallest hole
3538 43 : }
3539 207274 : }
3540 30027794 : }
3541 306 : }
3542 30235442 : prev = Some(next_key.next());
3543 : }
3544 306 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
3545 306 : drop_rlock(guard);
3546 306 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
3547 306 : let mut holes = heap.into_vec();
3548 306 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
3549 306 : let mut next_hole = 0; // index of next hole in holes vector
3550 306 :
3551 306 : // This iterator walks through all key-value pairs from all the layers
3552 306 : // we're compacting, in key, LSN order.
3553 306 : let all_values_iter = all_keys.iter();
3554 306 :
3555 306 : // This iterator walks through all keys and is needed to calculate size used by each key
3556 306 : let mut all_keys_iter = all_keys
3557 306 : .iter()
3558 29620953 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
3559 29620647 : .coalesce(|mut prev, cur| {
3560 29620647 : // Coalesce keys that belong to the same key pair.
3561 29620647 : // This ensures that compaction doesn't put them
3562 29620647 : // into different layer files.
3563 29620647 : // Still limit this by the target file size,
3564 29620647 : // so that we keep the size of the files in
3565 29620647 : // check.
3566 29620647 : if prev.0 == cur.0 && prev.2 < target_file_size {
3567 27749940 : prev.2 += cur.2;
3568 27749940 : Ok(prev)
3569 : } else {
3570 1870707 : Err((prev, cur))
3571 : }
3572 29620647 : });
3573 306 :
3574 306 : // Merge the contents of all the input delta layers into a new set
3575 306 : // of delta layers, based on the current partitioning.
3576 306 : //
3577 306 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one.
3578 306 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
3579 306 : // would be too large. In that case, we also split on the LSN dimension.
3580 306 : //
3581 306 : // LSN
3582 306 : // ^
3583 306 : // |
3584 306 : // | +-----------+ +--+--+--+--+
3585 306 : // | | | | | | | |
3586 306 : // | +-----------+ | | | | |
3587 306 : // | | | | | | | |
3588 306 : // | +-----------+ ==> | | | | |
3589 306 : // | | | | | | | |
3590 306 : // | +-----------+ | | | | |
3591 306 : // | | | | | | | |
3592 306 : // | +-----------+ +--+--+--+--+
3593 306 : // |
3594 306 : // +--------------> key
3595 306 : //
3596 306 : //
3597 306 : // If one key (X) has a lot of page versions:
3598 306 : //
3599 306 : // LSN
3600 306 : // ^
3601 306 : // | (X)
3602 306 : // | +-----------+ +--+--+--+--+
3603 306 : // | | | | | | | |
3604 306 : // | +-----------+ | | +--+ |
3605 306 : // | | | | | | | |
3606 306 : // | +-----------+ ==> | | | | |
3607 306 : // | | | | | +--+ |
3608 306 : // | +-----------+ | | | | |
3609 306 : // | | | | | | | |
3610 306 : // | +-----------+ +--+--+--+--+
3611 306 : // |
3612 306 : // +--------------> key
3613 306 : // TODO: this actually divides the layers into fixed-size chunks, not
3614 306 : // based on the partitioning.
3615 306 : //
3616 306 : // TODO: we should also opportunistically materialize and
3617 306 : // garbage collect what we can.
3618 306 : let mut new_layers = Vec::new();
3619 306 : let mut prev_key: Option<Key> = None;
3620 306 : let mut writer: Option<DeltaLayerWriter> = None;
3621 306 : let mut key_values_total_size = 0u64;
3622 306 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
3623 306 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
3624 :
3625 : for &DeltaEntry {
3626 29577812 : key, lsn, ref val, ..
3627 29578115 : } in all_values_iter
3628 : {
3629 29577812 : let value = val.load(ctx).await?;
3630 29577809 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
3631 29577809 : // We need to check key boundaries once we reach next key or end of layer with the same key
3632 29577809 : if !same_key || lsn == dup_end_lsn {
3633 1871007 : let mut next_key_size = 0u64;
3634 1871007 : let is_dup_layer = dup_end_lsn.is_valid();
3635 1871007 : dup_start_lsn = Lsn::INVALID;
3636 1871007 : if !same_key {
3637 1870949 : dup_end_lsn = Lsn::INVALID;
3638 1870949 : }
3639 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
3640 1871010 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
3641 1871010 : next_key_size = next_size;
3642 1871010 : if key != next_key {
3643 1870646 : if dup_end_lsn.is_valid() {
3644 18 : // We are writting segment with duplicates:
3645 18 : // place all remaining values of this key in separate segment
3646 18 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
3647 18 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
3648 1870628 : }
3649 1870646 : break;
3650 364 : }
3651 364 : key_values_total_size += next_size;
3652 364 : // Check if it is time to split segment: if total keys size is larger than target file size.
3653 364 : // We need to avoid generation of empty segments if next_size > target_file_size.
3654 364 : if key_values_total_size > target_file_size && lsn != next_lsn {
3655 : // Split key between multiple layers: such layer can contain only single key
3656 58 : dup_start_lsn = if dup_end_lsn.is_valid() {
3657 40 : dup_end_lsn // new segment with duplicates starts where old one stops
3658 : } else {
3659 18 : lsn // start with the first LSN for this key
3660 : };
3661 58 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
3662 58 : break;
3663 306 : }
3664 : }
3665 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
3666 1871007 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
3667 UBC 0 : dup_start_lsn = dup_end_lsn;
3668 0 : dup_end_lsn = lsn_range.end;
3669 CBC 1871007 : }
3670 1871007 : if writer.is_some() {
3671 1870701 : let written_size = writer.as_mut().unwrap().size();
3672 1870701 : let contains_hole =
3673 1870701 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
3674 : // check if key cause layer overflow or contains hole...
3675 1870701 : if is_dup_layer
3676 1870625 : || dup_end_lsn.is_valid()
3677 1870611 : || written_size + key_values_total_size > target_file_size
3678 1860632 : || contains_hole
3679 : {
3680 : // ... if so, flush previous layer and prepare to write new one
3681 10110 : new_layers.push(Arc::new(
3682 10110 : writer
3683 10110 : .take()
3684 10110 : .unwrap()
3685 10110 : .finish(prev_key.unwrap().next())
3686 UBC 0 : .await?,
3687 : ));
3688 CBC 10110 : writer = None;
3689 10110 :
3690 10110 : if contains_hole {
3691 43 : // skip hole
3692 43 : next_hole += 1;
3693 10067 : }
3694 1860591 : }
3695 306 : }
3696 : // Remember size of key value because at next iteration we will access next item
3697 1871007 : key_values_total_size = next_key_size;
3698 27706802 : }
3699 29577809 : if writer.is_none() {
3700 10416 : // Create writer if not initiaized yet
3701 10416 : writer = Some(
3702 : DeltaLayerWriter::new(
3703 10416 : self.conf,
3704 10416 : self.timeline_id,
3705 10416 : self.tenant_id,
3706 10416 : key,
3707 10416 : if dup_end_lsn.is_valid() {
3708 : // this is a layer containing slice of values of the same key
3709 UBC 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
3710 CBC 76 : dup_start_lsn..dup_end_lsn
3711 : } else {
3712 UBC 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3713 CBC 10340 : lsn_range.clone()
3714 : },
3715 : )
3716 UBC 0 : .await?,
3717 : );
3718 CBC 29567393 : }
3719 :
3720 29577809 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3721 UBC 0 : Err(CompactionError::Other(anyhow::anyhow!(
3722 0 : "failpoint delta-layer-writer-fail-before-finish"
3723 0 : )))
3724 CBC 29577809 : });
3725 :
3726 29577809 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
3727 :
3728 29577809 : if !new_layers.is_empty() {
3729 26394245 : fail_point!("after-timeline-compacted-first-L1");
3730 26394245 : }
3731 :
3732 29577809 : prev_key = Some(key);
3733 : }
3734 303 : if let Some(writer) = writer {
3735 303 : new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next()).await?));
3736 UBC 0 : }
3737 :
3738 : // Sync layers
3739 CBC 303 : if !new_layers.is_empty() {
3740 : // Print a warning if the created layer is larger than double the target size
3741 : // Add two pages for potential overhead. This should in theory be already
3742 : // accounted for in the target calculation, but for very small targets,
3743 : // we still might easily hit the limit otherwise.
3744 303 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
3745 10397 : for layer in new_layers.iter() {
3746 10397 : if layer.layer_desc().file_size > warn_limit {
3747 UBC 0 : warn!(
3748 0 : %layer,
3749 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
3750 0 : );
3751 CBC 10397 : }
3752 : }
3753 10397 : let mut layer_paths: Vec<Utf8PathBuf> = new_layers.iter().map(|l| l.path()).collect();
3754 303 :
3755 303 : // Fsync all the layer files and directory using multiple threads to
3756 303 : // minimize latency.
3757 303 : par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
3758 :
3759 303 : par_fsync::par_fsync(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
3760 303 : .context("fsync of timeline dir")?;
3761 :
3762 303 : layer_paths.pop().unwrap();
3763 UBC 0 : }
3764 :
3765 CBC 303 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
3766 303 : stats.new_deltas_count = Some(new_layers.len());
3767 10397 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
3768 303 :
3769 303 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
3770 303 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
3771 : {
3772 303 : Ok(stats_json) => {
3773 303 : info!(
3774 303 : stats_json = stats_json.as_str(),
3775 303 : "compact_level0_phase1 stats available"
3776 303 : )
3777 : }
3778 UBC 0 : Err(e) => {
3779 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
3780 : }
3781 : }
3782 :
3783 CBC 303 : Ok(CompactLevel0Phase1Result {
3784 303 : new_layers,
3785 303 : deltas_to_compact: deltas_to_compact
3786 303 : .into_iter()
3787 4289 : .map(|x| Arc::new(x.layer_desc().clone()))
3788 303 : .collect(),
3789 303 : })
3790 1254 : }
3791 :
3792 : ///
3793 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
3794 : /// as Level 1 files.
3795 : ///
3796 1257 : async fn compact_level0(
3797 1257 : self: &Arc<Self>,
3798 1257 : layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
3799 1257 : target_file_size: u64,
3800 1257 : ctx: &RequestContext,
3801 1257 : ) -> Result<(), CompactionError> {
3802 : let CompactLevel0Phase1Result {
3803 1253 : new_layers,
3804 1253 : deltas_to_compact,
3805 : } = {
3806 1257 : let phase1_span = info_span!("compact_level0_phase1");
3807 1257 : let ctx = ctx.attached_child();
3808 1257 : let mut stats = CompactLevel0Phase1StatsBuilder {
3809 1257 : version: Some(2),
3810 1257 : tenant_id: Some(self.tenant_id),
3811 1257 : timeline_id: Some(self.timeline_id),
3812 1257 : ..Default::default()
3813 1257 : };
3814 1257 :
3815 1257 : let begin = tokio::time::Instant::now();
3816 1257 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
3817 1257 : let now = tokio::time::Instant::now();
3818 1257 : stats.read_lock_acquisition_micros =
3819 1257 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
3820 1257 : let layer_removal_cs = layer_removal_cs.clone();
3821 1257 : self.compact_level0_phase1(
3822 1257 : layer_removal_cs,
3823 1257 : phase1_layers_locked,
3824 1257 : stats,
3825 1257 : target_file_size,
3826 1257 : &ctx,
3827 1257 : )
3828 1257 : .instrument(phase1_span)
3829 477355 : .await?
3830 : };
3831 :
3832 1253 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
3833 : // nothing to do
3834 948 : return Ok(());
3835 305 : }
3836 :
3837 : // Before deleting any layers, we need to wait for their upload ops to finish.
3838 : // See remote_timeline_client module level comment on consistency.
3839 : // Do it here because we don't want to hold self.layers.write() while waiting.
3840 305 : if let Some(remote_client) = &self.remote_client {
3841 UBC 0 : debug!("waiting for upload ops to complete");
3842 CBC 305 : remote_client
3843 305 : .wait_completion()
3844 66 : .await
3845 305 : .context("wait for layer upload ops to complete")?;
3846 UBC 0 : }
3847 :
3848 CBC 304 : let mut guard = self.layers.write().await;
3849 304 : let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
3850 304 :
3851 304 : // In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction.
3852 304 : // We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
3853 304 : // now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map.
3854 304 : let mut duplicated_layers = HashSet::new();
3855 304 :
3856 304 : let mut insert_layers = Vec::new();
3857 304 : let mut remove_layers = Vec::new();
3858 :
3859 10194 : for l in new_layers {
3860 9890 : let new_delta_path = l.path();
3861 :
3862 9890 : let metadata = new_delta_path.metadata().with_context(|| {
3863 UBC 0 : format!("read file metadata for new created layer {new_delta_path}")
3864 CBC 9890 : })?;
3865 :
3866 9890 : if let Some(remote_client) = &self.remote_client {
3867 9890 : remote_client.schedule_layer_file_upload(
3868 9890 : &l.filename(),
3869 9890 : &LayerFileMetadata::new(metadata.len(), self.generation),
3870 9890 : )?;
3871 UBC 0 : }
3872 :
3873 : // update metrics, including the timeline's physical size
3874 CBC 9890 : self.metrics.record_new_file_metrics(metadata.len());
3875 9890 :
3876 9890 : new_layer_paths.insert(
3877 9890 : new_delta_path,
3878 9890 : LayerFileMetadata::new(metadata.len(), self.generation),
3879 9890 : );
3880 9890 : l.access_stats().record_residence_event(
3881 9890 : LayerResidenceStatus::Resident,
3882 9890 : LayerResidenceEventReason::LayerCreate,
3883 9890 : );
3884 9890 : let l = l as Arc<dyn PersistentLayer>;
3885 9890 : if guard.contains(&l) {
3886 28 : tracing::error!(layer=%l, "duplicated L1 layer");
3887 28 : duplicated_layers.insert(l.layer_desc().key());
3888 : } else {
3889 9862 : if LayerMap::is_l0(l.layer_desc()) {
3890 UBC 0 : return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
3891 CBC 9862 : }
3892 9862 : insert_layers.push(l);
3893 : }
3894 : }
3895 :
3896 : // Now that we have reshuffled the data to set of new delta layers, we can
3897 : // delete the old ones
3898 304 : let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
3899 4612 : for ldesc in deltas_to_compact {
3900 4308 : if duplicated_layers.contains(&ldesc.key()) {
3901 : // skip duplicated layers, they will not be removed; we have already overwritten them
3902 : // with new layers in the compaction phase 1.
3903 28 : continue;
3904 4280 : }
3905 4280 : layer_names_to_delete.push(ldesc.filename());
3906 4280 : remove_layers.push(guard.get_from_desc(&ldesc));
3907 : }
3908 :
3909 304 : guard.finish_compact_l0(
3910 304 : layer_removal_cs,
3911 304 : remove_layers,
3912 304 : insert_layers,
3913 304 : &self.metrics,
3914 304 : )?;
3915 :
3916 304 : drop_wlock(guard);
3917 :
3918 : // Also schedule the deletions in remote storage
3919 304 : if let Some(remote_client) = &self.remote_client {
3920 304 : remote_client.schedule_layer_file_deletion(layer_names_to_delete)?;
3921 UBC 0 : }
3922 :
3923 CBC 304 : Ok(())
3924 1254 : }
3925 :
3926 : /// Update information about which layer files need to be retained on
3927 : /// garbage collection. This is separate from actually performing the GC,
3928 : /// and is updated more frequently, so that compaction can remove obsolete
3929 : /// page versions more aggressively.
3930 : ///
3931 : /// TODO: that's wishful thinking, compaction doesn't actually do that
3932 : /// currently.
3933 : ///
3934 : /// The caller specifies how much history is needed with the 3 arguments:
3935 : ///
3936 : /// retain_lsns: keep a version of each page at these LSNs
3937 : /// cutoff_horizon: also keep everything newer than this LSN
3938 : /// pitr: the time duration required to keep data for PITR
3939 : ///
3940 : /// The 'retain_lsns' list is currently used to prevent removing files that
3941 : /// are needed by child timelines. In the future, the user might be able to
3942 : /// name additional points in time to retain. The caller is responsible for
3943 : /// collecting that information.
3944 : ///
3945 : /// The 'cutoff_horizon' point is used to retain recent versions that might still be
3946 : /// needed by read-only nodes. (As of this writing, the caller just passes
3947 : /// the latest LSN subtracted by a constant, and doesn't do anything smart
3948 : /// to figure out what read-only nodes might actually need.)
3949 : ///
3950 : /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
3951 : /// whether a record is needed for PITR.
3952 : ///
3953 : /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
3954 : /// field, so that the three values passed as argument are stored
3955 : /// atomically. But the caller is responsible for ensuring that no new
3956 : /// branches are created that would need to be included in 'retain_lsns',
3957 : /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
3958 : /// that.
3959 : ///
3960 2256 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
3961 : pub(super) async fn update_gc_info(
3962 : &self,
3963 : retain_lsns: Vec<Lsn>,
3964 : cutoff_horizon: Lsn,
3965 : pitr: Duration,
3966 : ctx: &RequestContext,
3967 : ) -> anyhow::Result<()> {
3968 : // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
3969 : //
3970 : // Some unit tests depend on garbage-collection working even when
3971 : // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
3972 : // work, so avoid calling it altogether if time-based retention is not
3973 : // configured. It would be pointless anyway.
3974 : let pitr_cutoff = if pitr != Duration::ZERO {
3975 : let now = SystemTime::now();
3976 : if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
3977 : let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
3978 :
3979 : match self.find_lsn_for_timestamp(pitr_timestamp, ctx).await? {
3980 : LsnForTimestamp::Present(lsn) => lsn,
3981 : LsnForTimestamp::Future(lsn) => {
3982 : // The timestamp is in the future. That sounds impossible,
3983 : // but what it really means is that there hasn't been
3984 : // any commits since the cutoff timestamp.
3985 UBC 0 : debug!("future({})", lsn);
3986 : cutoff_horizon
3987 : }
3988 : LsnForTimestamp::Past(lsn) => {
3989 0 : debug!("past({})", lsn);
3990 : // conservative, safe default is to remove nothing, when we
3991 : // have no commit timestamp data available
3992 : *self.get_latest_gc_cutoff_lsn()
3993 : }
3994 : LsnForTimestamp::NoData(lsn) => {
3995 0 : debug!("nodata({})", lsn);
3996 : // conservative, safe default is to remove nothing, when we
3997 : // have no commit timestamp data available
3998 : *self.get_latest_gc_cutoff_lsn()
3999 : }
4000 : }
4001 : } else {
4002 : // If we don't have enough data to convert to LSN,
4003 : // play safe and don't remove any layers.
4004 : *self.get_latest_gc_cutoff_lsn()
4005 : }
4006 : } else {
4007 : // No time-based retention was configured. Set time-based cutoff to
4008 : // same as LSN based.
4009 : cutoff_horizon
4010 : };
4011 :
4012 : // Grab the lock and update the values
4013 : *self.gc_info.write().unwrap() = GcInfo {
4014 : retain_lsns,
4015 : horizon_cutoff: cutoff_horizon,
4016 : pitr_cutoff,
4017 : };
4018 :
4019 : Ok(())
4020 : }
4021 :
4022 : ///
4023 : /// Garbage collect layer files on a timeline that are no longer needed.
4024 : ///
4025 : /// Currently, we don't make any attempt at removing unneeded page versions
4026 : /// within a layer file. We can only remove the whole file if it's fully
4027 : /// obsolete.
4028 : ///
4029 CBC 695 : pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
4030 695 : let timer = self.metrics.garbage_collect_histo.start_timer();
4031 695 :
4032 695 : fail_point!("before-timeline-gc");
4033 :
4034 695 : let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
4035 : // Is the timeline being deleted?
4036 695 : if self.is_stopping() {
4037 UBC 0 : anyhow::bail!("timeline is Stopping");
4038 CBC 695 : }
4039 695 :
4040 695 : let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
4041 695 : let gc_info = self.gc_info.read().unwrap();
4042 695 :
4043 695 : let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
4044 695 : let pitr_cutoff = gc_info.pitr_cutoff;
4045 695 : let retain_lsns = gc_info.retain_lsns.clone();
4046 695 : (horizon_cutoff, pitr_cutoff, retain_lsns)
4047 695 : };
4048 695 :
4049 695 : let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
4050 :
4051 695 : let res = self
4052 695 : .gc_timeline(
4053 695 : layer_removal_cs.clone(),
4054 695 : horizon_cutoff,
4055 695 : pitr_cutoff,
4056 695 : retain_lsns,
4057 695 : new_gc_cutoff,
4058 695 : )
4059 695 : .instrument(
4060 695 : info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
4061 : )
4062 219 : .await?;
4063 :
4064 : // only record successes
4065 690 : timer.stop_and_record();
4066 690 :
4067 690 : Ok(res)
4068 690 : }
4069 :
4070 695 : async fn gc_timeline(
4071 695 : &self,
4072 695 : layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
4073 695 : horizon_cutoff: Lsn,
4074 695 : pitr_cutoff: Lsn,
4075 695 : retain_lsns: Vec<Lsn>,
4076 695 : new_gc_cutoff: Lsn,
4077 695 : ) -> anyhow::Result<GcResult> {
4078 695 : let now = SystemTime::now();
4079 695 : let mut result: GcResult = GcResult::default();
4080 695 :
4081 695 : // Nothing to GC. Return early.
4082 695 : let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
4083 695 : if latest_gc_cutoff >= new_gc_cutoff {
4084 108 : info!(
4085 108 : "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
4086 108 : );
4087 108 : return Ok(result);
4088 587 : }
4089 587 :
4090 587 : // We need to ensure that no one tries to read page versions or create
4091 587 : // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
4092 587 : // for details. This will block until the old value is no longer in use.
4093 587 : //
4094 587 : // The GC cutoff should only ever move forwards.
4095 587 : {
4096 587 : let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
4097 587 : ensure!(
4098 587 : *write_guard <= new_gc_cutoff,
4099 UBC 0 : "Cannot move GC cutoff LSN backwards (was {}, new {})",
4100 0 : *write_guard,
4101 : new_gc_cutoff
4102 : );
4103 CBC 587 : write_guard.store_and_unlock(new_gc_cutoff).wait();
4104 : }
4105 :
4106 587 : info!("GC starting");
4107 :
4108 UBC 0 : debug!("retain_lsns: {:?}", retain_lsns);
4109 :
4110 : // Before deleting any layers, we need to wait for their upload ops to finish.
4111 : // See storage_sync module level comment on consistency.
4112 : // Do it here because we don't want to hold self.layers.write() while waiting.
4113 CBC 587 : if let Some(remote_client) = &self.remote_client {
4114 UBC 0 : debug!("waiting for upload ops to complete");
4115 CBC 587 : remote_client
4116 587 : .wait_completion()
4117 184 : .await
4118 587 : .context("wait for layer upload ops to complete")?;
4119 UBC 0 : }
4120 :
4121 CBC 587 : let mut layers_to_remove = Vec::new();
4122 587 : let mut wanted_image_layers = KeySpaceRandomAccum::default();
4123 :
4124 : // Scan all layers in the timeline (remote or on-disk).
4125 : //
4126 : // Garbage collect the layer if all conditions are satisfied:
4127 : // 1. it is older than cutoff LSN;
4128 : // 2. it is older than PITR interval;
4129 : // 3. it doesn't need to be retained for 'retain_lsns';
4130 : // 4. newer on-disk image layers cover the layer's whole key range
4131 : //
4132 : // TODO holding a write lock is too agressive and avoidable
4133 587 : let mut guard = self.layers.write().await;
4134 587 : let layers = guard.layer_map();
4135 16846 : 'outer: for l in layers.iter_historic_layers() {
4136 16846 : result.layers_total += 1;
4137 16846 :
4138 16846 : // 1. Is it newer than GC horizon cutoff point?
4139 16846 : if l.get_lsn_range().end > horizon_cutoff {
4140 UBC 0 : debug!(
4141 0 : "keeping {} because it's newer than horizon_cutoff {}",
4142 0 : l.filename(),
4143 0 : horizon_cutoff,
4144 0 : );
4145 CBC 1717 : result.layers_needed_by_cutoff += 1;
4146 1717 : continue 'outer;
4147 15129 : }
4148 15129 :
4149 15129 : // 2. It is newer than PiTR cutoff point?
4150 15129 : if l.get_lsn_range().end > pitr_cutoff {
4151 UBC 0 : debug!(
4152 0 : "keeping {} because it's newer than pitr_cutoff {}",
4153 0 : l.filename(),
4154 0 : pitr_cutoff,
4155 0 : );
4156 CBC 194 : result.layers_needed_by_pitr += 1;
4157 194 : continue 'outer;
4158 14935 : }
4159 :
4160 : // 3. Is it needed by a child branch?
4161 : // NOTE With that we would keep data that
4162 : // might be referenced by child branches forever.
4163 : // We can track this in child timeline GC and delete parent layers when
4164 : // they are no longer needed. This might be complicated with long inheritance chains.
4165 : //
4166 : // TODO Vec is not a great choice for `retain_lsns`
4167 15178 : for retain_lsn in &retain_lsns {
4168 : // start_lsn is inclusive
4169 620 : if &l.get_lsn_range().start <= retain_lsn {
4170 UBC 0 : debug!(
4171 0 : "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
4172 0 : l.filename(),
4173 0 : retain_lsn,
4174 0 : l.is_incremental(),
4175 0 : );
4176 CBC 377 : result.layers_needed_by_branches += 1;
4177 377 : continue 'outer;
4178 243 : }
4179 : }
4180 :
4181 : // 4. Is there a later on-disk layer for this relation?
4182 : //
4183 : // The end-LSN is exclusive, while disk_consistent_lsn is
4184 : // inclusive. For example, if disk_consistent_lsn is 100, it is
4185 : // OK for a delta layer to have end LSN 101, but if the end LSN
4186 : // is 102, then it might not have been fully flushed to disk
4187 : // before crash.
4188 : //
4189 : // For example, imagine that the following layers exist:
4190 : //
4191 : // 1000 - image (A)
4192 : // 1000-2000 - delta (B)
4193 : // 2000 - image (C)
4194 : // 2000-3000 - delta (D)
4195 : // 3000 - image (E)
4196 : //
4197 : // If GC horizon is at 2500, we can remove layers A and B, but
4198 : // we cannot remove C, even though it's older than 2500, because
4199 : // the delta layer 2000-3000 depends on it.
4200 14558 : if !layers
4201 14558 : .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
4202 : {
4203 UBC 0 : debug!("keeping {} because it is the latest layer", l.filename());
4204 : // Collect delta key ranges that need image layers to allow garbage
4205 : // collecting the layers.
4206 : // It is not so obvious whether we need to propagate information only about
4207 : // delta layers. Image layers can form "stairs" preventing old image from been deleted.
4208 : // But image layers are in any case less sparse than delta layers. Also we need some
4209 : // protection from replacing recent image layers with new one after each GC iteration.
4210 CBC 11289 : if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
4211 UBC 0 : wanted_image_layers.add_range(l.get_key_range());
4212 CBC 11289 : }
4213 11289 : result.layers_not_updated += 1;
4214 11289 : continue 'outer;
4215 3269 : }
4216 :
4217 : // We didn't find any reason to keep this file, so remove it.
4218 UBC 0 : debug!(
4219 0 : "garbage collecting {} is_dropped: xx is_incremental: {}",
4220 0 : l.filename(),
4221 0 : l.is_incremental(),
4222 0 : );
4223 CBC 3269 : layers_to_remove.push(Arc::clone(&l));
4224 : }
4225 587 : self.wanted_image_layers
4226 587 : .lock()
4227 587 : .unwrap()
4228 587 : .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
4229 587 :
4230 587 : if !layers_to_remove.is_empty() {
4231 : // Persist the new GC cutoff value in the metadata file, before
4232 : // we actually remove anything.
4233 25 : self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
4234 UBC 0 : .await?;
4235 :
4236 : // Actually delete the layers from disk and remove them from the map.
4237 : // (couldn't do this in the loop above, because you cannot modify a collection
4238 : // while iterating it. BTreeMap::retain() would be another option)
4239 CBC 25 : let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
4240 25 : let gc_layers = layers_to_remove
4241 25 : .iter()
4242 3269 : .map(|x| guard.get_from_desc(x))
4243 25 : .collect();
4244 3294 : for doomed_layer in layers_to_remove {
4245 3269 : layer_names_to_delete.push(doomed_layer.filename());
4246 3269 : result.layers_removed += 1;
4247 3269 : }
4248 25 : let apply = guard.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)?;
4249 :
4250 25 : if result.layers_removed != 0 {
4251 25 : fail_point!("after-timeline-gc-removed-layers");
4252 25 : }
4253 :
4254 25 : if let Some(remote_client) = &self.remote_client {
4255 25 : remote_client.schedule_layer_file_deletion(layer_names_to_delete)?;
4256 UBC 0 : }
4257 :
4258 CBC 20 : apply.flush();
4259 562 : }
4260 :
4261 582 : info!(
4262 582 : "GC completed removing {} layers, cutoff {}",
4263 582 : result.layers_removed, new_gc_cutoff
4264 582 : );
4265 :
4266 582 : result.elapsed = now.elapsed()?;
4267 582 : Ok(result)
4268 690 : }
4269 :
4270 : ///
4271 : /// Reconstruct a value, using the given base image and WAL records in 'data'.
4272 : ///
4273 6372248 : async fn reconstruct_value(
4274 6372248 : &self,
4275 6372248 : key: Key,
4276 6372248 : request_lsn: Lsn,
4277 6372248 : mut data: ValueReconstructState,
4278 6372249 : ) -> Result<Bytes, PageReconstructError> {
4279 6372249 : // Perform WAL redo if needed
4280 6372249 : data.records.reverse();
4281 6372249 :
4282 6372249 : // If we have a page image, and no WAL, we're all set
4283 6372249 : if data.records.is_empty() {
4284 4116401 : if let Some((img_lsn, img)) = &data.img {
4285 UBC 0 : trace!(
4286 0 : "found page image for key {} at {}, no WAL redo required, req LSN {}",
4287 0 : key,
4288 0 : img_lsn,
4289 0 : request_lsn,
4290 0 : );
4291 CBC 4116401 : Ok(img.clone())
4292 : } else {
4293 UBC 0 : Err(PageReconstructError::from(anyhow!(
4294 0 : "base image for {key} at {request_lsn} not found"
4295 0 : )))
4296 : }
4297 : } else {
4298 : // We need to do WAL redo.
4299 : //
4300 : // If we don't have a base image, then the oldest WAL record better initialize
4301 : // the page
4302 CBC 2255848 : if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
4303 UBC 0 : Err(PageReconstructError::from(anyhow!(
4304 0 : "Base image for {} at {} not found, but got {} WAL records",
4305 0 : key,
4306 0 : request_lsn,
4307 0 : data.records.len()
4308 0 : )))
4309 : } else {
4310 CBC 2255848 : if data.img.is_some() {
4311 UBC 0 : trace!(
4312 0 : "found {} WAL records and a base image for {} at {}, performing WAL redo",
4313 0 : data.records.len(),
4314 0 : key,
4315 0 : request_lsn
4316 0 : );
4317 : } else {
4318 0 : trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
4319 : };
4320 :
4321 CBC 2255848 : let last_rec_lsn = data.records.last().unwrap().0;
4322 :
4323 2255848 : let img = match self
4324 2255848 : .walredo_mgr
4325 2255848 : .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
4326 UBC 0 : .await
4327 CBC 2255846 : .context("Failed to reconstruct a page image:")
4328 : {
4329 2255846 : Ok(img) => img,
4330 UBC 0 : Err(e) => return Err(PageReconstructError::from(e)),
4331 : };
4332 :
4333 CBC 2255846 : if img.len() == page_cache::PAGE_SZ {
4334 2252422 : let cache = page_cache::get();
4335 2252422 : if let Err(e) = cache
4336 2252422 : .memorize_materialized_page(
4337 2252422 : self.tenant_id,
4338 2252422 : self.timeline_id,
4339 2252422 : key,
4340 2252422 : last_rec_lsn,
4341 2252422 : &img,
4342 2252422 : )
4343 58513 : .await
4344 2252422 : .context("Materialized page memoization failed")
4345 : {
4346 UBC 0 : return Err(PageReconstructError::from(e));
4347 CBC 2252422 : }
4348 3424 : }
4349 :
4350 2255846 : Ok(img)
4351 : }
4352 : }
4353 6372247 : }
4354 :
4355 : /// Download a layer file from remote storage and insert it into the layer map.
4356 : ///
4357 : /// It's safe to call this function for the same layer concurrently. In that case:
4358 : /// - If the layer has already been downloaded, `OK(...)` is returned.
4359 : /// - If the layer is currently being downloaded, we wait until that download succeeded / failed.
4360 : /// - If it succeeded, we return `Ok(...)`.
4361 : /// - If it failed, we or another concurrent caller will initiate a new download attempt.
4362 : ///
4363 : /// Download errors are classified and retried if appropriate by the underlying RemoteTimelineClient function.
4364 : /// It has an internal limit for the maximum number of retries and prints appropriate log messages.
4365 : /// If we exceed the limit, it returns an error, and this function passes it through.
4366 : /// The caller _could_ retry further by themselves by calling this function again, but _should not_ do it.
4367 : /// The reason is that they cannot distinguish permanent errors from temporary ones, whereas
4368 : /// the underlying RemoteTimelineClient can.
4369 : ///
4370 : /// There is no internal timeout or slowness detection.
4371 : /// If the caller has a deadline or needs a timeout, they can simply stop polling:
4372 : /// we're **cancellation-safe** because the download happens in a separate task_mgr task.
4373 : /// So, the current download attempt will run to completion even if we stop polling.
4374 4326 : #[instrument(skip_all, fields(layer=%remote_layer))]
4375 : pub async fn download_remote_layer(
4376 : &self,
4377 : remote_layer: Arc<RemoteLayer>,
4378 : ) -> anyhow::Result<()> {
4379 : span::debug_assert_current_span_has_tenant_and_timeline_id();
4380 :
4381 : use std::sync::atomic::Ordering::Relaxed;
4382 :
4383 : let permit = match Arc::clone(&remote_layer.ongoing_download)
4384 : .acquire_owned()
4385 : .await
4386 : {
4387 : Ok(permit) => permit,
4388 : Err(_closed) => {
4389 : if remote_layer.download_replacement_failure.load(Relaxed) {
4390 : // this path will be hit often, in case there are upper retries. however
4391 : // hitting this error will prevent a busy loop between get_reconstruct_data and
4392 : // download, so an error is prefered.
4393 : //
4394 : // TODO: we really should poison the timeline, but panicking is not yet
4395 : // supported. Related: https://github.com/neondatabase/neon/issues/3621
4396 : anyhow::bail!("an earlier download succeeded but LayerMap::replace failed")
4397 : } else {
4398 61 : info!("download of layer has already finished");
4399 : return Ok(());
4400 : }
4401 : }
4402 : };
4403 :
4404 : let (sender, receiver) = tokio::sync::oneshot::channel();
4405 : // Spawn a task so that download does not outlive timeline when we detach tenant / delete timeline.
4406 : let self_clone = self.myself.upgrade().expect("timeline is gone");
4407 : task_mgr::spawn(
4408 : &tokio::runtime::Handle::current(),
4409 : TaskKind::RemoteDownloadTask,
4410 : Some(self.tenant_id),
4411 : Some(self.timeline_id),
4412 : &format!("download layer {}", remote_layer),
4413 : false,
4414 1379 : async move {
4415 1379 : let remote_client = self_clone.remote_client.as_ref().unwrap();
4416 :
4417 : // Does retries + exponential back-off internally.
4418 : // When this fails, don't layer further retry attempts here.
4419 1379 : let result = remote_client
4420 1379 : .download_layer_file(&remote_layer.filename(), &remote_layer.layer_metadata)
4421 459970 : .await;
4422 :
4423 1376 : if let Ok(size) = &result {
4424 1370 : info!("layer file download finished");
4425 :
4426 : // XXX the temp file is still around in Err() case
4427 : // and consumes space until we clean up upon pageserver restart.
4428 1370 : self_clone.metrics.resident_physical_size_add(*size);
4429 :
4430 : // Download complete. Replace the RemoteLayer with the corresponding
4431 : // Delta- or ImageLayer in the layer map.
4432 1370 : let mut guard = self_clone.layers.write().await;
4433 1370 : let new_layer =
4434 1370 : remote_layer.create_downloaded_layer(&guard, self_clone.conf, *size);
4435 1370 : {
4436 1370 : let l: Arc<dyn PersistentLayer> = remote_layer.clone();
4437 1370 : let failure = match guard.replace_and_verify(l, new_layer) {
4438 1369 : Ok(()) => false,
4439 1 : Err(e) => {
4440 1 : // this is a precondition failure, the layer filename derived
4441 1 : // attributes didn't match up, which doesn't seem likely.
4442 1 : error!("replacing downloaded layer into layermap failed: {e:#?}");
4443 1 : true
4444 : }
4445 : };
4446 :
4447 1370 : if failure {
4448 1 : // mark the remote layer permanently failed; the timeline is most
4449 1 : // likely unusable after this. sadly we cannot just poison the layermap
4450 1 : // lock with panic, because that would create an issue with shutdown.
4451 1 : //
4452 1 : // this does not change the retry semantics on failed downloads.
4453 1 : //
4454 1 : // use of Relaxed is valid because closing of the semaphore gives
4455 1 : // happens-before and wakes up any waiters; we write this value before
4456 1 : // and any waiters (or would be waiters) will load it after closing
4457 1 : // semaphore.
4458 1 : //
4459 1 : // See: https://github.com/neondatabase/neon/issues/3533
4460 1 : remote_layer
4461 1 : .download_replacement_failure
4462 1 : .store(true, Relaxed);
4463 1369 : }
4464 : }
4465 1370 : drop_wlock(guard);
4466 1370 :
4467 1370 : info!("on-demand download successful");
4468 :
4469 : // Now that we've inserted the download into the layer map,
4470 : // close the semaphore. This will make other waiters for
4471 : // this download return Ok(()).
4472 1370 : assert!(!remote_layer.ongoing_download.is_closed());
4473 1370 : remote_layer.ongoing_download.close();
4474 : } else {
4475 : // Keep semaphore open. We'll drop the permit at the end of the function.
4476 6 : error!(
4477 6 : "layer file download failed: {:?}",
4478 6 : result.as_ref().unwrap_err()
4479 6 : );
4480 : }
4481 :
4482 : // Don't treat it as an error if the task that triggered the download
4483 : // is no longer interested in the result.
4484 1376 : sender.send(result.map(|_sz| ())).ok();
4485 1376 :
4486 1376 : // In case we failed and there are other waiters, this will make one
4487 1376 : // of them retry the download in a new task.
4488 1376 : // XXX: This resets the exponential backoff because it's a new call to
4489 1376 : // download_layer file.
4490 1376 : drop(permit);
4491 1376 :
4492 1376 : Ok(())
4493 1376 : }
4494 : .in_current_span(),
4495 : );
4496 :
4497 : receiver.await.context("download task cancelled")?
4498 : }
4499 :
4500 3 : pub async fn spawn_download_all_remote_layers(
4501 3 : self: Arc<Self>,
4502 3 : request: DownloadRemoteLayersTaskSpawnRequest,
4503 3 : ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
4504 3 : let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
4505 3 : if let Some(st) = &*status_guard {
4506 1 : match &st.state {
4507 : DownloadRemoteLayersTaskState::Running => {
4508 UBC 0 : return Err(st.clone());
4509 : }
4510 : DownloadRemoteLayersTaskState::ShutDown
4511 CBC 1 : | DownloadRemoteLayersTaskState::Completed => {
4512 1 : *status_guard = None;
4513 1 : }
4514 : }
4515 2 : }
4516 :
4517 3 : let self_clone = Arc::clone(&self);
4518 3 : let task_id = task_mgr::spawn(
4519 3 : task_mgr::BACKGROUND_RUNTIME.handle(),
4520 3 : task_mgr::TaskKind::DownloadAllRemoteLayers,
4521 3 : Some(self.tenant_id),
4522 3 : Some(self.timeline_id),
4523 3 : "download all remote layers task",
4524 : false,
4525 3 : async move {
4526 11 : self_clone.download_all_remote_layers(request).await;
4527 3 : let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
4528 3 : match &mut *status_guard {
4529 : None => {
4530 UBC 0 : warn!("tasks status is supposed to be Some(), since we are running");
4531 : }
4532 CBC 3 : Some(st) => {
4533 3 : let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
4534 3 : if st.task_id != exp_task_id {
4535 UBC 0 : warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
4536 CBC 3 : } else {
4537 3 : st.state = DownloadRemoteLayersTaskState::Completed;
4538 3 : }
4539 : }
4540 : };
4541 3 : Ok(())
4542 3 : }
4543 3 : .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))
4544 : );
4545 :
4546 3 : let initial_info = DownloadRemoteLayersTaskInfo {
4547 3 : task_id: format!("{task_id}"),
4548 3 : state: DownloadRemoteLayersTaskState::Running,
4549 3 : total_layer_count: 0,
4550 3 : successful_download_count: 0,
4551 3 : failed_download_count: 0,
4552 3 : };
4553 3 : *status_guard = Some(initial_info.clone());
4554 3 :
4555 3 : Ok(initial_info)
4556 3 : }
4557 :
4558 3 : async fn download_all_remote_layers(
4559 3 : self: &Arc<Self>,
4560 3 : request: DownloadRemoteLayersTaskSpawnRequest,
4561 3 : ) {
4562 3 : let mut downloads = Vec::new();
4563 : {
4564 3 : let guard = self.layers.read().await;
4565 3 : let layers = guard.layer_map();
4566 3 : layers
4567 3 : .iter_historic_layers()
4568 12 : .map(|l| guard.get_from_desc(&l))
4569 12 : .filter_map(|l| l.downcast_remote_layer())
4570 8 : .map(|l| self.download_remote_layer(l))
4571 8 : .for_each(|dl| downloads.push(dl))
4572 3 : }
4573 3 : let total_layer_count = downloads.len();
4574 3 : // limit download concurrency as specified in request
4575 3 : let downloads = futures::stream::iter(downloads);
4576 3 : let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get());
4577 3 :
4578 3 : macro_rules! lock_status {
4579 3 : ($st:ident) => {
4580 3 : let mut st = self.download_all_remote_layers_task_info.write().unwrap();
4581 3 : let st = st
4582 3 : .as_mut()
4583 3 : .expect("this function is only called after the task has been spawned");
4584 3 : assert_eq!(
4585 3 : st.task_id,
4586 3 : format!(
4587 3 : "{}",
4588 3 : task_mgr::current_task_id().expect("we run inside a task_mgr task")
4589 3 : )
4590 3 : );
4591 3 : let $st = st;
4592 3 : };
4593 3 : }
4594 3 :
4595 3 : {
4596 3 : lock_status!(st);
4597 3 : st.total_layer_count = total_layer_count as u64;
4598 : }
4599 11 : loop {
4600 22 : tokio::select! {
4601 11 : dl = downloads.next() => {
4602 : lock_status!(st);
4603 : match dl {
4604 : None => break,
4605 : Some(Ok(())) => {
4606 : st.successful_download_count += 1;
4607 : },
4608 : Some(Err(e)) => {
4609 4 : error!(error = %e, "layer download failed");
4610 : st.failed_download_count += 1;
4611 : }
4612 : }
4613 : }
4614 : _ = task_mgr::shutdown_watcher() => {
4615 : // Kind of pointless to watch for shutdowns here,
4616 : // as download_remote_layer spawns other task_mgr tasks internally.
4617 : lock_status!(st);
4618 : st.state = DownloadRemoteLayersTaskState::ShutDown;
4619 : }
4620 11 : }
4621 11 : }
4622 : {
4623 3 : lock_status!(st);
4624 3 : st.state = DownloadRemoteLayersTaskState::Completed;
4625 3 : }
4626 3 : }
4627 :
4628 22 : pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
4629 22 : self.download_all_remote_layers_task_info
4630 22 : .read()
4631 22 : .unwrap()
4632 22 : .clone()
4633 22 : }
4634 : }
4635 :
4636 : pub struct DiskUsageEvictionInfo {
4637 : /// Timeline's largest layer (remote or resident)
4638 : pub max_layer_size: Option<u64>,
4639 : /// Timeline's resident layers
4640 : pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
4641 : }
4642 :
4643 : pub struct LocalLayerInfoForDiskUsageEviction {
4644 : pub layer: Arc<dyn PersistentLayer>,
4645 : pub last_activity_ts: SystemTime,
4646 : }
4647 :
4648 : impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
4649 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4650 0 : // format the tv_sec, tv_nsec into rfc3339 in case someone is looking at it
4651 0 : // having to allocate a string to this is bad, but it will rarely be formatted
4652 0 : let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
4653 0 : let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
4654 0 : f.debug_struct("LocalLayerInfoForDiskUsageEviction")
4655 0 : .field("layer", &self.layer)
4656 0 : .field("last_activity", &ts)
4657 0 : .finish()
4658 0 : }
4659 : }
4660 :
4661 : impl LocalLayerInfoForDiskUsageEviction {
4662 CBC 250 : pub fn file_size(&self) -> u64 {
4663 250 : self.layer.layer_desc().file_size
4664 250 : }
4665 : }
4666 :
4667 : impl Timeline {
4668 : /// Returns non-remote layers for eviction.
4669 13 : pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
4670 13 : let guard = self.layers.read().await;
4671 13 : let layers = guard.layer_map();
4672 13 :
4673 13 : let mut max_layer_size: Option<u64> = None;
4674 13 : let mut resident_layers = Vec::new();
4675 :
4676 250 : for l in layers.iter_historic_layers() {
4677 250 : let file_size = l.file_size();
4678 250 : max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
4679 250 :
4680 250 : let l = guard.get_from_desc(&l);
4681 250 :
4682 250 : if l.is_remote_layer() {
4683 UBC 0 : continue;
4684 CBC 250 : }
4685 250 :
4686 250 : let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
4687 UBC 0 : // We only use this fallback if there's an implementation error.
4688 0 : // `latest_activity` already does rate-limited warn!() log.
4689 0 : debug!(layer=%l, "last_activity returns None, using SystemTime::now");
4690 0 : SystemTime::now()
4691 CBC 250 : });
4692 250 :
4693 250 : resident_layers.push(LocalLayerInfoForDiskUsageEviction {
4694 250 : layer: l,
4695 250 : last_activity_ts,
4696 250 : });
4697 : }
4698 :
4699 13 : DiskUsageEvictionInfo {
4700 13 : max_layer_size,
4701 13 : resident_layers,
4702 13 : }
4703 13 : }
4704 : }
4705 :
4706 : type TraversalPathItem = (
4707 : ValueReconstructResult,
4708 : Lsn,
4709 : Box<dyn Send + FnOnce() -> TraversalId>,
4710 : );
4711 :
4712 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
4713 : /// to an error, as anyhow context information.
4714 7 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
4715 7 : // We want the original 'msg' to be the outermost context. The outermost context
4716 7 : // is the most high-level information, which also gets propagated to the client.
4717 7 : let mut msg_iter = path
4718 7 : .into_iter()
4719 12 : .map(|(r, c, l)| {
4720 12 : format!(
4721 12 : "layer traversal: result {:?}, cont_lsn {}, layer: {}",
4722 12 : r,
4723 12 : c,
4724 12 : l(),
4725 12 : )
4726 12 : })
4727 7 : .chain(std::iter::once(msg));
4728 7 : // Construct initial message from the first traversed layer
4729 7 : let err = anyhow!(msg_iter.next().unwrap());
4730 7 :
4731 7 : // Append all subsequent traversals, and the error message 'msg', as contexts.
4732 12 : let msg = msg_iter.fold(err, |err, msg| err.context(msg));
4733 7 : PageReconstructError::from(msg)
4734 7 : }
4735 :
4736 : /// Various functions to mutate the timeline.
4737 : // TODO Currently, Deref is used to allow easy access to read methods from this trait.
4738 : // This is probably considered a bad practice in Rust and should be fixed eventually,
4739 : // but will cause large code changes.
4740 : pub struct TimelineWriter<'a> {
4741 : tl: &'a Timeline,
4742 : _write_guard: tokio::sync::MutexGuard<'a, ()>,
4743 : }
4744 :
4745 : impl Deref for TimelineWriter<'_> {
4746 : type Target = Timeline;
4747 :
4748 UBC 0 : fn deref(&self) -> &Self::Target {
4749 0 : self.tl
4750 0 : }
4751 : }
4752 :
4753 : impl<'a> TimelineWriter<'a> {
4754 : /// Put a new page version that can be constructed from a WAL record
4755 : ///
4756 : /// This will implicitly extend the relation, if the page is beyond the
4757 : /// current end-of-file.
4758 CBC 76622124 : pub async fn put(
4759 76622124 : &self,
4760 76622124 : key: Key,
4761 76622124 : lsn: Lsn,
4762 76622124 : value: &Value,
4763 76622124 : ctx: &RequestContext,
4764 76622124 : ) -> anyhow::Result<()> {
4765 76622172 : self.tl.put_value(key, lsn, value, ctx).await
4766 76622167 : }
4767 :
4768 17687 : pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
4769 17687 : self.tl.put_tombstone(key_range, lsn).await
4770 17687 : }
4771 :
4772 : /// Track the end of the latest digested WAL record.
4773 : /// Remember the (end of) last valid WAL record remembered in the timeline.
4774 : ///
4775 : /// Call this after you have finished writing all the WAL up to 'lsn'.
4776 : ///
4777 : /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
4778 : /// the 'lsn' or anything older. The previous last record LSN is stored alongside
4779 : /// the latest and can be read.
4780 69330499 : pub fn finish_write(&self, new_lsn: Lsn) {
4781 69330499 : self.tl.finish_write(new_lsn);
4782 69330499 : }
4783 :
4784 1029042 : pub fn update_current_logical_size(&self, delta: i64) {
4785 1029042 : self.tl.update_current_logical_size(delta)
4786 1029042 : }
4787 : }
4788 :
4789 : // We need TimelineWriter to be send in upcoming conversion of
4790 : // Timeline::layers to tokio::sync::RwLock.
4791 1 : #[test]
4792 1 : fn is_send() {
4793 1 : fn _assert_send<T: Send>() {}
4794 1 : _assert_send::<TimelineWriter<'_>>();
4795 1 : }
4796 :
4797 : /// Add a suffix to a layer file's name: .{num}.old
4798 : /// Uses the first available num (starts at 0)
4799 1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
4800 1 : let filename = path
4801 1 : .file_name()
4802 1 : .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
4803 1 : let mut new_path = path.to_owned();
4804 :
4805 1 : for i in 0u32.. {
4806 1 : new_path.set_file_name(format!("{filename}.{i}.old"));
4807 1 : if !new_path.exists() {
4808 1 : std::fs::rename(path, &new_path)
4809 1 : .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
4810 1 : return Ok(());
4811 UBC 0 : }
4812 : }
4813 :
4814 0 : bail!("couldn't find an unused backup number for {:?}", path)
4815 CBC 1 : }
4816 :
4817 : /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
4818 : ///
4819 : /// Returns `true` if the two `Arc` point to the same layer, false otherwise.
4820 : ///
4821 : /// If comparing persistent layers, ALWAYS compare the layer descriptor key.
4822 : #[inline(always)]
4823 7310 : pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
4824 7310 : // "dyn Trait" objects are "fat pointers" in that they have two components:
4825 7310 : // - pointer to the object
4826 7310 : // - pointer to the vtable
4827 7310 : //
4828 7310 : // rust does not provide a guarantee that these vtables are unique, but however
4829 7310 : // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
4830 7310 : // pointer and the vtable need to be equal.
4831 7310 : //
4832 7310 : // See: https://github.com/rust-lang/rust/issues/103763
4833 7310 : //
4834 7310 : // A future version of rust will most likely use this form below, where we cast each
4835 7310 : // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
4836 7310 : // not affect the comparison.
4837 7310 : //
4838 7310 : // See: https://github.com/rust-lang/rust/pull/106450
4839 7310 : let left = Arc::as_ptr(left) as *const ();
4840 7310 : let right = Arc::as_ptr(right) as *const ();
4841 7310 :
4842 7310 : left == right
4843 7310 : }
4844 :
4845 : #[cfg(test)]
4846 : mod tests {
4847 : use std::sync::Arc;
4848 :
4849 : use utils::{id::TimelineId, lsn::Lsn};
4850 :
4851 : use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
4852 :
4853 : use super::{EvictionError, Timeline};
4854 :
4855 1 : #[tokio::test]
4856 1 : async fn two_layer_eviction_attempts_at_the_same_time() {
4857 1 : let harness =
4858 1 : TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
4859 1 :
4860 1 : let ctx = any_context();
4861 1 : let tenant = harness.try_load(&ctx).await.unwrap();
4862 1 : let timeline = tenant
4863 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
4864 2 : .await
4865 1 : .unwrap();
4866 1 :
4867 1 : let rc = timeline
4868 1 : .remote_client
4869 1 : .clone()
4870 1 : .expect("just configured this");
4871 :
4872 1 : let layer = find_some_layer(&timeline).await;
4873 :
4874 1 : let cancel = tokio_util::sync::CancellationToken::new();
4875 1 : let batch = [layer];
4876 1 :
4877 1 : let first = {
4878 1 : let cancel = cancel.clone();
4879 1 : async {
4880 1 : timeline
4881 1 : .evict_layer_batch(&rc, &batch, cancel)
4882 UBC 0 : .await
4883 CBC 1 : .unwrap()
4884 1 : }
4885 : };
4886 1 : let second = async {
4887 1 : timeline
4888 1 : .evict_layer_batch(&rc, &batch, cancel)
4889 UBC 0 : .await
4890 CBC 1 : .unwrap()
4891 1 : };
4892 :
4893 1 : let (first, second) = tokio::join!(first, second);
4894 :
4895 1 : let (first, second) = (only_one(first), only_one(second));
4896 1 :
4897 1 : match (first, second) {
4898 : (Ok(()), Err(EvictionError::FileNotFound))
4899 1 : | (Err(EvictionError::FileNotFound), Ok(())) => {
4900 1 : // one of the evictions gets to do it,
4901 1 : // other one gets FileNotFound. all is good.
4902 1 : }
4903 UBC 0 : other => unreachable!("unexpected {:?}", other),
4904 : }
4905 : }
4906 :
4907 CBC 1 : #[tokio::test]
4908 1 : async fn layer_eviction_aba_fails() {
4909 1 : let harness = TenantHarness::create("layer_eviction_aba_fails").unwrap();
4910 1 :
4911 1 : let ctx = any_context();
4912 1 : let tenant = harness.try_load(&ctx).await.unwrap();
4913 1 : let timeline = tenant
4914 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
4915 2 : .await
4916 1 : .unwrap();
4917 :
4918 1 : let _e = tracing::info_span!("foobar", tenant_id = %tenant.tenant_id, timeline_id = %timeline.timeline_id).entered();
4919 1 :
4920 1 : let rc = timeline.remote_client.clone().unwrap();
4921 :
4922 : // TenantHarness allows uploads to happen given GenericRemoteStorage is configured
4923 1 : let layer = find_some_layer(&timeline).await;
4924 :
4925 1 : let cancel = tokio_util::sync::CancellationToken::new();
4926 1 : let batch = [layer];
4927 1 :
4928 1 : let first = {
4929 1 : let cancel = cancel.clone();
4930 1 : async {
4931 1 : timeline
4932 1 : .evict_layer_batch(&rc, &batch, cancel)
4933 UBC 0 : .await
4934 CBC 1 : .unwrap()
4935 1 : }
4936 : };
4937 :
4938 : // lets imagine this is stuck somehow, still referencing the original `Arc<dyn PersistentLayer>`
4939 1 : let second = {
4940 1 : let cancel = cancel.clone();
4941 1 : async {
4942 1 : timeline
4943 1 : .evict_layer_batch(&rc, &batch, cancel)
4944 UBC 0 : .await
4945 CBC 1 : .unwrap()
4946 1 : }
4947 : };
4948 :
4949 : // while it's stuck, we evict and end up redownloading it
4950 1 : only_one(first.await).expect("eviction succeeded");
4951 :
4952 1 : let layer = find_some_layer(&timeline).await;
4953 1 : let layer = layer.downcast_remote_layer().unwrap();
4954 1 : timeline.download_remote_layer(layer).await.unwrap();
4955 :
4956 1 : let res = only_one(second.await);
4957 :
4958 1 : assert!(
4959 1 : matches!(res, Err(EvictionError::LayerNotFound(_))),
4960 UBC 0 : "{res:?}"
4961 : );
4962 :
4963 : // no more specific asserting, outside of preconds this is the only valid replacement
4964 : // failure
4965 : }
4966 :
4967 CBC 2 : fn any_context() -> crate::context::RequestContext {
4968 2 : use crate::context::*;
4969 2 : use crate::task_mgr::*;
4970 2 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
4971 2 : }
4972 :
4973 4 : fn only_one<T>(mut input: Vec<Option<T>>) -> T {
4974 4 : assert_eq!(1, input.len());
4975 4 : input
4976 4 : .pop()
4977 4 : .expect("length just checked")
4978 4 : .expect("no cancellation")
4979 4 : }
4980 :
4981 3 : async fn find_some_layer(timeline: &Timeline) -> Arc<dyn PersistentLayer> {
4982 3 : let layers = timeline.layers.read().await;
4983 3 : let desc = layers
4984 3 : .layer_map()
4985 3 : .iter_historic_layers()
4986 3 : .next()
4987 3 : .expect("must find one layer to evict");
4988 3 :
4989 3 : layers.get_from_desc(&desc)
4990 3 : }
4991 : }
|