Line data Source code
1 : pub mod delete;
2 : mod eviction_task;
3 : mod init;
4 : pub mod layer_manager;
5 : mod logical_size;
6 : pub mod span;
7 : pub mod uninit;
8 : mod walreceiver;
9 :
10 : use anyhow::{anyhow, bail, ensure, Context, Result};
11 : use bytes::Bytes;
12 : use fail::fail_point;
13 : use futures::StreamExt;
14 : use itertools::Itertools;
15 : use pageserver_api::models::{
16 : DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
17 : DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceEventReason, LayerResidenceStatus,
18 : TimelineState,
19 : };
20 : use remote_storage::GenericRemoteStorage;
21 : use serde_with::serde_as;
22 : use storage_broker::BrokerClientChannel;
23 : use tokio::runtime::Handle;
24 : use tokio::sync::{oneshot, watch, TryAcquireError};
25 : use tokio_util::sync::CancellationToken;
26 : use tracing::*;
27 : use utils::id::TenantTimelineId;
28 :
29 : use std::cmp::{max, min, Ordering};
30 : use std::collections::{BinaryHeap, HashMap, HashSet};
31 : use std::ops::{Deref, Range};
32 : use std::path::{Path, PathBuf};
33 : use std::pin::pin;
34 : use std::sync::atomic::Ordering as AtomicOrdering;
35 : use std::sync::{Arc, Mutex, RwLock, Weak};
36 : use std::time::{Duration, Instant, SystemTime};
37 :
38 : use crate::context::{
39 : AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
40 : };
41 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
42 : use crate::tenant::storage_layer::delta_layer::DeltaEntry;
43 : use crate::tenant::storage_layer::{
44 : DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStats, LayerFileName, RemoteLayer,
45 : };
46 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
47 : use crate::tenant::{
48 : layer_map::{LayerMap, SearchResult},
49 : metadata::{save_metadata, TimelineMetadata},
50 : par_fsync,
51 : storage_layer::{PersistentLayer, ValueReconstructResult, ValueReconstructState},
52 : };
53 :
54 : use crate::config::PageServerConf;
55 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
56 : use crate::metrics::{
57 : TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
58 : RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS,
59 : };
60 : use crate::pgdatadir_mapping::LsnForTimestamp;
61 : use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
62 : use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
63 : use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
64 : use pageserver_api::reltag::RelTag;
65 :
66 : use postgres_connection::PgConnectionConfig;
67 : use postgres_ffi::to_pg_timestamp;
68 : use utils::{
69 : completion,
70 : generation::Generation,
71 : id::{TenantId, TimelineId},
72 : lsn::{AtomicLsn, Lsn, RecordLsn},
73 : seqwait::SeqWait,
74 : simple_rcu::{Rcu, RcuReadGuard},
75 : };
76 :
77 : use crate::page_cache;
78 : use crate::repository::GcResult;
79 : use crate::repository::{Key, Value};
80 : use crate::task_mgr;
81 : use crate::task_mgr::TaskKind;
82 : use crate::walredo::WalRedoManager;
83 : use crate::ZERO_PAGE;
84 :
85 : use self::delete::DeleteTimelineFlow;
86 : pub(super) use self::eviction_task::EvictionTaskTenantState;
87 : use self::eviction_task::EvictionTaskTimelineState;
88 : use self::layer_manager::LayerManager;
89 : use self::logical_size::LogicalSize;
90 : use self::walreceiver::{WalReceiver, WalReceiverConf};
91 :
92 : use super::config::TenantConf;
93 : use super::debug_assert_current_span_has_tenant_and_timeline_id;
94 : use super::remote_timeline_client::index::IndexPart;
95 : use super::remote_timeline_client::RemoteTimelineClient;
96 : use super::storage_layer::{
97 : AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStatsReset, PersistentLayerDesc,
98 : };
99 :
100 78 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
101 : pub(super) enum FlushLoopState {
102 : NotStarted,
103 : Running {
104 : #[cfg(test)]
105 : expect_initdb_optimization: bool,
106 : #[cfg(test)]
107 : initdb_optimization_count: usize,
108 : },
109 : Exited,
110 : }
111 :
112 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
113 0 : #[derive(Debug, Clone, PartialEq, Eq)]
114 : pub struct Hole {
115 : key_range: Range<Key>,
116 : coverage_size: usize,
117 : }
118 :
119 : impl Ord for Hole {
120 4 : fn cmp(&self, other: &Self) -> Ordering {
121 4 : other.coverage_size.cmp(&self.coverage_size) // inverse order
122 4 : }
123 : }
124 :
125 : impl PartialOrd for Hole {
126 4 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
127 4 : Some(self.cmp(other))
128 4 : }
129 : }
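
With the inverted `Ord` above, a `BinaryHeap<Hole>` orders holes so that the one with the *smallest* `coverage_size` is popped first. A minimal sketch of that usage (illustrative only; `candidate_holes` and `max_holes` are hypothetical, not taken from this file):

    let mut heap: BinaryHeap<Hole> = BinaryHeap::new();
    for hole in candidate_holes {
        heap.push(hole);
        if heap.len() > max_holes {
            // pops the hole with the smallest coverage_size, keeping the largest ones
            heap.pop();
        }
    }
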
130 :
131 : /// Temporary function for the immutable storage state refactor: ensures we drop the read-lock guard and not something else.
132 : /// Can be removed after all refactors are done.
133 290 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
134 290 : drop(rlock)
135 290 : }
136 :
137 : /// Temporary function for the immutable storage state refactor: ensures we drop the write-lock guard and not something else.
138 : /// Can be removed after all refactors are done.
139 2831 : fn drop_wlock<T>(wlock: tokio::sync::RwLockWriteGuard<'_, T>) {
140 2831 : drop(wlock)
141 2831 : }
142 :
143 : /// The outward-facing resources required to build a Timeline
144 : pub struct TimelineResources {
145 : pub remote_client: Option<RemoteTimelineClient>,
146 : }
147 :
148 : pub struct Timeline {
149 : conf: &'static PageServerConf,
150 : tenant_conf: Arc<RwLock<TenantConfOpt>>,
151 :
152 : myself: Weak<Self>,
153 :
154 : pub tenant_id: TenantId,
155 : pub timeline_id: TimelineId,
156 :
157 : /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
158 : /// Never changes for the lifetime of this [`Timeline`] object.
159 : generation: Generation,
160 :
161 : pub pg_version: u32,
162 :
163 : /// The tuple has two elements.
164 : /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
165 : /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
166 : ///
167 : /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
168 : /// We describe these rectangles through the `PersistentLayerDesc` struct.
169 : ///
170 : /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
171 : /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
172 : /// `PersistentLayerDesc`'s.
173 : ///
174 : /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
175 : /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
176 : /// runtime, e.g., during page reconstruction.
177 : ///
178 : /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
179 : /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
180 : pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
181 :
182 : /// Set of key ranges which should be covered by image layers to
183 : /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
184 : /// It is used by the compaction task when it checks whether a new image layer should be created.
185 : /// A newly created image layer doesn't help to remove delta layers until the image layer
186 : /// itself falls outside the PITR horizon. So on the next GC cycle,
187 : /// gc_timeline may still want the new image layer to be created. To avoid redundant
188 : /// image layer creation we should check whether an image layer already exists beyond the PITR horizon.
189 : /// This is why we need to remember the GC cutoff LSN.
190 : ///
191 : wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
192 :
193 : last_freeze_at: AtomicLsn,
194 : // Atomic would be more appropriate here.
195 : last_freeze_ts: RwLock<Instant>,
196 :
197 : // WAL redo manager
198 : walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
199 :
200 : /// Remote storage client.
201 : /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
202 : pub remote_client: Option<Arc<RemoteTimelineClient>>,
203 :
204 : // What page versions do we hold in the repository? If we get a
205 : // request > last_record_lsn, we need to wait until we receive all
206 : // the WAL up to the request. The SeqWait provides functions for
207 : // that. TODO: If we get a request for an old LSN, such that the
208 : // versions have already been garbage collected away, we should
209 : // throw an error, but we don't track that currently.
210 : //
211 : // last_record_lsn.load().last points to the end of last processed WAL record.
212 : //
213 : // We also remember the starting point of the previous record in
214 : // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
215 : // first WAL record when the node is started up. But here, we just
216 : // keep track of it.
217 : last_record_lsn: SeqWait<RecordLsn, Lsn>,
218 :
219 : // All WAL records have been processed and stored durably on files on
220 : // local disk, up to this LSN. On crash and restart, we need to re-process
221 : // the WAL starting from this point.
222 : //
223 : // Some later WAL records might have been processed and also flushed to disk
224 : // already, so don't be surprised to see some, but there's no guarantee on
225 : // them yet.
226 : disk_consistent_lsn: AtomicLsn,
227 :
228 : // Parent timeline that this timeline was branched from, and the LSN
229 : // of the branch point.
230 : ancestor_timeline: Option<Arc<Timeline>>,
231 : ancestor_lsn: Lsn,
232 :
233 : pub(super) metrics: TimelineMetrics,
234 :
235 : /// Ensures layers aren't frozen by checkpointer between
236 : /// [`Timeline::get_layer_for_write`] and layer reads.
237 : /// Locked automatically by [`TimelineWriter`] and checkpointer.
238 : /// Must always be acquired before the layer map/individual layer lock
239 : /// to avoid deadlock.
240 : write_lock: tokio::sync::Mutex<()>,
241 :
242 : /// Used to avoid multiple `flush_loop` tasks running
243 : pub(super) flush_loop_state: Mutex<FlushLoopState>,
244 :
245 : /// layer_flush_start_tx can be used to wake up the layer-flushing task.
246 : /// The value is a counter, incremented every time a new flush cycle is requested.
247 : /// The flush cycle counter is sent back on the layer_flush_done channel when
248 : /// the flush finishes. You can use that to wait for the flush to finish.
249 : layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
250 : /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
251 : layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
252 :
253 : /// Layer removal lock.
254 : /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
255 : /// This lock is acquired in [`Timeline::gc`] and [`Timeline::compact`].
256 : /// This is an `Arc<Mutex>` lock because we need an owned
257 : /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
258 : /// Note that [`DeleteTimelineFlow`] uses `delete_progress` field.
259 : pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
260 :
261 : // Needed to ensure that we can't create a branch at a point that was already garbage collected
262 : pub latest_gc_cutoff_lsn: Rcu<Lsn>,
263 :
264 : // List of child timelines and their branch points. This is needed to avoid
265 : // garbage collecting data that is still needed by the child timelines.
266 : pub gc_info: std::sync::RwLock<GcInfo>,
267 :
268 : // It may change across major versions so for simplicity
269 : // keep it after running initdb for a timeline.
270 : // It is needed in checks when we want to error on some operations
271 : // when they are requested for pre-initdb lsn.
272 : // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
273 : // though let's keep them both for better error visibility.
274 : pub initdb_lsn: Lsn,
275 :
276 : /// When did we last calculate the partitioning?
277 : partitioning: Mutex<(KeyPartitioning, Lsn)>,
278 :
279 : /// Configuration: how often should the partitioning be recalculated.
280 : repartition_threshold: u64,
281 :
282 : /// Current logical size of the "datadir", at the last LSN.
283 : current_logical_size: LogicalSize,
284 :
285 : /// Information about the last processed message by the WAL receiver,
286 : /// or None if WAL receiver has not received anything for this timeline
287 : /// yet.
288 : pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
289 : pub walreceiver: Mutex<Option<WalReceiver>>,
290 :
291 : /// Relation size cache
292 : pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
293 :
294 : download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
295 :
296 : state: watch::Sender<TimelineState>,
297 :
298 : /// Prevent two tasks from deleting the timeline at the same time. If held, the
299 : /// timeline is being deleted; the [`DeleteTimelineFlow`] inside tracks how far that deletion has progressed.
300 : pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
301 :
302 : eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
303 :
304 : /// Barrier to wait before doing initial logical size calculation. Used only during startup.
305 : initial_logical_size_can_start: Option<completion::Barrier>,
306 :
307 : /// Completion shared between all timelines loaded during startup; used to delay heavier
308 : /// background tasks until some logical sizes have been calculated.
309 : initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
310 :
311 : /// Load or creation time information about the disk_consistent_lsn and when the loading
312 : /// happened. Used for consumption metrics.
313 : pub(crate) loaded_at: (Lsn, SystemTime),
314 : }
315 :
316 : pub struct WalReceiverInfo {
317 : pub wal_source_connconf: PgConnectionConfig,
318 : pub last_received_msg_lsn: Lsn,
319 : pub last_received_msg_ts: u128,
320 : }
321 :
322 : ///
323 : /// Information about how much history needs to be retained, needed by
324 : /// Garbage Collection.
325 : ///
326 : pub struct GcInfo {
327 : /// Specific LSNs that are needed.
328 : ///
329 : /// Currently, this includes all points where child branches have
330 : /// been forked off from. In the future, could also include
331 : /// explicit user-defined snapshot points.
332 : pub retain_lsns: Vec<Lsn>,
333 :
334 : /// In addition to 'retain_lsns', keep everything newer than this
335 : /// point.
336 : ///
337 : /// This is calculated by subtracting 'gc_horizon' setting from
338 : /// last-record LSN
339 : ///
340 : /// FIXME: is this inclusive or exclusive?
341 : pub horizon_cutoff: Lsn,
342 :
343 : /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
344 : /// point.
345 : ///
346 : /// This is calculated by finding a number such that a record is needed for PITR
347 : /// if and only if its LSN is larger than 'pitr_cutoff'.
348 : pub pitr_cutoff: Lsn,
349 : }
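
A minimal sketch of how these two cutoffs are typically combined by the GC code (an assumption about the caller, not shown in this excerpt); `retain_lsns` is consulted separately for branch points:

    // keep everything newer than both horizons
    let new_gc_cutoff = Lsn::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff);
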
350 :
351 : /// An error happened in a get() operation.
352 29 : #[derive(thiserror::Error)]
353 : pub enum PageReconstructError {
354 : #[error(transparent)]
355 : Other(#[from] anyhow::Error),
356 :
357 : /// The operation would require downloading a layer that is missing locally.
358 : NeedsDownload(TenantTimelineId, LayerFileName),
359 :
360 : /// The operation was cancelled
361 : Cancelled,
362 :
363 : /// The ancestor of this is being stopped
364 : AncestorStopping(TimelineId),
365 :
366 : /// An error happened replaying WAL records
367 : #[error(transparent)]
368 : WalRedo(#[from] crate::walredo::WalRedoError),
369 : }
370 :
371 : impl std::fmt::Debug for PageReconstructError {
372 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
373 0 : match self {
374 0 : Self::Other(err) => err.fmt(f),
375 0 : Self::NeedsDownload(tenant_timeline_id, layer_file_name) => {
376 0 : write!(
377 0 : f,
378 0 : "layer {}/{} needs download",
379 0 : tenant_timeline_id,
380 0 : layer_file_name.file_name()
381 0 : )
382 : }
383 0 : Self::Cancelled => write!(f, "cancelled"),
384 0 : Self::AncestorStopping(timeline_id) => {
385 0 : write!(f, "ancestor timeline {timeline_id} is being stopped")
386 : }
387 0 : Self::WalRedo(err) => err.fmt(f),
388 : }
389 0 : }
390 : }
391 :
392 : impl std::fmt::Display for PageReconstructError {
393 18 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
394 18 : match self {
395 17 : Self::Other(err) => err.fmt(f),
396 0 : Self::NeedsDownload(tenant_timeline_id, layer_file_name) => {
397 0 : write!(
398 0 : f,
399 0 : "layer {}/{} needs download",
400 0 : tenant_timeline_id,
401 0 : layer_file_name.file_name()
402 0 : )
403 : }
404 0 : Self::Cancelled => write!(f, "cancelled"),
405 1 : Self::AncestorStopping(timeline_id) => {
406 1 : write!(f, "ancestor timeline {timeline_id} is being stopped")
407 : }
408 0 : Self::WalRedo(err) => err.fmt(f),
409 : }
410 18 : }
411 : }
412 :
413 0 : #[derive(Clone, Copy)]
414 : pub enum LogicalSizeCalculationCause {
415 : Initial,
416 : ConsumptionMetricsSyntheticSize,
417 : EvictionTaskImitation,
418 : TenantSizeHandler,
419 : }
420 :
421 : /// Public interface functions
422 : impl Timeline {
423 : /// Get the LSN where this branch was created
424 2820 : pub fn get_ancestor_lsn(&self) -> Lsn {
425 2820 : self.ancestor_lsn
426 2820 : }
427 :
428 : /// Get the ancestor's timeline id
429 5176 : pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
430 5176 : self.ancestor_timeline
431 5176 : .as_ref()
432 5176 : .map(|ancestor| ancestor.timeline_id)
433 5176 : }
434 :
435 : /// Lock and get the timeline's GC cutoff
436 4604171 : pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
437 4604171 : self.latest_gc_cutoff_lsn.read()
438 4604171 : }
439 :
440 : /// Look up given page version.
441 : ///
442 : /// If a remote layer file is needed, it is downloaded as part of this
443 : /// call.
444 : ///
445 : /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
446 : /// abstraction above this needs to store suitable metadata to track what
447 : /// data exists with what keys, in separate metadata entries. If a
448 : /// non-existent key is requested, we may incorrectly return a value from
449 : /// an ancestor branch, for example, or waste a lot of cycles chasing the
450 : /// non-existing key.
451 : ///
452 7262298 : pub async fn get(
453 7262298 : &self,
454 7262298 : key: Key,
455 7262298 : lsn: Lsn,
456 7262298 : ctx: &RequestContext,
457 7262300 : ) -> Result<Bytes, PageReconstructError> {
458 7262300 : if !lsn.is_valid() {
459 0 : return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
460 7262300 : }
461 :
462 : // XXX: structured stats collection for layer eviction here.
463 0 : trace!(
464 0 : "get page request for {}@{} from task kind {:?}",
465 0 : key,
466 0 : lsn,
467 0 : ctx.task_kind()
468 0 : );
469 :
470 : // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
471 : // The cached image can be returned directly if there is no WAL between the cached image
472 : // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
473 : // for redo.
474 7262300 : let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
475 1740445 : Some((cached_lsn, cached_img)) => {
476 1740445 : match cached_lsn.cmp(&lsn) {
477 1740405 : Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
478 : Ordering::Equal => {
479 40 : MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
480 40 : return Ok(cached_img); // exact LSN match, return the image
481 : }
482 : Ordering::Greater => {
483 0 : unreachable!("the returned lsn should never be after the requested lsn")
484 : }
485 : }
486 1740405 : Some((cached_lsn, cached_img))
487 : }
488 5521855 : None => None,
489 : };
490 :
491 7262260 : let mut reconstruct_state = ValueReconstructState {
492 7262260 : records: Vec::new(),
493 7262260 : img: cached_page_img,
494 7262260 : };
495 7262260 :
496 7262260 : let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
497 7262260 : self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
498 9754894 : .await?;
499 7262204 : timer.stop_and_record();
500 7262204 :
501 7262204 : RECONSTRUCT_TIME
502 7262204 : .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
503 734 : .await
504 7262263 : }
505 :
506 : /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
507 89040437 : pub fn get_last_record_lsn(&self) -> Lsn {
508 89040437 : self.last_record_lsn.load().last
509 89040437 : }
510 :
511 2337 : pub fn get_prev_record_lsn(&self) -> Lsn {
512 2337 : self.last_record_lsn.load().prev
513 2337 : }
514 :
515 : /// Atomically get both last and prev.
516 1029 : pub fn get_last_record_rlsn(&self) -> RecordLsn {
517 1029 : self.last_record_lsn.load()
518 1029 : }
519 :
520 738701 : pub fn get_disk_consistent_lsn(&self) -> Lsn {
521 738701 : self.disk_consistent_lsn.load()
522 738701 : }
523 :
524 : pub fn get_remote_consistent_lsn(&self) -> Option<Lsn> {
525 736499 : if let Some(remote_client) = &self.remote_client {
526 116151 : remote_client.last_uploaded_consistent_lsn()
527 : } else {
528 620348 : None
529 : }
530 736499 : }
531 :
532 : /// The sum of the file size of all historic layers in the layer map.
533 : /// This method makes no distinction between local and remote layers.
534 : /// Hence, the result **does not represent local filesystem usage**.
535 2781 : pub async fn layer_size_sum(&self) -> u64 {
536 2781 : let guard = self.layers.read().await;
537 2781 : let layer_map = guard.layer_map();
538 2781 : let mut size = 0;
539 64575 : for l in layer_map.iter_historic_layers() {
540 64575 : size += l.file_size();
541 64575 : }
542 2781 : size
543 2781 : }
544 :
545 46 : pub fn resident_physical_size(&self) -> u64 {
546 46 : self.metrics.resident_physical_size_gauge.get()
547 46 : }
548 :
549 : ///
550 : /// Wait until WAL has been received and processed up to this LSN.
551 : ///
552 : /// You should call this before any of the other get_* or list_* functions. Calling
553 : /// those functions with an LSN that has not been processed yet is an error (see the usage sketch after this function).
554 : ///
555 1295744 : pub async fn wait_lsn(
556 1295744 : &self,
557 1295744 : lsn: Lsn,
558 1295744 : _ctx: &RequestContext, /* Prepare for use by cancellation */
559 1295745 : ) -> anyhow::Result<()> {
560 1295745 : anyhow::ensure!(self.is_active(), "Cannot wait for Lsn on inactive timeline");
561 :
562 : // This should never be called from the WAL receiver, because that could lead
563 : // to a deadlock.
564 1295745 : anyhow::ensure!(
565 1295745 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
566 0 : "wait_lsn cannot be called in WAL receiver"
567 : );
568 1295745 : anyhow::ensure!(
569 1295745 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
570 0 : "wait_lsn cannot be called in WAL receiver"
571 : );
572 1295745 : anyhow::ensure!(
573 1295745 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
574 0 : "wait_lsn cannot be called in WAL receiver"
575 : );
576 :
577 1295745 : let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
578 1295745 :
579 1295745 : match self
580 1295745 : .last_record_lsn
581 1295745 : .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
582 169869 : .await
583 : {
584 1295741 : Ok(()) => Ok(()),
585 4 : Err(e) => {
586 4 : // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
587 4 : drop(_timer);
588 4 : let walreceiver_status = {
589 4 : match &*self.walreceiver.lock().unwrap() {
590 0 : None => "stopping or stopped".to_string(),
591 4 : Some(walreceiver) => match walreceiver.status() {
592 3 : Some(status) => status.to_human_readable_string(),
593 1 : None => "Not active".to_string(),
594 : },
595 : }
596 : };
597 4 : Err(anyhow::Error::new(e).context({
598 4 : format!(
599 4 : "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
600 4 : lsn,
601 4 : self.get_last_record_lsn(),
602 4 : self.get_disk_consistent_lsn(),
603 4 : walreceiver_status,
604 4 : )
605 4 : }))
606 : }
607 : }
608 1295745 : }
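
A minimal usage sketch of the contract described above, as it might look in a read path (illustrative; `timeline`, `request_lsn`, `key` and `ctx` are placeholders):

    // Wait for the requested LSN to arrive and be processed...
    timeline.wait_lsn(request_lsn, &ctx).await?;
    // ...only then is it valid to read pages at that LSN.
    let page: Bytes = timeline.get(key, request_lsn, &ctx).await?;
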
609 :
610 : /// Check that it is valid to request operations with that lsn.
611 582 : pub fn check_lsn_is_in_scope(
612 582 : &self,
613 582 : lsn: Lsn,
614 582 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
615 582 : ) -> anyhow::Result<()> {
616 582 : ensure!(
617 582 : lsn >= **latest_gc_cutoff_lsn,
618 9 : "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
619 9 : lsn,
620 9 : **latest_gc_cutoff_lsn,
621 : );
622 573 : Ok(())
623 582 : }
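
A minimal sketch of the intended call pattern (illustrative), holding the GC cutoff guard while validating the LSN, e.g. before branching at `lsn`:

    let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
    timeline.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)?;
    // keep holding the guard for the rest of the operation so GC cannot advance past `lsn`
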
624 :
625 : /// Flush to disk all data that was written with the put_* functions
626 7192 : #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
627 : pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
628 : self.freeze_inmem_layer(false).await;
629 : self.flush_frozen_layers_and_wait().await
630 : }
631 :
632 : /// Outermost timeline compaction operation; downloads needed layers.
633 1401 : pub async fn compact(
634 1401 : self: &Arc<Self>,
635 1401 : cancel: &CancellationToken,
636 1401 : ctx: &RequestContext,
637 1401 : ) -> anyhow::Result<()> {
638 : const ROUNDS: usize = 2;
639 :
640 : static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
641 238 : once_cell::sync::Lazy::new(|| {
642 238 : let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
643 238 : let permits = usize::max(
644 238 : 1,
645 238 : // while a lot of the work is done on spawn_blocking, we still do
646 238 : // repartitioning in the async context. This should leave us some workers
647 238 : // unblocked for other work, hopefully easing any outside-visible
648 238 : // effects of restarts.
649 238 : //
650 238 : // 6/8 is a guess; previously we ran with unlimited 8 and more from
651 238 : // spawn_blocking.
652 238 : (total_threads * 3).checked_div(4).unwrap_or(0),
653 238 : );
654 238 : assert_ne!(permits, 0, "we will not be adding in permits later");
655 238 : assert!(
656 238 : permits < total_threads,
657 0 : "need threads avail for shorter work"
658 : );
659 238 : tokio::sync::Semaphore::new(permits)
660 238 : });
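            // Worked example (an assumption, not measured here): with 8 background
            // runtime worker threads, permits = max(1, 8 * 3 / 4) = 6, leaving two
            // workers free for shorter tasks.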
661 :
662 : // this wait probably never needs any "long time spent" logging, because we already nag if
663 : // the compaction task goes over its period (20s), which happens quite often in production.
664 1401 : let _permit = tokio::select! {
665 1401 : permit = CONCURRENT_COMPACTIONS.acquire() => {
666 : permit
667 : },
668 : _ = cancel.cancelled() => {
669 : return Ok(());
670 : }
671 : };
672 :
673 1401 : let last_record_lsn = self.get_last_record_lsn();
674 1401 :
675 1401 : // Last record Lsn could be zero in case the timeline was just created
676 1401 : if !last_record_lsn.is_valid() {
677 0 : warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
678 0 : return Ok(());
679 1401 : }
680 :
681 : // retry two times to allow first round to find layers which need to be downloaded, then
682 : // download them, then retry compaction
683 1402 : for round in 0..ROUNDS {
684 : // should we error out with the most specific error?
685 1402 : let last_round = round == ROUNDS - 1;
686 :
687 1078251 : let res = self.compact_inner(ctx).await;
688 :
689 : // If `create_image_layers' or `compact_level0` scheduled any
690 : // uploads or deletions, but didn't update the index file yet,
691 : // do it now.
692 : //
693 : // This isn't necessary for correctness, the remote state is
694 : // consistent without the uploads and deletions, and we would
695 : // update the index file on next flush iteration too. But it
696 : // could take a while until that happens.
697 : //
698 : // Additionally, only do this once before we return from this function.
699 1392 : if last_round || res.is_ok() {
700 1390 : if let Some(remote_client) = &self.remote_client {
701 633 : remote_client.schedule_index_upload_for_file_changes()?;
702 757 : }
703 2 : }
704 :
705 1 : let rls = match res {
706 1390 : Ok(()) => return Ok(()),
707 1 : Err(CompactionError::DownloadRequired(rls)) if !last_round => {
708 1 : // this can be done at most one time before exiting, waiting
709 1 : rls
710 : }
711 0 : Err(CompactionError::DownloadRequired(rls)) => {
712 0 : anyhow::bail!("Compaction requires downloading multiple times (last was {} layers), possibly battling against eviction", rls.len())
713 : }
714 : Err(CompactionError::ShuttingDown) => {
715 0 : return Ok(());
716 : }
717 1 : Err(CompactionError::Other(e)) => {
718 1 : return Err(e);
719 : }
720 : };
721 :
722 : // this path can be visited in the second round of retrying, if first one found that we
723 : // must first download some remote layers
724 1 : let total = rls.len();
725 1 :
726 1 : let mut downloads = rls
727 1 : .into_iter()
728 3 : .map(|rl| self.download_remote_layer(rl))
729 1 : .collect::<futures::stream::FuturesUnordered<_>>();
730 1 :
731 1 : let mut failed = 0;
732 :
733 : loop {
734 4 : tokio::select! {
735 : _ = cancel.cancelled() => anyhow::bail!("Cancelled while downloading remote layers"),
736 4 : res = downloads.next() => {
737 : match res {
738 : Some(Ok(())) => {},
739 : Some(Err(e)) => {
740 0 : warn!("Downloading remote layer for compaction failed: {e:#}");
741 : failed += 1;
742 : }
743 : None => break,
744 : }
745 : }
746 : }
747 : }
748 :
749 1 : if failed != 0 {
750 0 : anyhow::bail!("{failed} out of {total} layers failed to download, retrying later");
751 1 : }
752 :
753 : // if everything downloaded fine, lets try again
754 : }
755 :
756 0 : unreachable!("retry loop exits")
757 1391 : }
758 :
759 : /// Compaction which might need to be retried after downloading remote layers.
760 1402 : async fn compact_inner(self: &Arc<Self>, ctx: &RequestContext) -> Result<(), CompactionError> {
761 : //
762 : // High level strategy for compaction / image creation:
763 : //
764 : // 1. First, calculate the desired "partitioning" of the
765 : // currently in-use key space. The goal is to partition the
766 : // key space into roughly fixed-size chunks, but also take into
767 : // account any existing image layers, and try to align the
768 : // chunk boundaries with the existing image layers to avoid
769 : // too much churn. Also try to align chunk boundaries with
770 : // relation boundaries. In principle, we don't know about
771 : // relation boundaries here, we just deal with key-value
772 : // pairs, and the code in pgdatadir_mapping.rs knows how to
773 : // map relations into key-value pairs. But in practice we know
774 : // that 'field6' is the block number, and the fields 1-5
775 : // identify a relation. This is just an optimization,
776 : // though.
777 : //
778 : // 2. Once we know the partitioning, for each partition,
779 : // decide if it's time to create a new image layer. The
780 : // criterion is: has there been too much "churn" since the last
781 : // image layer? "Churn" is a fuzzy concept: a
782 : // combination of too many delta files, or too much WAL in
783 : // total in the delta files. Or perhaps: would creating an image
784 : // file allow us to delete some older files?
785 : //
786 : // 3. After that, we compact all level0 delta files if there
787 : // are too many of them. While compacting, we also garbage
788 : // collect any page versions that are no longer needed because
789 : // of the new image layers we created in step 2.
790 : //
791 : // TODO: This high level strategy hasn't been implemented yet.
792 : // Below are functions compact_level0() and create_image_layers()
793 : // but they are a bit ad hoc and don't quite work like it's explained
794 : // above. Rewrite it.
795 1402 : let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
796 : // Is the timeline being deleted?
797 1401 : if self.is_stopping() {
798 0 : trace!("Dropping out of compaction on timeline shutdown");
799 0 : return Err(CompactionError::ShuttingDown);
800 1401 : }
801 1401 :
802 1401 : let target_file_size = self.get_checkpoint_distance();
803 1401 :
804 1401 : // Define partitioning schema if needed
805 1401 :
806 1401 : match self
807 1401 : .repartition(
808 1401 : self.get_last_record_lsn(),
809 1401 : self.get_compaction_target_size(),
810 1401 : ctx,
811 1401 : )
812 383346 : .await
813 : {
814 1399 : Ok((partitioning, lsn)) => {
815 1399 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
816 1399 : let image_ctx = RequestContextBuilder::extend(ctx)
817 1399 : .access_stats_behavior(AccessStatsBehavior::Skip)
818 1399 : .build();
819 :
820 : // 2. Create new image layers for partitions that have been modified
821 : // "enough".
822 1399 : let layer_paths_to_upload = self
823 1399 : .create_image_layers(&partitioning, lsn, false, &image_ctx)
824 429215 : .await
825 1398 : .map_err(anyhow::Error::from)?;
826 1398 : if let Some(remote_client) = &self.remote_client {
827 1655 : for (path, layer_metadata) in layer_paths_to_upload {
828 1015 : remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
829 : }
830 758 : }
831 :
832 : // 3. Compact
833 1398 : let timer = self.metrics.compact_time_histo.start_timer();
834 1398 : self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx)
835 265683 : .await?;
836 1390 : timer.stop_and_record();
837 : }
838 0 : Err(err) => {
839 : // no partitioning? This is normal, if the timeline was just created
840 : // as an empty timeline. Also in unit tests, when we use the timeline
841 : // as a simple key-value store, ignoring the datadir layout. Log the
842 : // error but continue.
843 0 : error!("could not compact, repartitioning keyspace failed: {err:?}");
844 : }
845 : };
846 :
847 1390 : Ok(())
848 1392 : }
849 :
850 : /// Mutate the timeline with a [`TimelineWriter`].
851 74268456 : pub async fn writer(&self) -> TimelineWriter<'_> {
852 : TimelineWriter {
853 74268522 : tl: self,
854 74268522 : _write_guard: self.write_lock.lock().await,
855 : }
856 74268522 : }
857 :
858 : /// Retrieve current logical size of the timeline.
859 : ///
860 : /// The size could be lagging behind the actual number, in case
861 : /// the initial size calculation has not been run (gets triggered on the first size access).
862 : ///
863 : /// Returns the size and a boolean flag that shows whether the size is exact (see the usage sketch after this function).
864 736545 : pub fn get_current_logical_size(
865 736545 : self: &Arc<Self>,
866 736545 : ctx: &RequestContext,
867 736545 : ) -> anyhow::Result<(u64, bool)> {
868 736545 : let current_size = self.current_logical_size.current_size()?;
869 736545 : debug!("Current size: {current_size:?}");
870 :
871 736545 : let mut is_exact = true;
872 736545 : let size = current_size.size();
873 562 : if let (CurrentLogicalSize::Approximate(_), Some(initial_part_end)) =
874 736545 : (current_size, self.current_logical_size.initial_part_end)
875 562 : {
876 562 : is_exact = false;
877 562 : self.try_spawn_size_init_task(initial_part_end, ctx);
878 735983 : }
879 :
880 736545 : Ok((size, is_exact))
881 736545 : }
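
A minimal usage sketch of the return contract described above (illustrative; `timeline` and `ctx` are placeholders):

    let (size, is_exact) = timeline.get_current_logical_size(&ctx)?;
    if !is_exact {
        // initial size calculation is still pending; `size` is an approximation
    }
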
882 :
883 : /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
884 : /// the in-memory layer, and initiate flushing it if so.
885 : ///
886 : /// Also flush after a period of time without new data -- it helps
887 : /// safekeepers to regard pageserver as caught up and suspend activity.
888 734162 : pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
889 734162 : let last_lsn = self.get_last_record_lsn();
890 733761 : let open_layer_size = {
891 734162 : let guard = self.layers.read().await;
892 734162 : let layers = guard.layer_map();
893 734162 : let Some(open_layer) = layers.open_layer.as_ref() else {
894 401 : return Ok(());
895 : };
896 733761 : open_layer.size().await?
897 : };
898 733761 : let last_freeze_at = self.last_freeze_at.load();
899 733761 : let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
900 733761 : let distance = last_lsn.widening_sub(last_freeze_at);
901 733761 : // Checkpointing the open layer can be triggered by layer size or LSN range.
902 733761 : // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
903 733761 : // we want to stay below that with a big margin. The LSN distance determines how
904 733761 : // much WAL the safekeepers need to store.
905 733761 : if distance >= self.get_checkpoint_distance().into()
906 731746 : || open_layer_size > self.get_checkpoint_distance()
907 728806 : || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
908 : {
909 4955 : info!(
910 4955 : "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
911 4955 : distance,
912 4955 : open_layer_size,
913 4955 : last_freeze_ts.elapsed()
914 4955 : );
915 :
916 4955 : self.freeze_inmem_layer(true).await;
917 4955 : self.last_freeze_at.store(last_lsn);
918 4955 : *(self.last_freeze_ts.write().unwrap()) = Instant::now();
919 4955 :
920 4955 : // Wake up the layer flusher
921 4955 : self.flush_frozen_layers();
922 728806 : }
923 733761 : Ok(())
924 734162 : }
925 :
926 1190 : pub fn activate(
927 1190 : self: &Arc<Self>,
928 1190 : broker_client: BrokerClientChannel,
929 1190 : background_jobs_can_start: Option<&completion::Barrier>,
930 1190 : ctx: &RequestContext,
931 1190 : ) {
932 1190 : self.launch_wal_receiver(ctx, broker_client);
933 1190 : self.set_state(TimelineState::Active);
934 1190 : self.launch_eviction_task(background_jobs_can_start);
935 1190 : }
936 :
937 1052 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
938 : pub async fn shutdown(self: &Arc<Self>, freeze_and_flush: bool) {
939 : debug_assert_current_span_has_tenant_and_timeline_id();
940 :
941 : // prevent writes to the InMemoryLayer
942 : task_mgr::shutdown_tasks(
943 : Some(TaskKind::WalReceiverManager),
944 : Some(self.tenant_id),
945 : Some(self.timeline_id),
946 : )
947 : .await;
948 :
949 : // now all writers to InMemory layer are gone, do the final flush if requested
950 : if freeze_and_flush {
951 : match self.freeze_and_flush().await {
952 : Ok(()) => {}
953 : Err(e) => {
954 78 : warn!("failed to freeze and flush: {e:#}");
955 : return; // TODO: should probably drain remote timeline client anyways?
956 : }
957 : }
958 :
959 : // drain the upload queue
960 : let res = if let Some(client) = self.remote_client.as_ref() {
961 : // if we did not wait for completion here, it might be our shutdown process
962 : // didn't wait for remote uploads to complete at all, as new tasks can forever
963 : // be spawned.
964 : //
965 : // what is problematic is the shutting down of RemoteTimelineClient, because
966 : // obviously it does not make sense to stop while we wait for it, but what
967 : // about corner cases like s3 suddenly hanging up?
968 : client.wait_completion().await
969 : } else {
970 : Ok(())
971 : };
972 :
973 : if let Err(e) = res {
974 0 : warn!("failed to await for frozen and flushed uploads: {e:#}");
975 : }
976 : }
977 : }
978 :
979 : pub fn set_state(&self, new_state: TimelineState) {
980 2090 : match (self.current_state(), new_state) {
981 2090 : (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
982 170 : info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
983 : }
984 0 : (st, TimelineState::Loading) => {
985 0 : error!("ignoring transition from {st:?} into Loading state");
986 : }
987 19 : (TimelineState::Broken { .. }, new_state) => {
988 19 : error!("Ignoring state update {new_state:?} for broken timeline");
989 : }
990 : (TimelineState::Stopping, TimelineState::Active) => {
991 0 : error!("Not activating a Stopping timeline");
992 : }
993 1901 : (_, new_state) => {
994 1331 : if matches!(
995 1901 : new_state,
996 : TimelineState::Stopping | TimelineState::Broken { .. }
997 570 : ) {
998 570 : // drop the completion guard, if any; it might be holding off the completion
999 570 : // forever needlessly
1000 570 : self.initial_logical_size_attempt
1001 570 : .lock()
1002 570 : .unwrap_or_else(|e| e.into_inner())
1003 570 : .take();
1004 1331 : }
1005 1901 : self.state.send_replace(new_state);
1006 : }
1007 : }
1008 2090 : }
1009 :
1010 44 : pub fn set_broken(&self, reason: String) {
1011 44 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
1012 44 : let broken_state = TimelineState::Broken {
1013 44 : reason,
1014 44 : backtrace: backtrace_str,
1015 44 : };
1016 44 : self.set_state(broken_state)
1017 44 : }
1018 :
1019 2016136 : pub fn current_state(&self) -> TimelineState {
1020 2016136 : self.state.borrow().clone()
1021 2016136 : }
1022 :
1023 745 : pub fn is_broken(&self) -> bool {
1024 745 : matches!(&*self.state.borrow(), TimelineState::Broken { .. })
1025 745 : }
1026 :
1027 1302676 : pub fn is_active(&self) -> bool {
1028 1302676 : self.current_state() == TimelineState::Active
1029 1302676 : }
1030 :
1031 2547 : pub fn is_stopping(&self) -> bool {
1032 2547 : self.current_state() == TimelineState::Stopping
1033 2547 : }
1034 :
1035 1592 : pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
1036 1592 : self.state.subscribe()
1037 1592 : }
1038 :
1039 1069673 : pub async fn wait_to_become_active(
1040 1069673 : &self,
1041 1069673 : _ctx: &RequestContext, // Prepare for use by cancellation
1042 1069674 : ) -> Result<(), TimelineState> {
1043 1069674 : let mut receiver = self.state.subscribe();
1044 1069690 : loop {
1045 1069690 : let current_state = receiver.borrow().clone();
1046 1069690 : match current_state {
1047 : TimelineState::Loading => {
1048 16 : receiver
1049 16 : .changed()
1050 16 : .await
1051 16 : .expect("holding a reference to self");
1052 : }
1053 : TimelineState::Active { .. } => {
1054 1069672 : return Ok(());
1055 : }
1056 : TimelineState::Broken { .. } | TimelineState::Stopping => {
1057 : // There's no chance the timeline can transition back into ::Active
1058 2 : return Err(current_state);
1059 : }
1060 : }
1061 : }
1062 1069674 : }
1063 :
1064 101 : pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
1065 101 : let guard = self.layers.read().await;
1066 101 : let layer_map = guard.layer_map();
1067 101 : let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
1068 101 : if let Some(open_layer) = &layer_map.open_layer {
1069 23 : in_memory_layers.push(open_layer.info());
1070 78 : }
1071 101 : for frozen_layer in &layer_map.frozen_layers {
1072 0 : in_memory_layers.push(frozen_layer.info());
1073 0 : }
1074 :
1075 101 : let mut historic_layers = Vec::new();
1076 2273 : for historic_layer in layer_map.iter_historic_layers() {
1077 2273 : let historic_layer = guard.get_from_desc(&historic_layer);
1078 2273 : historic_layers.push(historic_layer.info(reset));
1079 2273 : }
1080 :
1081 101 : LayerMapInfo {
1082 101 : in_memory_layers,
1083 101 : historic_layers,
1084 101 : }
1085 101 : }
1086 :
1087 24 : #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
1088 : pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1089 : let Some(layer) = self.find_layer(layer_file_name).await else {
1090 : return Ok(None);
1091 : };
1092 : let Some(remote_layer) = layer.downcast_remote_layer() else {
1093 : return Ok(Some(false));
1094 : };
1095 : if self.remote_client.is_none() {
1096 : return Ok(Some(false));
1097 : }
1098 :
1099 : self.download_remote_layer(remote_layer).await?;
1100 : Ok(Some(true))
1101 : }
1102 :
1103 : /// Like [`evict_layer_batch`](Self::evict_layer_batch), but for just one layer.
1104 : /// The additional `Ok(None)` case covers the situation where the layer could not be found by its `layer_file_name`.
1105 25 : pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1106 25 : let Some(local_layer) = self.find_layer(layer_file_name).await else {
1107 0 : return Ok(None);
1108 : };
1109 25 : let remote_client = self
1110 25 : .remote_client
1111 25 : .as_ref()
1112 25 : .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
1113 :
1114 25 : let cancel = CancellationToken::new();
1115 25 : let results = self
1116 25 : .evict_layer_batch(remote_client, &[local_layer], cancel)
1117 2 : .await?;
1118 25 : assert_eq!(results.len(), 1);
1119 25 : let result: Option<Result<(), EvictionError>> = results.into_iter().next().unwrap();
1120 25 : match result {
1121 0 : None => anyhow::bail!("task_mgr shutdown requested"),
1122 25 : Some(Ok(())) => Ok(Some(true)),
1123 0 : Some(Err(e)) => Err(anyhow::Error::new(e)),
1124 : }
1125 25 : }
1126 :
1127 : /// Evict a batch of layers.
1128 : ///
1129 : /// GenericRemoteStorage reference is required as a [witness][witness_article] for "remote storage is configured."
1130 : ///
1131 : /// [witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html
1132 11 : pub(crate) async fn evict_layers(
1133 11 : &self,
1134 11 : _: &GenericRemoteStorage,
1135 11 : layers_to_evict: &[Arc<dyn PersistentLayer>],
1136 11 : cancel: CancellationToken,
1137 11 : ) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
1138 11 : let remote_client = self.remote_client.clone().expect(
1139 11 : "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient",
1140 11 : );
1141 11 :
1142 11 : self.evict_layer_batch(&remote_client, layers_to_evict, cancel)
1143 0 : .await
1144 11 : }
1145 :
1146 : /// Evict multiple layers at once, continuing through errors.
1147 : ///
1148 : /// Try to evict the given `layers_to_evict` by
1149 : ///
1150 : /// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object.
1151 : /// 2. Deleting the now unreferenced layer file from disk.
1152 : ///
1153 : /// The `remote_client` should be this timeline's `self.remote_client`.
1154 : /// We make the caller provide it so that they are responsible for handling the case
1155 : /// where someone wants to evict the layer but no remote storage is configured.
1156 : ///
1157 : /// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`.
1158 : /// If `Err()` is returned, no eviction was attempted.
1159 : /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`.
1160 : /// Meaning of each `result[i]` (see the sketch after this function):
1161 : /// - `Some(Ok(()))` if everything went well.
1162 : /// - `Some(Err(...))` if the layer could not be evicted, for an unexpected reason
1163 : /// or for an expectable one, e.g.:
1164 : /// - evictee was not yet downloaded (it is still a remote layer)
1165 : /// - the layer file or its layer map entry was already gone (e.g., removed by GC or compaction before we grabbed all locks)
1166 : /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`.
1167 67 : async fn evict_layer_batch(
1168 67 : &self,
1169 67 : remote_client: &Arc<RemoteTimelineClient>,
1170 67 : layers_to_evict: &[Arc<dyn PersistentLayer>],
1171 67 : cancel: CancellationToken,
1172 67 : ) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
1173 67 : // ensure that the layers have finished uploading
1174 67 : // (don't hold the layer_removal_cs while we do it, we're not removing anything yet)
1175 67 : remote_client
1176 67 : .wait_completion()
1177 2 : .await
1178 67 : .context("wait for layer upload ops to complete")?;
1179 :
1180 : // now lock out layer removal (compaction, gc, timeline deletion)
1181 67 : let layer_removal_guard = self.layer_removal_cs.lock().await;
1182 :
1183 : {
1184 : // to avoid racing with detach and delete_timeline
1185 67 : let state = self.current_state();
1186 67 : anyhow::ensure!(
1187 67 : state == TimelineState::Active,
1188 0 : "timeline is not active but {state:?}"
1189 : );
1190 : }
1191 :
1192 : // start the batch update
1193 67 : let mut guard = self.layers.write().await;
1194 67 : let mut results = Vec::with_capacity(layers_to_evict.len());
1195 :
1196 210 : for l in layers_to_evict.iter() {
1197 210 : let res = if cancel.is_cancelled() {
1198 0 : None
1199 : } else {
1200 210 : Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut guard))
1201 : };
1202 210 : results.push(res);
1203 : }
1204 :
1205 : // commit the updates & release locks
1206 67 : drop_wlock(guard);
1207 67 : drop(layer_removal_guard);
1208 67 :
1209 67 : assert_eq!(results.len(), layers_to_evict.len());
1210 67 : Ok(results)
1211 67 : }
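
A minimal sketch of interpreting the per-layer results, mirroring the single-layer handling in `evict_layer` above (illustrative; `layers`, `remote_client` and `cancel` are placeholders):

    let results = timeline
        .evict_layer_batch(&remote_client, &layers, cancel)
        .await?;
    assert_eq!(results.len(), layers.len());
    for (layer, result) in layers.iter().zip(results) {
        match result {
            None => { /* cancelled before this layer was attempted */ }
            Some(Ok(())) => { /* evicted and replaced with a RemoteLayer */ }
            Some(Err(e)) => warn!(layer = %layer, "eviction failed: {e:#}"),
        }
    }
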
1212 :
1213 210 : fn evict_layer_batch_impl(
1214 210 : &self,
1215 210 : _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
1216 210 : local_layer: &Arc<dyn PersistentLayer>,
1217 210 : layer_mgr: &mut LayerManager,
1218 210 : ) -> Result<(), EvictionError> {
1219 210 : if local_layer.is_remote_layer() {
1220 0 : return Err(EvictionError::CannotEvictRemoteLayer);
1221 210 : }
1222 210 :
1223 210 : let layer_file_size = local_layer.layer_desc().file_size;
1224 :
1225 210 : let local_layer_mtime = local_layer
1226 210 : .local_path()
1227 210 : .expect("local layer should have a local path")
1228 210 : .metadata()
1229 210 : // when the eviction fails because we have already deleted the layer in compaction for
1230 210 : // example, a NotFound error bubbles up from here.
1231 210 : .map_err(|e| {
1232 1 : if e.kind() == std::io::ErrorKind::NotFound {
1233 1 : EvictionError::FileNotFound
1234 : } else {
1235 0 : EvictionError::StatFailed(e)
1236 : }
1237 210 : })?
1238 209 : .modified()
1239 209 : .map_err(EvictionError::StatFailed)?;
1240 :
1241 209 : let local_layer_residence_duration =
1242 209 : match SystemTime::now().duration_since(local_layer_mtime) {
1243 0 : Err(e) => {
1244 0 : warn!(layer = %local_layer, "layer mtime is in the future: {}", e);
1245 0 : None
1246 : }
1247 209 : Ok(delta) => Some(delta),
1248 : };
1249 :
1250 209 : let layer_metadata = LayerFileMetadata::new(layer_file_size, self.generation);
1251 :
1252 209 : let new_remote_layer = Arc::new(match local_layer.filename() {
1253 6 : LayerFileName::Image(image_name) => RemoteLayer::new_img(
1254 6 : self.tenant_id,
1255 6 : self.timeline_id,
1256 6 : &image_name,
1257 6 : &layer_metadata,
1258 6 : local_layer
1259 6 : .access_stats()
1260 6 : .clone_for_residence_change(LayerResidenceStatus::Evicted),
1261 6 : ),
1262 203 : LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
1263 203 : self.tenant_id,
1264 203 : self.timeline_id,
1265 203 : &delta_name,
1266 203 : &layer_metadata,
1267 203 : local_layer
1268 203 : .access_stats()
1269 203 : .clone_for_residence_change(LayerResidenceStatus::Evicted),
1270 203 : ),
1271 : });
1272 :
1273 209 : assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
1274 :
1275 209 : layer_mgr
1276 209 : .replace_and_verify(local_layer.clone(), new_remote_layer)
1277 209 : .map_err(EvictionError::LayerNotFound)?;
1278 :
1279 208 : if let Err(e) = local_layer.delete_resident_layer_file() {
1280 : // this should never happen, because of layer_removal_cs usage and above stat
1281 : // access for mtime
1282 0 : error!("failed to remove layer file on evict after replacement: {e:#?}");
1283 208 : }
1284 : // Always decrement the physical size gauge, even if we failed to delete the file.
1285 : // Rationale: we already replaced the layer with a remote layer in the layer map,
1286 : // and any subsequent download_remote_layer will
1287 : // 1. overwrite the file on disk and
1288 : // 2. add the downloaded size to the resident size gauge.
1289 : //
1290 : // If there is no re-download, and we restart the pageserver, then load_layer_map
1291 : // will treat the file as a local layer again, count it towards resident size,
1292 : // and it'll be like the layer removal never happened.
1293 : // The bump in resident size is perhaps unexpected but overall a robust behavior.
1294 208 : self.metrics
1295 208 : .resident_physical_size_gauge
1296 208 : .sub(layer_file_size);
1297 208 :
1298 208 : self.metrics.evictions.inc();
1299 :
1300 208 : if let Some(delta) = local_layer_residence_duration {
1301 208 : self.metrics
1302 208 : .evictions_with_low_residence_duration
1303 208 : .read()
1304 208 : .unwrap()
1305 208 : .observe(delta);
1306 208 : info!(layer=%local_layer, residence_millis=delta.as_millis(), "evicted layer after known residence period");
1307 : } else {
1308 0 : info!(layer=%local_layer, "evicted layer after unknown residence period");
1309 : }
1310 :
1311 208 : Ok(())
1312 210 : }
1313 : }
1314 :
1315 0 : #[derive(Debug, thiserror::Error)]
1316 : pub(crate) enum EvictionError {
1317 : #[error("cannot evict a remote layer")]
1318 : CannotEvictRemoteLayer,
1319 : /// Most likely the to-be evicted layer has been deleted by compaction or gc which use the same
1320 : /// locks, so they got to execute before the eviction.
1321 : #[error("file backing the layer has been removed already")]
1322 : FileNotFound,
1323 : #[error("stat failed")]
1324 : StatFailed(#[source] std::io::Error),
1325 : /// In practice, this can be a number of things, but lets assume it means only this.
1326 : ///
1327 : /// This case includes situations such as the Layer was evicted and redownloaded in between,
1328 : /// because the file existed before an replacement attempt was made but now the Layers are
1329 : /// different objects in memory.
1330 : #[error("layer was no longer part of LayerMap")]
1331 : LayerNotFound(#[source] anyhow::Error),
1332 : }
1333 :
1334 : /// Number of times we will compute partition within a checkpoint distance.
1335 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
1336 :
1337 : // Private functions
1338 : impl Timeline {
1339 1468302 : fn get_checkpoint_distance(&self) -> u64 {
1340 1468302 : let tenant_conf = self.tenant_conf.read().unwrap();
1341 1468302 : tenant_conf
1342 1468302 : .checkpoint_distance
1343 1468302 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
1344 1468302 : }
1345 :
1346 728806 : fn get_checkpoint_timeout(&self) -> Duration {
1347 728806 : let tenant_conf = self.tenant_conf.read().unwrap();
1348 728806 : tenant_conf
1349 728806 : .checkpoint_timeout
1350 728806 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
1351 728806 : }
1352 :
1353 1438 : fn get_compaction_target_size(&self) -> u64 {
1354 1438 : let tenant_conf = self.tenant_conf.read().unwrap();
1355 1438 : tenant_conf
1356 1438 : .compaction_target_size
1357 1438 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
1358 1438 : }
1359 :
1360 1398 : fn get_compaction_threshold(&self) -> usize {
1361 1398 : let tenant_conf = self.tenant_conf.read().unwrap();
1362 1398 : tenant_conf
1363 1398 : .compaction_threshold
1364 1398 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
1365 1398 : }
1366 :
1367 16758 : fn get_image_creation_threshold(&self) -> usize {
1368 16758 : let tenant_conf = self.tenant_conf.read().unwrap();
1369 16758 : tenant_conf
1370 16758 : .image_creation_threshold
1371 16758 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
1372 16758 : }
1373 :
1374 2062 : fn get_eviction_policy(&self) -> EvictionPolicy {
1375 2062 : let tenant_conf = self.tenant_conf.read().unwrap();
1376 2062 : tenant_conf
1377 2062 : .eviction_policy
1378 2062 : .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
1379 2062 : }
1380 :
1381 1423 : fn get_evictions_low_residence_duration_metric_threshold(
1382 1423 : tenant_conf: &TenantConfOpt,
1383 1423 : default_tenant_conf: &TenantConf,
1384 1423 : ) -> Duration {
1385 1423 : tenant_conf
1386 1423 : .evictions_low_residence_duration_metric_threshold
1387 1423 : .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
1388 1423 : }
1389 :
1390 12272 : fn get_gc_feedback(&self) -> bool {
1391 12272 : let tenant_conf = self.tenant_conf.read().unwrap();
1392 12272 : tenant_conf
1393 12272 : .gc_feedback
1394 12272 : .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
1395 12272 : }
1396 :
1397 29 : pub(super) fn tenant_conf_updated(&self) {
1398 29 : // NB: Most tenant conf options are read by background loops, so
1399 29 : // changes will automatically be picked up.
1400 29 :
1401 29 : // The threshold is embedded in the metric. So, we need to update it.
1402 29 : {
1403 29 : let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
1404 29 : &self.tenant_conf.read().unwrap(),
1405 29 : &self.conf.default_tenant_conf,
1406 29 : );
1407 29 : let tenant_id_str = self.tenant_id.to_string();
1408 29 : let timeline_id_str = self.timeline_id.to_string();
1409 29 : self.metrics
1410 29 : .evictions_with_low_residence_duration
1411 29 : .write()
1412 29 : .unwrap()
1413 29 : .change_threshold(&tenant_id_str, &timeline_id_str, new_threshold);
1414 29 : }
1415 29 : }
1416 :
1417 : /// Open a Timeline handle.
1418 : ///
1419 : /// Loads the metadata for the timeline into memory, but not the layer map.
1420 : #[allow(clippy::too_many_arguments)]
1421 1394 : pub(super) fn new(
1422 1394 : conf: &'static PageServerConf,
1423 1394 : tenant_conf: Arc<RwLock<TenantConfOpt>>,
1424 1394 : metadata: &TimelineMetadata,
1425 1394 : ancestor: Option<Arc<Timeline>>,
1426 1394 : timeline_id: TimelineId,
1427 1394 : tenant_id: TenantId,
1428 1394 : generation: Generation,
1429 1394 : walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
1430 1394 : resources: TimelineResources,
1431 1394 : pg_version: u32,
1432 1394 : initial_logical_size_can_start: Option<completion::Barrier>,
1433 1394 : initial_logical_size_attempt: Option<completion::Completion>,
1434 1394 : state: TimelineState,
1435 1394 : ) -> Arc<Self> {
1436 1394 : let disk_consistent_lsn = metadata.disk_consistent_lsn();
1437 1394 : let (state, _) = watch::channel(state);
1438 1394 :
1439 1394 : let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
1440 1394 : let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
1441 1394 :
1442 1394 : let tenant_conf_guard = tenant_conf.read().unwrap();
1443 1394 :
1444 1394 : let evictions_low_residence_duration_metric_threshold =
1445 1394 : Self::get_evictions_low_residence_duration_metric_threshold(
1446 1394 : &tenant_conf_guard,
1447 1394 : &conf.default_tenant_conf,
1448 1394 : );
1449 1394 : drop(tenant_conf_guard);
1450 1394 :
1451 1394 : Arc::new_cyclic(|myself| {
1452 1394 : let mut result = Timeline {
1453 1394 : conf,
1454 1394 : tenant_conf,
1455 1394 : myself: myself.clone(),
1456 1394 : timeline_id,
1457 1394 : tenant_id,
1458 1394 : generation,
1459 1394 : pg_version,
1460 1394 : layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
1461 1394 : wanted_image_layers: Mutex::new(None),
1462 1394 :
1463 1394 : walredo_mgr,
1464 1394 : walreceiver: Mutex::new(None),
1465 1394 :
1466 1394 : remote_client: resources.remote_client.map(Arc::new),
1467 1394 :
1468 1394 : // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
1469 1394 : last_record_lsn: SeqWait::new(RecordLsn {
1470 1394 : last: disk_consistent_lsn,
1471 1394 : prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
1472 1394 : }),
1473 1394 : disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
1474 1394 :
1475 1394 : last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
1476 1394 : last_freeze_ts: RwLock::new(Instant::now()),
1477 1394 :
1478 1394 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
1479 1394 :
1480 1394 : ancestor_timeline: ancestor,
1481 1394 : ancestor_lsn: metadata.ancestor_lsn(),
1482 1394 :
1483 1394 : metrics: TimelineMetrics::new(
1484 1394 : &tenant_id,
1485 1394 : &timeline_id,
1486 1394 : crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
1487 1394 : "mtime",
1488 1394 : evictions_low_residence_duration_metric_threshold,
1489 1394 : ),
1490 1394 : ),
1491 1394 :
1492 1394 : flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
1493 1394 :
1494 1394 : layer_flush_start_tx,
1495 1394 : layer_flush_done_tx,
1496 1394 :
1497 1394 : write_lock: tokio::sync::Mutex::new(()),
1498 1394 : layer_removal_cs: Default::default(),
1499 1394 :
1500 1394 : gc_info: std::sync::RwLock::new(GcInfo {
1501 1394 : retain_lsns: Vec::new(),
1502 1394 : horizon_cutoff: Lsn(0),
1503 1394 : pitr_cutoff: Lsn(0),
1504 1394 : }),
1505 1394 :
1506 1394 : latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
1507 1394 : initdb_lsn: metadata.initdb_lsn(),
1508 1394 :
1509 1394 : current_logical_size: if disk_consistent_lsn.is_valid() {
1510 : // we're creating timeline data with some layer files existing locally,
1511 : // need to recalculate timeline's logical size based on data in the layers.
1512 710 : LogicalSize::deferred_initial(disk_consistent_lsn)
1513 : } else {
1514 : // we're creating timeline data without any layers existing locally,
1515 : // initial logical size is 0.
1516 684 : LogicalSize::empty_initial()
1517 : },
1518 1394 : partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
1519 1394 : repartition_threshold: 0,
1520 1394 :
1521 1394 : last_received_wal: Mutex::new(None),
1522 1394 : rel_size_cache: RwLock::new(HashMap::new()),
1523 1394 :
1524 1394 : download_all_remote_layers_task_info: RwLock::new(None),
1525 1394 :
1526 1394 : state,
1527 1394 :
1528 1394 : eviction_task_timeline_state: tokio::sync::Mutex::new(
1529 1394 : EvictionTaskTimelineState::default(),
1530 1394 : ),
1531 1394 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
1532 1394 :
1533 1394 : initial_logical_size_can_start,
1534 1394 : initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
1535 1394 : };
1536 1394 : result.repartition_threshold =
1537 1394 : result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
1538 1394 : result
1539 1394 : .metrics
1540 1394 : .last_record_gauge
1541 1394 : .set(disk_consistent_lsn.0 as i64);
1542 1394 : result
1543 1394 : })
1544 1394 : }
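// Illustrative sketch of the self-referential construction used by `Timeline::new` above:
// `Arc::new_cyclic` hands the closure a `Weak` pointer to the Arc that is still being
// built, and the value stores it so it can later mint strong handles to itself. The names
// `Node`, `new_node` and `strong_handle` are hypothetical, not used elsewhere in this module.

use std::sync::{Arc, Weak};

struct Node {
    // Weak self-reference, filled in while the Arc is still under construction.
    myself: Weak<Node>,
    label: String,
}

fn new_node(label: String) -> Arc<Node> {
    Arc::new_cyclic(|myself| Node {
        myself: myself.clone(),
        label,
    })
}

fn strong_handle(node: &Node) -> Option<Arc<Node>> {
    // Upgrading succeeds as long as at least one strong handle is still alive.
    node.myself.upgrade()
}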
1545 :
1546 2042 : pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
1547 2042 : let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
1548 2042 : match *flush_loop_state {
1549 1364 : FlushLoopState::NotStarted => (),
1550 : FlushLoopState::Running { .. } => {
1551 678 : info!(
1552 678 : "skipping attempt to start flush_loop twice {}/{}",
1553 678 : self.tenant_id, self.timeline_id
1554 678 : );
1555 678 : return;
1556 : }
1557 : FlushLoopState::Exited => {
1558 0 : warn!(
1559 0 : "ignoring attempt to restart exited flush_loop {}/{}",
1560 0 : self.tenant_id, self.timeline_id
1561 0 : );
1562 0 : return;
1563 : }
1564 : }
1565 :
1566 1364 : let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
1567 1364 : let self_clone = Arc::clone(self);
1568 1364 :
1569 1364 : debug!("spawning flush loop");
1570 1364 : *flush_loop_state = FlushLoopState::Running {
1571 1364 : #[cfg(test)]
1572 1364 : expect_initdb_optimization: false,
1573 1364 : #[cfg(test)]
1574 1364 : initdb_optimization_count: 0,
1575 1364 : };
1576 1364 : task_mgr::spawn(
1577 1364 : task_mgr::BACKGROUND_RUNTIME.handle(),
1578 1364 : task_mgr::TaskKind::LayerFlushTask,
1579 1364 : Some(self.tenant_id),
1580 1364 : Some(self.timeline_id),
1581 1364 : "layer flush task",
1582 : false,
1583 1364 : async move {
1584 1364 : let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
1585 23419 : self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
1586 527 : let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
1587 527 : assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
1588 527 : *flush_loop_state = FlushLoopState::Exited;
1589 527 : Ok(())
1590 527 : }
1591 1364 : .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))
1592 : );
1593 2042 : }
1594 :
1595 : /// Creates and starts the wal receiver.
1596 : ///
1597 : /// This function is expected to be called at most once per Timeline's lifecycle
1598 : /// when the timeline is activated.
1599 1190 : fn launch_wal_receiver(
1600 1190 : self: &Arc<Self>,
1601 1190 : ctx: &RequestContext,
1602 1190 : broker_client: BrokerClientChannel,
1603 1190 : ) {
1604 1190 : info!(
1605 1190 : "launching WAL receiver for timeline {} of tenant {}",
1606 1190 : self.timeline_id, self.tenant_id
1607 1190 : );
1608 :
1609 1190 : let tenant_conf_guard = self.tenant_conf.read().unwrap();
1610 1190 : let wal_connect_timeout = tenant_conf_guard
1611 1190 : .walreceiver_connect_timeout
1612 1190 : .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
1613 1190 : let lagging_wal_timeout = tenant_conf_guard
1614 1190 : .lagging_wal_timeout
1615 1190 : .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
1616 1190 : let max_lsn_wal_lag = tenant_conf_guard
1617 1190 : .max_lsn_wal_lag
1618 1190 : .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
1619 1190 : drop(tenant_conf_guard);
1620 1190 :
1621 1190 : let mut guard = self.walreceiver.lock().unwrap();
1622 1190 : assert!(
1623 1190 : guard.is_none(),
1624 0 : "multiple launches / re-launches of WAL receiver are not supported"
1625 : );
1626 1190 : *guard = Some(WalReceiver::start(
1627 1190 : Arc::clone(self),
1628 1190 : WalReceiverConf {
1629 1190 : wal_connect_timeout,
1630 1190 : lagging_wal_timeout,
1631 1190 : max_lsn_wal_lag,
1632 1190 : auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
1633 1190 : availability_zone: self.conf.availability_zone.clone(),
1634 1190 : },
1635 1190 : broker_client,
1636 1190 : ctx,
1637 1190 : ));
1638 1190 : }
1639 :
1640 : /// Initialize with an empty layer map. Used when creating a new timeline.
1641 1042 : pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
1642 1042 : let mut layers = self.layers.try_write().expect(
1643 1042 : "in the context where we call this function, no other task has access to the object",
1644 1042 : );
1645 1042 : layers.initialize_empty(Lsn(start_lsn.0));
1646 1042 : }
1647 :
1648 : /// Scan the timeline directory, clean up, populate the layer map, and schedule uploads for local-only
1649 : /// files.
1650 326 : pub(super) async fn load_layer_map(
1651 326 : &self,
1652 326 : disk_consistent_lsn: Lsn,
1653 326 : index_part: Option<IndexPart>,
1654 326 : ) -> anyhow::Result<()> {
1655 : use init::{Decision::*, Discovered, FutureLayer};
1656 : use LayerFileName::*;
1657 :
1658 326 : let mut guard = self.layers.write().await;
1659 :
1660 326 : let timer = self.metrics.load_layer_map_histo.start_timer();
1661 326 :
1662 326 : // Scan the timeline directory and create ImageFileName and DeltaFileName
1663 326 : // structs representing all files on disk
1664 326 : let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
1665 326 : let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
1666 326 : let span = tracing::Span::current();
1667 326 :
1668 326 : // Copy to move into the task we're about to spawn
1669 326 : let generation = self.generation;
1670 :
1671 326 : let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({
1672 326 : move || {
1673 326 : let _g = span.entered();
1674 326 : let discovered = init::scan_timeline_dir(&timeline_path)?;
1675 326 : let mut discovered_layers = Vec::with_capacity(discovered.len());
1676 326 : let mut unrecognized_files = Vec::new();
1677 326 :
1678 326 : let mut path = timeline_path;
1679 :
1680 4590 : for discovered in discovered {
1681 4264 : let (name, kind) = match discovered {
1682 3850 : Discovered::Layer(file_name, file_size) => {
1683 3850 : discovered_layers.push((file_name, file_size));
1684 3850 : continue;
1685 : }
1686 : Discovered::Metadata | Discovered::IgnoredBackup => {
1687 326 : continue;
1688 : }
1689 0 : Discovered::Unknown(file_name) => {
1690 0 : // we will later error if there are any
1691 0 : unrecognized_files.push(file_name);
1692 0 : continue;
1693 : }
1694 83 : Discovered::Ephemeral(name) => (name, "old ephemeral file"),
1695 5 : Discovered::Temporary(name) => (name, "temporary timeline file"),
1696 0 : Discovered::TemporaryDownload(name) => (name, "temporary download"),
1697 : };
1698 88 : path.push(name);
1699 88 : init::cleanup(&path, kind)?;
1700 88 : path.pop();
1701 : }
1702 :
1703 326 : if !unrecognized_files.is_empty() {
1704 : // assume that if there are any, there are many.
1705 0 : let n = unrecognized_files.len();
1706 0 : let first = &unrecognized_files[..n.min(10)];
1707 0 : anyhow::bail!(
1708 0 : "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
1709 0 : );
1710 326 : }
1711 326 :
1712 326 : let decided = init::reconcile(
1713 326 : discovered_layers,
1714 326 : index_part.as_ref(),
1715 326 : disk_consistent_lsn,
1716 326 : generation,
1717 326 : );
1718 326 :
1719 326 : let mut loaded_layers = Vec::new();
1720 326 : let mut needs_upload = Vec::new();
1721 326 : let mut needs_cleanup = Vec::new();
1722 326 : let mut total_physical_size = 0;
1723 :
1724 6667 : for (name, decision) in decided {
1725 6340 : let decision = match decision {
1726 1 : Ok(UseRemote { local, remote }) => {
1727 1 : path.push(name.file_name());
1728 1 : init::cleanup_local_file_for_remote(&path, &local, &remote)?;
1729 1 : path.pop();
1730 1 :
1731 1 : UseRemote { local, remote }
1732 : }
1733 6339 : Ok(decision) => decision,
1734 1 : Err(FutureLayer { local }) => {
1735 1 : if local.is_some() {
1736 1 : path.push(name.file_name());
1737 1 : init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
1738 1 : path.pop();
1739 0 : }
1740 1 : needs_cleanup.push(name);
1741 1 : continue;
1742 : }
1743 : };
1744 :
1745 6340 : match &name {
1746 5374 : Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
1747 966 : Image(i) => assert!(i.lsn <= disk_consistent_lsn),
1748 : }
1749 :
1750 6340 : let status = match &decision {
1751 3848 : UseLocal(_) | NeedsUpload(_) => LayerResidenceStatus::Resident,
1752 2492 : Evicted(_) | UseRemote { .. } => LayerResidenceStatus::Evicted,
1753 : };
1754 :
1755 6340 : let stats = LayerAccessStats::for_loading_layer(status);
1756 :
1757 6340 : let layer: Arc<dyn PersistentLayer> = match (name, &decision) {
1758 834 : (Delta(d), UseLocal(m) | NeedsUpload(m)) => {
1759 3830 : total_physical_size += m.file_size();
1760 3830 : Arc::new(DeltaLayer::new(
1761 3830 : conf,
1762 3830 : timeline_id,
1763 3830 : tenant_id,
1764 3830 : &d,
1765 3830 : m.file_size(),
1766 3830 : stats,
1767 3830 : ))
1768 : }
1769 16 : (Image(i), UseLocal(m) | NeedsUpload(m)) => {
1770 18 : total_physical_size += m.file_size();
1771 18 : Arc::new(ImageLayer::new(
1772 18 : conf,
1773 18 : timeline_id,
1774 18 : tenant_id,
1775 18 : &i,
1776 18 : m.file_size(),
1777 18 : stats,
1778 18 : ))
1779 : }
1780 1544 : (Delta(d), Evicted(remote) | UseRemote { remote, .. }) => Arc::new(
1781 1544 : RemoteLayer::new_delta(tenant_id, timeline_id, &d, remote, stats),
1782 1544 : ),
1783 948 : (Image(i), Evicted(remote) | UseRemote { remote, .. }) => Arc::new(
1784 948 : RemoteLayer::new_img(tenant_id, timeline_id, &i, remote, stats),
1785 948 : ),
1786 : };
1787 :
1788 6340 : if let NeedsUpload(m) = decision {
1789 850 : needs_upload.push((layer.clone(), m));
1790 5490 : }
1791 :
1792 6340 : loaded_layers.push(layer);
1793 : }
1794 326 : Ok((
1795 326 : loaded_layers,
1796 326 : (needs_upload, needs_cleanup),
1797 326 : total_physical_size,
1798 326 : ))
1799 326 : }
1800 326 : })
1801 324 : .await
1802 326 : .map_err(anyhow::Error::new)
1803 326 : .and_then(|x| x)?;
1804 :
1805 326 : let num_layers = loaded_layers.len();
1806 326 :
1807 326 : guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
1808 :
1809 326 : if let Some(rtc) = self.remote_client.as_ref() {
1810 179 : let (needs_upload, needs_cleanup) = to_sync;
1811 565 : for (layer, m) in needs_upload {
1812 386 : rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?;
1813 : }
1814 179 : rtc.schedule_layer_file_deletion(&needs_cleanup)?;
1815 179 : rtc.schedule_index_upload_for_file_changes()?;
1816 : // Tenant::create_timeline will wait for these uploads to happen before returning, or
1817 : // on retry.
1818 147 : }
1819 :
1820 326 : info!(
1821 326 : "loaded layer map with {} layers at {}, total physical size: {}",
1822 326 : num_layers, disk_consistent_lsn, total_physical_size
1823 326 : );
1824 326 : self.metrics
1825 326 : .resident_physical_size_gauge
1826 326 : .set(total_physical_size);
1827 326 :
1828 326 : timer.stop_and_record();
1829 326 : Ok(())
1830 326 : }
1831 :
1832 562 : fn try_spawn_size_init_task(self: &Arc<Self>, lsn: Lsn, ctx: &RequestContext) {
1833 562 : let state = self.current_state();
1834 545 : if matches!(
1835 562 : state,
1836 : TimelineState::Broken { .. } | TimelineState::Stopping
1837 : ) {
1838 : // Can happen when the timeline detail endpoint is used while deletion is ongoing (or it's broken).
1839 17 : return;
1840 545 : }
1841 :
1842 545 : let permit = match Arc::clone(&self.current_logical_size.initial_size_computation)
1843 545 : .try_acquire_owned()
1844 : {
1845 370 : Ok(permit) => permit,
1846 : Err(TryAcquireError::NoPermits) => {
1847 : // computation already ongoing or finished with success
1848 175 : return;
1849 : }
1850 0 : Err(TryAcquireError::Closed) => unreachable!("we never call close"),
1851 : };
1852 370 : debug_assert!(self
1853 370 : .current_logical_size
1854 370 : .initial_logical_size
1855 370 : .get()
1856 370 : .is_none());
1857 :
1858 370 : info!(
1859 370 : "spawning logical size computation from context of task kind {:?}",
1860 370 : ctx.task_kind()
1861 370 : );
1862 : // We need to start the computation task.
1863 : // It gets a separate context since it will outlive the request that called this function.
1864 370 : let self_clone = Arc::clone(self);
1865 370 : let background_ctx = ctx.detached_child(
1866 370 : TaskKind::InitialLogicalSizeCalculation,
1867 370 : DownloadBehavior::Download,
1868 370 : );
1869 370 : task_mgr::spawn(
1870 370 : task_mgr::BACKGROUND_RUNTIME.handle(),
1871 370 : task_mgr::TaskKind::InitialLogicalSizeCalculation,
1872 370 : Some(self.tenant_id),
1873 370 : Some(self.timeline_id),
1874 370 : "initial size calculation",
1875 370 : false,
1876 370 : // NB: don't log errors here, task_mgr will do that.
1877 370 : async move {
1878 370 :
1879 370 : let cancel = task_mgr::shutdown_token();
1880 :
1881 : // in case we were created during pageserver initialization, wait for
1882 : // initialization to complete before proceeding. startup time init runs on the same
1883 : // runtime.
1884 370 : tokio::select! {
1885 371 : _ = cancel.cancelled() => { return Ok(()); },
1886 371 : _ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
1887 371 : };
1888 :
1889 : // hold off background tasks from starting until all timelines have had at least one
1890 : // attempt at the initial logical size calculation; a retry will rarely be useful.
1891 : // holding off is done because heavier tasks execute blockingly on the same
1892 : // runtime.
1893 : //
1894 : // dropping this at every outcome is probably better than trying to cling on to it;
1895 : // the delay will be terminated by a timeout regardless.
1896 370 : let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
1897 370 :
1898 370 : // no extra cancellation here, because nothing really waits for this to complete compared
1899 370 : // to spawn_ondemand_logical_size_calculation.
1900 370 : let cancel = CancellationToken::new();
1901 :
1902 370 : let calculated_size = match self_clone
1903 370 : .logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx, cancel)
1904 139210 : .await
1905 : {
1906 361 : Ok(s) => s,
1907 : Err(CalculateLogicalSizeError::Cancelled) => {
1908 : // Don't make noise, this is a common task.
1909 : // In the unlikely case that there is another call to this function, we'll retry
1910 : // because initial_logical_size is still None.
1911 1 : info!("initial size calculation cancelled, likely timeline delete / tenant detach");
1912 1 : return Ok(());
1913 : }
1914 4 : Err(CalculateLogicalSizeError::Other(err)) => {
1915 1 : if let Some(e @ PageReconstructError::AncestorStopping(_)) =
1916 4 : err.root_cause().downcast_ref()
1917 : {
1918 : // This can happen if the timeline's parent timeline switches to
1919 : // Stopping state while we're still calculating the initial
1920 : // timeline size for the child, for example if the tenant is
1921 : // being detached or the pageserver is shut down. Like with
1922 : // CalculateLogicalSizeError::Cancelled, don't make noise.
1923 1 : info!("initial size calculation failed because the timeline or its ancestor is Stopping, likely because the tenant is being detached: {e:#}");
1924 1 : return Ok(());
1925 3 : }
1926 3 : return Err(err.context("Failed to calculate logical size"));
1927 : }
1928 : };
1929 :
1930 : // we cannot query current_logical_size.current_size() to learn the current
1931 : // *negative* value; it is only available truncated to u64.
1932 361 : let added = self_clone
1933 361 : .current_logical_size
1934 361 : .size_added_after_initial
1935 361 : .load(AtomicOrdering::Relaxed);
1936 361 :
1937 361 : let sum = calculated_size.saturating_add_signed(added);
1938 361 :
1939 361 : // set the gauge value before it can be set in `update_current_logical_size`.
1940 361 : self_clone.metrics.current_logical_size_gauge.set(sum);
1941 361 :
1942 361 : match self_clone
1943 361 : .current_logical_size
1944 361 : .initial_logical_size
1945 361 : .set(calculated_size)
1946 : {
1947 361 : Ok(()) => (),
1948 0 : Err(_what_we_just_attempted_to_set) => {
1949 0 : let existing_size = self_clone
1950 0 : .current_logical_size
1951 0 : .initial_logical_size
1952 0 : .get()
1953 0 : .expect("once_cell set was lost, then get failed, impossible.");
1954 0 : // This shouldn't happen because the semaphore is initialized with 1.
1955 0 : // But if it happens, just complain & report success so there are no further retries.
1956 0 : error!("Tried to update initial timeline size value to {calculated_size}, but the size was already set to {existing_size}, not changing")
1957 : }
1958 : }
1959 : // now that `initial_logical_size.is_some()`, reduce permit count to 0
1960 : // so that we prevent future callers from spawning this task
1961 361 : permit.forget();
1962 361 : Ok(())
1963 370 : }.in_current_span(),
1964 370 : );
1965 562 : }
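// Illustrative sketch of the run-once gating idiom used by `try_spawn_size_init_task`
// above: a semaphore initialized with a single permit lets exactly one caller start the
// work, and `forget()` on success drops the permit count to zero so no later caller can
// start it again; on failure the permit is released, allowing a retry. The helper name
// `run_once_gated` and the `work` parameter are hypothetical.

use std::future::Future;
use std::sync::Arc;
use tokio::sync::{Semaphore, TryAcquireError};

async fn run_once_gated<F, Fut>(gate: &Arc<Semaphore>, work: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = anyhow::Result<()>>,
{
    let permit = match Arc::clone(gate).try_acquire_owned() {
        Ok(permit) => permit,
        // Computation is already ongoing, or it finished successfully earlier.
        Err(TryAcquireError::NoPermits) => return,
        Err(TryAcquireError::Closed) => unreachable!("the semaphore is never closed"),
    };
    if work().await.is_ok() {
        // Success: permanently reduce the permit count to zero.
        permit.forget();
    }
    // On error the permit is dropped here, so a later call may retry.
}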
1966 :
1967 32 : pub fn spawn_ondemand_logical_size_calculation(
1968 32 : self: &Arc<Self>,
1969 32 : lsn: Lsn,
1970 32 : cause: LogicalSizeCalculationCause,
1971 32 : ctx: RequestContext,
1972 32 : cancel: CancellationToken,
1973 32 : ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
1974 32 : let (sender, receiver) = oneshot::channel();
1975 32 : let self_clone = Arc::clone(self);
1976 32 : // XXX if our caller loses interest, i.e., ctx is cancelled,
1977 32 : // we should stop the size calculation work and return an error.
1978 32 : // That would require restructuring this function's API to
1979 32 : // return the result directly, instead of a Receiver for the result.
1980 32 : let ctx = ctx.detached_child(
1981 32 : TaskKind::OndemandLogicalSizeCalculation,
1982 32 : DownloadBehavior::Download,
1983 32 : );
1984 32 : task_mgr::spawn(
1985 32 : task_mgr::BACKGROUND_RUNTIME.handle(),
1986 32 : task_mgr::TaskKind::OndemandLogicalSizeCalculation,
1987 32 : Some(self.tenant_id),
1988 32 : Some(self.timeline_id),
1989 32 : "ondemand logical size calculation",
1990 32 : false,
1991 32 : async move {
1992 32 : let res = self_clone
1993 32 : .logical_size_calculation_task(lsn, cause, &ctx, cancel)
1994 1349 : .await;
1995 32 : let _ = sender.send(res).ok();
1996 32 : Ok(()) // Receiver is responsible for handling errors
1997 32 : }
1998 32 : .in_current_span(),
1999 32 : );
2000 32 : receiver
2001 32 : }
2002 :
2003 1608 : #[instrument(skip_all)]
2004 : async fn logical_size_calculation_task(
2005 : self: &Arc<Self>,
2006 : lsn: Lsn,
2007 : cause: LogicalSizeCalculationCause,
2008 : ctx: &RequestContext,
2009 : cancel: CancellationToken,
2010 : ) -> Result<u64, CalculateLogicalSizeError> {
2011 : span::debug_assert_current_span_has_tenant_and_timeline_id();
2012 :
2013 : let mut timeline_state_updates = self.subscribe_for_state_updates();
2014 : let self_calculation = Arc::clone(self);
2015 :
2016 402 : let mut calculation = pin!(async {
2017 402 : let cancel = cancel.child_token();
2018 402 : let ctx = ctx.attached_child();
2019 402 : self_calculation
2020 402 : .calculate_logical_size(lsn, cause, cancel, &ctx)
2021 140559 : .await
2022 398 : });
2023 400 : let timeline_state_cancellation = async {
2024 : loop {
2025 140297 : match timeline_state_updates.changed().await {
2026 : Ok(()) => {
2027 0 : let new_state = timeline_state_updates.borrow().clone();
2028 0 : match new_state {
2029 : // we're running this job for active timelines only
2030 0 : TimelineState::Active => continue,
2031 : TimelineState::Broken { .. }
2032 : | TimelineState::Stopping
2033 : | TimelineState::Loading => {
2034 0 : break format!("aborted because timeline became inactive (new state: {new_state:?})")
2035 : }
2036 : }
2037 : }
2038 0 : Err(_sender_dropped_error) => {
2039 0 : // can't happen, the sender is not dropped as long as the Timeline exists
2040 0 : break "aborted because state watch was dropped".to_string();
2041 : }
2042 : }
2043 : }
2044 0 : };
2045 :
2046 401 : let taskmgr_shutdown_cancellation = async {
2047 140425 : task_mgr::shutdown_watcher().await;
2048 1 : "aborted because task_mgr shutdown requested".to_string()
2049 1 : };
2050 :
2051 : loop {
2052 140961 : tokio::select! {
2053 397 : res = &mut calculation => { return res }
2054 0 : reason = timeline_state_cancellation => {
2055 0 : debug!(reason = reason, "cancelling calculation");
2056 : cancel.cancel();
2057 : return calculation.await;
2058 : }
2059 1 : reason = taskmgr_shutdown_cancellation => {
2060 0 : debug!(reason = reason, "cancelling calculation");
2061 : cancel.cancel();
2062 : return calculation.await;
2063 : }
2064 : }
2065 : }
2066 : }
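// Illustrative sketch of the select-then-drain pattern used by
// `logical_size_calculation_task` above: the main future is pinned so that, when a
// cancellation source fires first, we can signal the token and still await the same
// future to let it observe the cancellation and report its result. The helper name
// `run_with_cancel` and the single `stop` token are hypothetical simplifications.

use std::pin::pin;
use tokio_util::sync::CancellationToken;

async fn run_with_cancel<F, T>(work: F, cancel: CancellationToken, stop: CancellationToken) -> T
where
    F: std::future::Future<Output = T>,
{
    let mut work = pin!(work);
    tokio::select! {
        res = &mut work => res,
        _ = stop.cancelled() => {
            // Ask the work to wind down, then await the same pinned future for its result.
            cancel.cancel();
            work.await
        }
    }
}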
2067 :
2068 : /// Calculate the logical size of the database at the given LSN.
2069 : ///
2070 : /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
2071 : /// especially if we need to download remote layers.
2072 408 : pub async fn calculate_logical_size(
2073 408 : &self,
2074 408 : up_to_lsn: Lsn,
2075 408 : cause: LogicalSizeCalculationCause,
2076 408 : cancel: CancellationToken,
2077 408 : ctx: &RequestContext,
2078 408 : ) -> Result<u64, CalculateLogicalSizeError> {
2079 408 : info!(
2080 408 : "Calculating logical size for timeline {} at {}",
2081 408 : self.timeline_id, up_to_lsn
2082 408 : );
2083 : // These failpoints are used by python tests to ensure that we don't delete
2084 : // the timeline while the logical size computation is ongoing.
2085 : // The first failpoint is used to make this function pause.
2086 : // Then the python test initiates a timeline delete operation in a thread.
2087 : // It waits for a few seconds, then arms the second failpoint and disables
2088 : // the first failpoint. The second failpoint prints an error if the timeline
2089 : // delete code has deleted the on-disk state while we're still running here.
2090 : // It shouldn't do that. If it does it anyway, the error will be caught
2091 : // by the test suite, highlighting the problem.
2092 408 : fail::fail_point!("timeline-calculate-logical-size-pause");
2093 408 : fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
2094 2 : if !self
2095 2 : .conf
2096 2 : .metadata_path(&self.tenant_id, &self.timeline_id)
2097 2 : .exists()
2098 : {
2099 0 : error!("timeline-calculate-logical-size-pre metadata file does not exist")
2100 2 : }
2101 : // need to return something
2102 2 : Ok(0)
2103 408 : });
2104 : // See if we've already done the work for initial size calculation.
2105 : // This is a short-cut for timelines that are mostly unused.
2106 406 : if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
2107 4 : return Ok(size);
2108 402 : }
2109 402 : let storage_time_metrics = match cause {
2110 : LogicalSizeCalculationCause::Initial
2111 : | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
2112 390 : | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
2113 : LogicalSizeCalculationCause::EvictionTaskImitation => {
2114 12 : &self.metrics.imitate_logical_size_histo
2115 : }
2116 : };
2117 402 : let timer = storage_time_metrics.start_timer();
2118 402 : let logical_size = self
2119 402 : .get_current_logical_size_non_incremental(up_to_lsn, cancel, ctx)
2120 140934 : .await?;
2121 0 : debug!("calculated logical size: {logical_size}");
2122 393 : timer.stop_and_record();
2123 393 : Ok(logical_size)
2124 404 : }
2125 :
2126 : /// Update current logical size, adding `delta` to the old value.
2127 1094354 : fn update_current_logical_size(&self, delta: i64) {
2128 1094354 : let logical_size = &self.current_logical_size;
2129 1094354 : logical_size.increment_size(delta);
2130 1094354 :
2131 1094354 : // Also set the value in the prometheus gauge. Note that
2132 1094354 : // there is a race condition here: if this is called by two
2133 1094354 : // threads concurrently, the prometheus gauge might be set to
2134 1094354 : // one value while current_logical_size is set to the
2135 1094354 : // other.
2136 1094354 : match logical_size.current_size() {
2137 1093225 : Ok(CurrentLogicalSize::Exact(new_current_size)) => self
2138 1093225 : .metrics
2139 1093225 : .current_logical_size_gauge
2140 1093225 : .set(new_current_size),
2141 1129 : Ok(CurrentLogicalSize::Approximate(_)) => {
2142 1129 : // don't update the gauge yet; this avoids bouncing the gauge back and
2143 1129 : // forth with the initial size calculation task.
2144 1129 : }
2145 : // this is overflow
2146 0 : Err(e) => error!("Failed to compute current logical size for metrics update: {e:?}"),
2147 : }
2148 1094354 : }
2149 :
2150 31 : async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
2151 31 : let guard = self.layers.read().await;
2152 83 : for historic_layer in guard.layer_map().iter_historic_layers() {
2153 83 : let historic_layer_name = historic_layer.filename().file_name();
2154 83 : if layer_file_name == historic_layer_name {
2155 31 : return Some(guard.get_from_desc(&historic_layer));
2156 52 : }
2157 : }
2158 :
2159 0 : None
2160 31 : }
2161 : }
2162 :
2163 : type TraversalId = String;
2164 :
2165 : trait TraversalLayerExt {
2166 : fn traversal_id(&self) -> TraversalId;
2167 : }
2168 :
2169 : impl TraversalLayerExt for Arc<dyn PersistentLayer> {
2170 995 : fn traversal_id(&self) -> TraversalId {
2171 995 : let timeline_id = self.layer_desc().timeline_id;
2172 995 : match self.local_path() {
2173 2 : Some(local_path) => {
2174 2 : debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", timeline_id)),
2175 0 : "need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary",
2176 : );
2177 2 : format!("{}", local_path.display())
2178 : }
2179 : None => {
2180 993 : format!("remote {}/{self}", timeline_id)
2181 : }
2182 : }
2183 995 : }
2184 : }
2185 :
2186 : impl TraversalLayerExt for Arc<InMemoryLayer> {
2187 2 : fn traversal_id(&self) -> TraversalId {
2188 2 : format!("timeline {} in-memory {self}", self.get_timeline_id())
2189 2 : }
2190 : }
2191 :
2192 : impl Timeline {
2193 : ///
2194 : /// Gather the data needed to reconstruct the value of `key` at `request_lsn`.
2195 : ///
2196 : /// The data might come from layers of an ancestor timeline, if the
2197 : /// key hasn't been updated on this timeline yet.
2198 : ///
2199 : /// This function acquires the timeline's layer map lock internally while
2200 : /// searching, so callers do not need to hold it.
2201 7262258 : async fn get_reconstruct_data(
2202 7262258 : &self,
2203 7262258 : key: Key,
2204 7262258 : request_lsn: Lsn,
2205 7262258 : reconstruct_state: &mut ValueReconstructState,
2206 7262258 : ctx: &RequestContext,
2207 7262260 : ) -> Result<(), PageReconstructError> {
2208 7262260 : // Start from the current timeline.
2209 7262260 : let mut timeline_owned;
2210 7262260 : let mut timeline = self;
2211 7262260 :
2212 7262260 : let mut read_count = scopeguard::guard(0, |cnt| {
2213 7262224 : crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
2214 7262260 : });
2215 7262260 :
2216 7262260 : // For debugging purposes, collect the path of layers that we traversed
2217 7262260 : // through. It's included in the error message if we fail to find the key.
2218 7262260 : let mut traversal_path = Vec::<TraversalPathItem>::new();
2219 :
2220 7262260 : let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
2221 1740405 : *cached_lsn
2222 : } else {
2223 5521855 : Lsn(0)
2224 : };
2225 :
2226 : // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
2227 : // to check that each iteration makes some progress, to avoid infinite
2228 : // looping if something goes wrong.
2229 7262260 : let mut prev_lsn = Lsn(u64::MAX);
2230 7262260 :
2231 7262260 : let mut result = ValueReconstructResult::Continue;
2232 7262260 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2233 :
2234 60385848 : 'outer: loop {
2235 60385848 : // The function should have updated 'state'
2236 60385848 : //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
2237 60385848 : match result {
2238 5627647 : ValueReconstructResult::Complete => return Ok(()),
2239 : ValueReconstructResult::Continue => {
2240 : // If we reached an earlier cached page image, we're done.
2241 54758199 : if cont_lsn == cached_lsn + 1 {
2242 1634558 : MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
2243 1634558 : return Ok(());
2244 53123641 : }
2245 53123641 : if prev_lsn <= cont_lsn {
2246 : // Didn't make any progress in last iteration. Error out to avoid
2247 : // getting stuck in the loop.
2248 0 : return Err(layer_traversal_error(format!(
2249 0 : "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
2250 0 : key,
2251 0 : Lsn(cont_lsn.0 - 1),
2252 0 : request_lsn,
2253 0 : timeline.ancestor_lsn
2254 0 : ), traversal_path));
2255 53123641 : }
2256 53123641 : prev_lsn = cont_lsn;
2257 : }
2258 : ValueReconstructResult::Missing => {
2259 : return Err(layer_traversal_error(
2260 2 : if cfg!(test) {
2261 2 : format!(
2262 2 : "could not find data for key {} at LSN {}, for request at LSN {}\n{}",
2263 2 : key, cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
2264 2 : )
2265 : } else {
2266 0 : format!(
2267 0 : "could not find data for key {} at LSN {}, for request at LSN {}",
2268 0 : key, cont_lsn, request_lsn
2269 0 : )
2270 : },
2271 2 : traversal_path,
2272 : ));
2273 : }
2274 : }
2275 :
2276 : // Recurse into ancestor if needed
2277 53123641 : if Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
2278 0 : trace!(
2279 0 : "going into ancestor {}, cont_lsn is {}",
2280 0 : timeline.ancestor_lsn,
2281 0 : cont_lsn
2282 0 : );
2283 1068484 : let ancestor = match timeline.get_ancestor_timeline() {
2284 1068484 : Ok(timeline) => timeline,
2285 0 : Err(e) => return Err(PageReconstructError::from(e)),
2286 : };
2287 :
2288 : // It's possible that the ancestor timeline isn't active yet, or
2289 : // is active but hasn't yet caught up to the branch point. Wait
2290 : // for it.
2291 : //
2292 : // This cannot happen while the pageserver is running normally,
2293 : // because you cannot create a branch from a point that isn't
2294 : // present in the pageserver yet. However, we don't wait for the
2295 : // branch point to be uploaded to cloud storage before creating
2296 : // a branch. I.e., the branch LSN need not be remote consistent
2297 : // for the branching operation to succeed.
2298 : //
2299 : // Hence, if we try to load a tenant in such a state where
2300 : // 1. the existence of the branch was persisted (in IndexPart and/or locally)
2301 : // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
2302 : // then we will need to wait for the ancestor timeline to
2303 : // re-stream WAL up to branch_lsn before we access it.
2304 : //
2305 : // How can a tenant get in such a state?
2306 : // - ungraceful pageserver process exit
2307 : // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
2308 : //
2309 : // NB: this could be avoided by requiring
2310 : // branch_lsn >= remote_consistent_lsn
2311 : // during branch creation.
2312 1068484 : match ancestor.wait_to_become_active(ctx).await {
2313 1068482 : Ok(()) => {}
2314 2 : Err(state) if state == TimelineState::Stopping => {
2315 1 : return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
2316 : }
2317 1 : Err(state) => {
2318 1 : return Err(PageReconstructError::Other(anyhow::anyhow!(
2319 1 : "Timeline {} will not become active. Current state: {:?}",
2320 1 : ancestor.timeline_id,
2321 1 : &state,
2322 1 : )));
2323 : }
2324 : }
2325 1068482 : ancestor
2326 1068482 : .wait_lsn(timeline.ancestor_lsn, ctx)
2327 10 : .await
2328 1068482 : .with_context(|| {
2329 0 : format!(
2330 0 : "wait for lsn {} on ancestor timeline_id={}",
2331 0 : timeline.ancestor_lsn, ancestor.timeline_id
2332 0 : )
2333 1068482 : })?;
2334 :
2335 1068482 : timeline_owned = ancestor;
2336 1068482 : timeline = &*timeline_owned;
2337 1068482 : prev_lsn = Lsn(u64::MAX);
2338 1068482 : continue 'outer;
2339 52055157 : }
2340 :
2341 : #[allow(clippy::never_loop)] // see comment at bottom of this loop
2342 : 'layer_map_search: loop {
2343 994 : let remote_layer = {
2344 52056143 : let guard = timeline.layers.read().await;
2345 52056115 : let layers = guard.layer_map();
2346 :
2347 : // Check the open and frozen in-memory layers first, in order from newest
2348 : // to oldest.
2349 52056115 : if let Some(open_layer) = &layers.open_layer {
2350 47011144 : let start_lsn = open_layer.get_lsn_range().start;
2351 47011144 : if cont_lsn > start_lsn {
2352 : //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
2353 : // Get all the data needed to reconstruct the page version from this layer.
2354 : // But if we have an older cached page image, no need to go past that.
2355 5990712 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2356 5990712 : result = match open_layer
2357 5990712 : .get_value_reconstruct_data(
2358 5990712 : key,
2359 5990712 : lsn_floor..cont_lsn,
2360 5990712 : reconstruct_state,
2361 5990712 : ctx,
2362 5990712 : )
2363 347548 : .await
2364 : {
2365 5990711 : Ok(result) => result,
2366 0 : Err(e) => return Err(PageReconstructError::from(e)),
2367 : };
2368 5990711 : cont_lsn = lsn_floor;
2369 5990711 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2370 5990711 : traversal_path.push((
2371 5990711 : result,
2372 5990711 : cont_lsn,
2373 5990711 : Box::new({
2374 5990711 : let open_layer = Arc::clone(open_layer);
2375 5990711 : move || open_layer.traversal_id()
2376 5990711 : }),
2377 5990711 : ));
2378 5990711 : continue 'outer;
2379 41020432 : }
2380 5044971 : }
2381 46065403 : for frozen_layer in layers.frozen_layers.iter().rev() {
2382 2038486 : let start_lsn = frozen_layer.get_lsn_range().start;
2383 2038486 : if cont_lsn > start_lsn {
2384 : //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
2385 257399 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2386 257399 : result = match frozen_layer
2387 257399 : .get_value_reconstruct_data(
2388 257399 : key,
2389 257399 : lsn_floor..cont_lsn,
2390 257399 : reconstruct_state,
2391 257399 : ctx,
2392 257399 : )
2393 4230 : .await
2394 : {
2395 257399 : Ok(result) => result,
2396 0 : Err(e) => return Err(PageReconstructError::from(e)),
2397 : };
2398 257399 : cont_lsn = lsn_floor;
2399 257399 : // metrics: frozen_layer is in-memory and does not count as fs access, so we are not updating `read_count`
2400 257399 : traversal_path.push((
2401 257399 : result,
2402 257399 : cont_lsn,
2403 257399 : Box::new({
2404 257399 : let frozen_layer = Arc::clone(frozen_layer);
2405 257399 : move || frozen_layer.traversal_id()
2406 257399 : }),
2407 257399 : ));
2408 257399 : continue 'outer;
2409 1781087 : }
2410 : }
2411 :
2412 45808004 : if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
2413 45696390 : let layer = guard.get_from_desc(&layer);
2414 : // If it's a remote layer, download it and retry.
2415 994 : if let Some(remote_layer) =
2416 45696390 : super::storage_layer::downcast_remote_layer(&layer)
2417 : {
2418 : // TODO: push a breadcrumb to 'traversal_path' to record the fact that
2419 : // we downloaded / would need to download this layer.
2420 994 : remote_layer // download happens outside the scope of `layers` guard object
2421 : } else {
2422 : // Get all the data needed to reconstruct the page version from this layer.
2423 : // But if we have an older cached page image, no need to go past that.
2424 45695396 : let lsn_floor = max(cached_lsn + 1, lsn_floor);
2425 45695396 : result = match layer
2426 45695396 : .get_value_reconstruct_data(
2427 45695396 : key,
2428 45695396 : lsn_floor..cont_lsn,
2429 45695396 : reconstruct_state,
2430 45695396 : ctx,
2431 45695396 : )
2432 1375502 : .await
2433 : {
2434 45695382 : Ok(result) => result,
2435 11 : Err(e) => return Err(PageReconstructError::from(e)),
2436 : };
2437 45695382 : cont_lsn = lsn_floor;
2438 45695382 : *read_count += 1;
2439 45695382 : traversal_path.push((
2440 45695382 : result,
2441 45695382 : cont_lsn,
2442 45695382 : Box::new({
2443 45695382 : let layer = Arc::clone(&layer);
2444 45695382 : move || layer.traversal_id()
2445 45695382 : }),
2446 45695382 : ));
2447 45695382 : continue 'outer;
2448 : }
2449 111614 : } else if timeline.ancestor_timeline.is_some() {
2450 : // Nothing on this timeline. Traverse to parent
2451 111614 : result = ValueReconstructResult::Continue;
2452 111614 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2453 111614 : continue 'outer;
2454 : } else {
2455 : // Nothing found
2456 0 : result = ValueReconstructResult::Missing;
2457 0 : continue 'outer;
2458 : }
2459 : };
2460 : // Download the remote_layer and replace it in the layer map.
2461 : // For that, we need to release the mutex. Otherwise, we'd deadlock.
2462 : //
2463 : // The control flow is so weird here because `drop(layers)` inside
2464 : // the if stmt above is not enough for current rustc: it requires
2465 : // that the layers lock guard is not in scope across the download
2466 : // await point.
2467 994 : let remote_layer_as_persistent: Arc<dyn PersistentLayer> =
2468 994 : Arc::clone(&remote_layer) as Arc<dyn PersistentLayer>;
2469 994 : let id = remote_layer_as_persistent.traversal_id();
2470 993 : info!(
2471 993 : "need remote layer {} for task kind {:?}",
2472 993 : id,
2473 993 : ctx.task_kind()
2474 993 : );
2475 :
2476 : // The next layer doesn't exist locally. Need to download it.
2477 : // (The control flow is a bit complicated here because we must drop the 'layers'
2478 : // lock before awaiting on the Future.)
2479 993 : match (
2480 993 : ctx.download_behavior(),
2481 993 : self.conf.ondemand_download_behavior_treat_error_as_warn,
2482 993 : ) {
2483 : (DownloadBehavior::Download, _) => {
2484 993 : info!(
2485 993 : "on-demand downloading remote layer {id} for task kind {:?}",
2486 993 : ctx.task_kind()
2487 993 : );
2488 1388 : timeline.download_remote_layer(remote_layer).await?;
2489 986 : continue 'layer_map_search;
2490 : }
2491 : (DownloadBehavior::Warn, _) | (DownloadBehavior::Error, true) => {
2492 0 : warn!(
2493 0 : "unexpectedly on-demand downloading remote layer {} for task kind {:?}",
2494 0 : id,
2495 0 : ctx.task_kind()
2496 0 : );
2497 0 : UNEXPECTED_ONDEMAND_DOWNLOADS.inc();
2498 0 : timeline.download_remote_layer(remote_layer).await?;
2499 0 : continue 'layer_map_search;
2500 : }
2501 : (DownloadBehavior::Error, false) => {
2502 0 : return Err(PageReconstructError::NeedsDownload(
2503 0 : TenantTimelineId::new(self.tenant_id, self.timeline_id),
2504 0 : remote_layer.filename(),
2505 0 : ))
2506 : }
2507 : }
2508 : }
2509 : }
2510 7262224 : }
2511 :
2512 7262300 : async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
2513 7262300 : let cache = page_cache::get();
2514 :
2515 : // FIXME: It's pointless to check the cache for things that are not 8kB pages.
2516 : // We should look at the key to determine if it's a cacheable object
2517 7262300 : let (lsn, read_guard) = cache
2518 7262300 : .lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
2519 5521855 : .await?;
2520 1740445 : let img = Bytes::from(read_guard.to_vec());
2521 1740445 : Some((lsn, img))
2522 7262300 : }
2523 :
2524 1068483 : fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
2525 1068483 : let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
2526 0 : format!(
2527 0 : "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
2528 0 : self.timeline_id,
2529 0 : self.get_ancestor_timeline_id(),
2530 0 : )
2531 1068483 : })?;
2532 1068483 : Ok(Arc::clone(ancestor))
2533 1068483 : }
2534 :
2535 : ///
2536 : /// Get a handle to the latest layer for appending.
2537 : ///
2538 82563785 : async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
2539 82563872 : let mut guard = self.layers.write().await;
2540 82563863 : let layer = guard.get_layer_for_write(
2541 82563863 : lsn,
2542 82563863 : self.get_last_record_lsn(),
2543 82563863 : self.conf,
2544 82563863 : self.timeline_id,
2545 82563863 : self.tenant_id,
2546 82563863 : )?;
2547 82563863 : Ok(layer)
2548 82563863 : }
2549 :
2550 82545991 : async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
2551 : //info!("PUT: key {} at {}", key, lsn);
2552 82546078 : let layer = self.get_layer_for_write(lsn).await?;
2553 82546069 : layer.put_value(key, lsn, val).await?;
2554 82546066 : Ok(())
2555 82546066 : }
2556 :
2557 17794 : async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
2558 17794 : let layer = self.get_layer_for_write(lsn).await?;
2559 17794 : layer.put_tombstone(key_range, lsn).await?;
2560 17794 : Ok(())
2561 17794 : }
2562 :
2563 74268468 : fn finish_write(&self, new_lsn: Lsn) {
2564 74268468 : assert!(new_lsn.is_aligned());
2565 :
2566 74268468 : self.metrics.last_record_gauge.set(new_lsn.0 as i64);
2567 74268468 : self.last_record_lsn.advance(new_lsn);
2568 74268468 : }
2569 :
2570 6753 : async fn freeze_inmem_layer(&self, write_lock_held: bool) {
2571 : // Freeze the current open in-memory layer. It will be written to disk on the next
2572 : // iteration.
2573 6753 : let _write_guard = if write_lock_held {
2574 4955 : None
2575 : } else {
2576 1798 : Some(self.write_lock.lock().await)
2577 : };
2578 6753 : let mut guard = self.layers.write().await;
2579 6753 : guard
2580 6753 : .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
2581 28 : .await;
2582 6753 : }
2583 :
2584 : /// Layer flusher task's main loop.
2585 1364 : async fn flush_loop(
2586 1364 : self: &Arc<Self>,
2587 1364 : mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
2588 1364 : ctx: &RequestContext,
2589 1364 : ) {
2590 1364 : info!("started flush loop");
2591 : loop {
2592 7304 : tokio::select! {
2593 13416 : _ = task_mgr::shutdown_watcher() => {
2594 13416 : info!("shutting down layer flush task");
2595 13416 : break;
2596 13416 : },
2597 13416 : _ = layer_flush_start_rx.changed() => {}
2598 13416 : }
2599 :
2600 0 : trace!("waking up");
2601 5945 : let timer = self.metrics.flush_time_histo.start_timer();
2602 5945 : let flush_counter = *layer_flush_start_rx.borrow();
2603 5940 : let result = loop {
2604 12454 : let layer_to_flush = {
2605 12454 : let guard = self.layers.read().await;
2606 12454 : guard.layer_map().frozen_layers.front().cloned()
2607 : // drop 'layers' lock to allow concurrent reads and writes
2608 : };
2609 12454 : let Some(layer_to_flush) = layer_to_flush else {
2610 5940 : break Ok(());
2611 : };
2612 15251 : if let Err(err) = self.flush_frozen_layer(layer_to_flush, ctx).await {
2613 0 : error!("could not flush frozen layer: {err:?}");
2614 0 : break Err(err);
2615 6509 : }
2616 : };
2617 : // Notify any listeners that we're done
2618 5940 : let _ = self
2619 5940 : .layer_flush_done_tx
2620 5940 : .send_replace((flush_counter, result));
2621 5940 :
2622 5940 : timer.stop_and_record();
2623 : }
2624 527 : }
2625 :
2626 1798 : async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
2627 1798 : let mut rx = self.layer_flush_done_tx.subscribe();
2628 1798 :
2629 1798 : // Increment the flush cycle counter and wake up the flush task.
2630 1798 : // Remember the new value, so that when we listen for the flush
2631 1798 : // to finish, we know when the flush that we initiated has
2632 1798 : // finished, instead of some other flush that was started earlier.
2633 1798 : let mut my_flush_request = 0;
2634 1798 :
2635 1798 : let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
2636 1798 : if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
2637 78 : anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
2638 1720 : }
2639 1720 :
2640 1720 : self.layer_flush_start_tx.send_modify(|counter| {
2641 1720 : my_flush_request = *counter + 1;
2642 1720 : *counter = my_flush_request;
2643 1720 : });
2644 :
2645 3440 : loop {
2646 3440 : {
2647 3440 : let (last_result_counter, last_result) = &*rx.borrow();
2648 3440 : if *last_result_counter >= my_flush_request {
2649 1720 : if let Err(_err) = last_result {
2650 : // We already logged the original error in
2651 : // flush_loop. We cannot propagate it to the caller
2652 : // here, because it might not be Cloneable
2653 0 : anyhow::bail!(
2654 0 : "Could not flush frozen layer. Request id: {}",
2655 0 : my_flush_request
2656 0 : );
2657 : } else {
2658 1720 : return Ok(());
2659 : }
2660 1720 : }
2661 : }
2662 0 : trace!("waiting for flush to complete");
2663 1720 : rx.changed().await?;
2664 0 : trace!("done")
2665 : }
2666 1798 : }
2667 :
2668 4955 : fn flush_frozen_layers(&self) {
2669 4955 : self.layer_flush_start_tx.send_modify(|val| *val += 1);
2670 4955 : }
2671 :
2672 : /// Flush one frozen in-memory layer to disk, as a new delta layer.
2673 6514 : #[instrument(skip_all, fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id, layer=%frozen_layer))]
2674 : async fn flush_frozen_layer(
2675 : self: &Arc<Self>,
2676 : frozen_layer: Arc<InMemoryLayer>,
2677 : ctx: &RequestContext,
2678 : ) -> anyhow::Result<()> {
2679 : // As a special case, when we have just imported an image into the repository,
2680 : // instead of writing out an L0 delta layer, we directly write out image layer
2681 : // files. This is possible as long as *all* the data imported into the
2682 : // repository has the same LSN.
2683 : let lsn_range = frozen_layer.get_lsn_range();
2684 : let (layer_paths_to_upload, delta_layer_to_add) =
2685 : if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
2686 : #[cfg(test)]
2687 : match &mut *self.flush_loop_state.lock().unwrap() {
2688 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2689 : panic!("flush loop not running")
2690 : }
2691 : FlushLoopState::Running {
2692 : initdb_optimization_count,
2693 : ..
2694 : } => {
2695 : *initdb_optimization_count += 1;
2696 : }
2697 : }
2698 : // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
2699 : // require downloading anything during initial import.
2700 : let (partitioning, _lsn) = self
2701 : .repartition(self.initdb_lsn, self.get_compaction_target_size(), ctx)
2702 : .await?;
2703 : // For image layers, we add them immediately into the layer map.
2704 : (
2705 : self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
2706 : .await?,
2707 : None,
2708 : )
2709 : } else {
2710 : #[cfg(test)]
2711 : match &mut *self.flush_loop_state.lock().unwrap() {
2712 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2713 : panic!("flush loop not running")
2714 : }
2715 : FlushLoopState::Running {
2716 : expect_initdb_optimization,
2717 : ..
2718 : } => {
2719 : assert!(!*expect_initdb_optimization, "expected initdb optimization");
2720 : }
2721 : }
2722 : // Normal case, write out an L0 delta layer file.
2723 : // `create_delta_layer` will not modify the layer map.
2724 : // We will remove the frozen layer and add the delta layer in one atomic operation later.
2725 : let layer = self.create_delta_layer(&frozen_layer).await?;
2726 : (
2727 : HashMap::from([(
2728 : layer.filename(),
2729 : LayerFileMetadata::new(layer.layer_desc().file_size, self.generation),
2730 : )]),
2731 : Some(layer),
2732 : )
2733 : };
2734 :
2735 : // Atomically add the new on-disk layer to the layer map and remove the
2736 : // frozen in-memory layer it replaces. The new delta layer itself was already
2737 : // written out by `create_delta_layer` above.
2738 : {
2739 : let mut guard = self.layers.write().await;
2740 :
2741 : if let Some(ref l) = delta_layer_to_add {
2742 : // TODO: move access stats, metrics update, etc. into layer manager.
2743 : l.access_stats().record_residence_event(
2744 : LayerResidenceStatus::Resident,
2745 : LayerResidenceEventReason::LayerCreate,
2746 : );
2747 :
2748 : // update metrics
2749 : let sz = l.layer_desc().file_size;
2750 : self.metrics.resident_physical_size_gauge.add(sz);
2751 : self.metrics.num_persistent_files_created.inc_by(1);
2752 : self.metrics.persistent_bytes_written.inc_by(sz);
2753 : }
2754 :
2755 : guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
2756 : // release lock on 'layers'
2757 : }
2758 :
2759 : // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
2760 : // a compaction can delete the file and then it won't be available for uploads any more.
2761 : // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
2762 : // race situation.
2763 : // See https://github.com/neondatabase/neon/issues/4526
2764 6513 : pausable_failpoint!("flush-frozen-pausable");
2765 :
2766 : // This failpoint is used by another test case `test_pageserver_recovery`.
2767 0 : fail_point!("flush-frozen-exit");
2768 :
2769 : // Update the metadata file, with new 'disk_consistent_lsn'
2770 : //
2771 : // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
2772 : // *all* the layers, to avoid fsyncing the file multiple times.
2773 : let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
2774 : let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
2775 :
2776 : // If we were able to advance 'disk_consistent_lsn', save it in the metadata file.
2777 : // After crash, we will restart WAL streaming and processing from that point.
2778 : if disk_consistent_lsn != old_disk_consistent_lsn {
2779 : assert!(disk_consistent_lsn > old_disk_consistent_lsn);
2780 : self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
2781 : .await
2782 : .context("update_metadata_file")?;
2783 : // Also update the in-memory copy
2784 : self.disk_consistent_lsn.store(disk_consistent_lsn);
2785 : }
2786 : Ok(())
2787 : }
2788 :
2789 : /// Update metadata file
2790 6521 : async fn update_metadata_file(
2791 6521 : &self,
2792 6521 : disk_consistent_lsn: Lsn,
2793 6521 : layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
2794 6521 : ) -> anyhow::Result<()> {
2795 6521 : // We can only save a valid 'prev_record_lsn' value on disk if we
2796 6521 : // flushed *all* in-memory changes to disk. We only track
2797 6521 : // 'prev_record_lsn' in memory for the latest processed record, so we
2798 6521 : // don't remember what the correct value for some older
2799 6521 : // LSN would be. But if we flush everything, then the value corresponding to the
2800 6521 : // current 'last_record_lsn' is correct and we can store it on disk.
2801 6521 : let RecordLsn {
2802 6521 : last: last_record_lsn,
2803 6521 : prev: prev_record_lsn,
2804 6521 : } = self.last_record_lsn.load();
2805 6521 : let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
2806 1327 : Some(prev_record_lsn)
2807 : } else {
2808 5194 : None
2809 : };
2810 :
2811 6521 : let ancestor_timeline_id = self
2812 6521 : .ancestor_timeline
2813 6521 : .as_ref()
2814 6521 : .map(|ancestor| ancestor.timeline_id);
2815 6521 :
2816 6521 : let metadata = TimelineMetadata::new(
2817 6521 : disk_consistent_lsn,
2818 6521 : ondisk_prev_record_lsn,
2819 6521 : ancestor_timeline_id,
2820 6521 : self.ancestor_lsn,
2821 6521 : *self.latest_gc_cutoff_lsn.read(),
2822 6521 : self.initdb_lsn,
2823 6521 : self.pg_version,
2824 6521 : );
2825 6521 :
2826 6521 : fail_point!("checkpoint-before-saving-metadata", |x| bail!(
2827 0 : "{}",
2828 0 : x.unwrap()
2829 6521 : ));
2830 :
2831 6521 : save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
2832 0 : .await
2833 6521 : .context("save_metadata")?;
2834 :
2835 6521 : if let Some(remote_client) = &self.remote_client {
2836 7883 : for (path, layer_metadata) in layer_paths_to_upload {
2837 3939 : remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
2838 : }
2839 3944 : remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
2840 2577 : }
2841 :
2842 6521 : Ok(())
2843 6521 : }
2844 :
2845 : // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
2846 : // in the layer map immediately. The caller is responsible for putting it into the layer map.
2847 6477 : async fn create_delta_layer(
2848 6477 : self: &Arc<Self>,
2849 6477 : frozen_layer: &Arc<InMemoryLayer>,
2850 6477 : ) -> anyhow::Result<DeltaLayer> {
2851 6477 : let span = tracing::info_span!("blocking");
2852 6477 : let new_delta: DeltaLayer = tokio::task::spawn_blocking({
2853 6477 : let _g = span.entered();
2854 6477 : let self_clone = Arc::clone(self);
2855 6477 : let frozen_layer = Arc::clone(frozen_layer);
2856 6477 : move || {
2857 : // Write it out
2858 : // Keep this inside `spawn_blocking` and `Handle::current`
2859 : // as long as the write path is still sync and the read impl
2860 : // is still not fully async. Otherwise executor threads would
2861 : // be blocked.
2862 6477 : let new_delta = Handle::current().block_on(frozen_layer.write_to_disk())?;
2863 6477 : let new_delta_path = new_delta.path();
2864 6477 :
2865 6477 : // Sync it to disk.
2866 6477 : //
2867 6477 : // We must also fsync the timeline dir to ensure the directory entries for
2868 6477 : // new layer files are durable.
2869 6477 : //
2870 6477 : // NB: timeline dir must be synced _after_ the file contents are durable.
2871 6477 : // So, two separate fsyncs are required, they mustn't be batched.
2872 6477 : //
2873 6477 : // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
2874 6477 : // files to flush, the fsync overhead can be reduced as follows:
2875 6477 : // 1. write them all to temporary file names
2876 6477 : // 2. fsync them
2877 6477 : // 3. rename to the final name
2878 6477 : // 4. fsync the parent directory.
2879 6477 : // Note that (1),(2),(3) today happen inside write_to_disk().
2880 6477 : par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
2881 6477 : par_fsync::par_fsync(&[self_clone
2882 6477 : .conf
2883 6477 : .timeline_path(&self_clone.tenant_id, &self_clone.timeline_id)])
2884 6477 : .context("fsync of timeline dir")?;
2885 :
2886 6476 : anyhow::Ok(new_delta)
2887 6477 : }
2888 6477 : })
2889 6476 : .await
2890 6476 : .context("spawn_blocking")??;
2891 :
2892 6476 : Ok(new_delta)
2893 6476 : }
2894 :
2895 1438 : async fn repartition(
2896 1438 : &self,
2897 1438 : lsn: Lsn,
2898 1438 : partition_size: u64,
2899 1438 : ctx: &RequestContext,
2900 1438 : ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
2901 1438 : {
2902 1438 : let partitioning_guard = self.partitioning.lock().unwrap();
2903 1438 : let distance = lsn.0 - partitioning_guard.1 .0;
2904 1438 : if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
2905 0 : debug!(
2906 0 : distance,
2907 0 : threshold = self.repartition_threshold,
2908 0 : "no repartitioning needed"
2909 0 : );
2910 778 : return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
2911 660 : }
2912 : }
2913 383412 : let keyspace = self.collect_keyspace(lsn, ctx).await?;
2914 658 : let partitioning = keyspace.partition(partition_size);
2915 658 :
2916 658 : let mut partitioning_guard = self.partitioning.lock().unwrap();
2917 658 : if lsn > partitioning_guard.1 {
2918 658 : *partitioning_guard = (partitioning, lsn);
2919 658 : } else {
2920 0 : warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
2921 : }
2922 658 : Ok((partitioning_guard.0.clone(), partitioning_guard.1))
2923 1436 : }
2924 :
2925 : // Is it time to create a new image layer for the given partition?
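// (Added note: the heuristic below counts, for each key range of the partition, how
// many delta layers have stacked up since the most recent image layer, and answers
// "yes" once that count reaches the image creation threshold, or when the GC loop has
// explicitly asked for an image over this range via 'wanted_image_layers'.)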
2926 16758 : async fn time_for_new_image_layer(
2927 16758 : &self,
2928 16758 : partition: &KeySpace,
2929 16758 : lsn: Lsn,
2930 16758 : ) -> anyhow::Result<bool> {
2931 16758 : let threshold = self.get_image_creation_threshold();
2932 :
2933 16758 : let guard = self.layers.read().await;
2934 16758 : let layers = guard.layer_map();
2935 16758 :
2936 16758 : let mut max_deltas = 0;
2937 16758 : {
2938 16758 : let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
2939 16758 : if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
2940 3398 : let img_range =
2941 3398 : partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
2942 3398 : if wanted.overlaps(&img_range) {
2943 : //
2944 : // gc_timeline only pays attention to image layers that are older than the GC cutoff,
2945 : // but create_image_layers creates image layers at last-record-lsn.
2946 : // So it's possible that gc_timeline wants a new image layer to be created for a key range,
2947 : // but the range is already covered by image layers at more recent LSNs. Before we
2948 : // create a new image layer, check if the range is already covered at more recent LSNs.
2949 0 : if !layers
2950 0 : .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
2951 : {
2952 0 : debug!(
2953 0 : "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
2954 0 : img_range.start, img_range.end, cutoff_lsn, lsn
2955 0 : );
2956 0 : return Ok(true);
2957 0 : }
2958 3398 : }
2959 13360 : }
2960 : }
2961 :
2962 2063764 : for part_range in &partition.ranges {
2963 2048082 : let image_coverage = layers.image_coverage(part_range, lsn)?;
2964 3979335 : for (img_range, last_img) in image_coverage {
2965 1932329 : let img_lsn = if let Some(last_img) = last_img {
2966 177346 : last_img.get_lsn_range().end
2967 : } else {
2968 1754983 : Lsn(0)
2969 : };
2970 : // Let's consider an example:
2971 : //
2972 : // delta layer with LSN range 71-81
2973 : // delta layer with LSN range 81-91
2974 : // delta layer with LSN range 91-101
2975 : // image layer at LSN 100
2976 : //
2977 : // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
2978 : // there's no need to create a new one. We check this case explicitly, to avoid passing
2979 : // a bogus range to count_deltas below, with start > end. It's even possible that there
2980 : // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
2981 : // after we read last_record_lsn, which is passed here in the 'lsn' argument.
2982 1932329 : if img_lsn < lsn {
2983 1915119 : let num_deltas =
2984 1915119 : layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
2985 :
2986 1915119 : max_deltas = max_deltas.max(num_deltas);
2987 1915119 : if num_deltas >= threshold {
2988 0 : debug!(
2989 0 : "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
2990 0 : img_range.start, img_range.end, num_deltas, img_lsn, lsn
2991 0 : );
2992 1076 : return Ok(true);
2993 1914043 : }
2994 17210 : }
2995 : }
2996 : }
2997 :
2998 0 : debug!(
2999 0 : max_deltas,
3000 0 : "none of the partitioned ranges had >= {threshold} deltas"
3001 0 : );
3002 15682 : Ok(false)
3003 16758 : }
3004 :
3005 1436 : async fn create_image_layers(
3006 1436 : &self,
3007 1436 : partitioning: &KeyPartitioning,
3008 1436 : lsn: Lsn,
3009 1436 : force: bool,
3010 1436 : ctx: &RequestContext,
3011 1436 : ) -> Result<HashMap<LayerFileName, LayerFileMetadata>, PageReconstructError> {
3012 1436 : let timer = self.metrics.create_images_time_histo.start_timer();
3013 1436 : let mut image_layers: Vec<ImageLayer> = Vec::new();
3014 1436 :
3015 1436 : // We need to avoid holes between generated image layers.
3016 1436 : // Otherwise LayerMap::image_layer_exists will return false if the key range of some layer is covered by more than one
3017 1436 : // image layer with a hole between them. In that case such a layer cannot be used by GC.
3018 1436 : //
3019 1436 : // How can such a hole between partitions appear?
3020 1436 : // If we have a relation with relid=1 and size 100 and a relation with relid=2 and size 200, then the result of
3021 1436 : // KeySpace::partition may contain the partitions <100000000..100000099> and <200000000..200000199>.
3022 1436 : // If there is a delta layer <100000000..300000000>, it will never be garbage collected because the
3023 1436 : // image layers <100000000..100000099> and <200000000..200000199> do not completely cover it.
3024 1436 : let mut start = Key::MIN;
3025 :
3026 16795 : for partition in partitioning.parts.iter() {
3027 16795 : let img_range = start..partition.ranges.last().unwrap().end;
3028 16795 : start = img_range.end;
3029 16795 : if force || self.time_for_new_image_layer(partition, lsn).await? {
3030 1113 : let mut image_layer_writer = ImageLayerWriter::new(
3031 1113 : self.conf,
3032 1113 : self.timeline_id,
3033 1113 : self.tenant_id,
3034 1113 : &img_range,
3035 1113 : lsn,
3036 1113 : )
3037 0 : .await?;
3038 :
3039 1113 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3040 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
3041 0 : "failpoint image-layer-writer-fail-before-finish"
3042 0 : )))
3043 1113 : });
3044 40031 : for range in &partition.ranges {
3045 38919 : let mut key = range.start;
3046 244071 : while key < range.end {
3047 425559 : let img = match self.get(key, lsn, ctx).await {
3048 205153 : Ok(img) => img,
3049 0 : Err(err) => {
3050 0 : // If we fail to reconstruct a VM or FSM page, we can zero the
3051 0 : // page without losing any actual user data. That seems better
3052 0 : // than failing repeatedly and getting stuck.
3053 0 : //
3054 0 : // We had a bug at one point, where we truncated the FSM and VM
3055 0 : // in the pageserver, but the Postgres didn't know about that
3056 0 : // and continued to generate incremental WAL records for pages
3057 0 : // that didn't exist in the pageserver. Trying to replay those
3058 0 : // WAL records failed to find the previous image of the page.
3059 0 : // This special case allows us to recover from that situation.
3060 0 : // See https://github.com/neondatabase/neon/issues/2601.
3061 0 : //
3062 0 : // Unfortunately we cannot do this for the main fork, or for
3063 0 : // any metadata keys, as that would lead to actual data
3064 0 : // loss.
3065 0 : if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
3066 0 : warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
3067 0 : ZERO_PAGE.clone()
3068 : } else {
3069 0 : return Err(err);
3070 : }
3071 : }
3072 : };
3073 205153 : image_layer_writer.put_image(key, &img).await?;
3074 205152 : key = key.next();
3075 : }
3076 : }
3077 1112 : let image_layer = image_layer_writer.finish().await?;
3078 1112 : image_layers.push(image_layer);
3079 15682 : }
3080 : }
3081 : // All layers that the GC wanted us to create have now been created.
3082 : //
3083 : // It's possible that another GC cycle happened while we were compacting, and added
3084 : // something new to wanted_image_layers, and we now clear that before processing it.
3085 : // That's OK, because the next GC iteration will put it back in.
3086 1435 : *self.wanted_image_layers.lock().unwrap() = None;
3087 1435 :
3088 1435 : // Sync the new layer to disk before adding it to the layer map, to make sure
3089 1435 : // we don't garbage collect something based on the new layer, before it has
3090 1435 : // reached the disk.
3091 1435 : //
3092 1435 : // We must also fsync the timeline dir to ensure the directory entries for
3093 1435 : // new layer files are durable
3094 1435 : //
3095 1435 : // Compaction creates multiple image layers. It would be better to create them all
3096 1435 : // and fsync them all in parallel.
3097 1435 : let all_paths = image_layers
3098 1435 : .iter()
3099 1435 : .map(|layer| layer.path())
3100 1435 : .collect::<Vec<_>>();
3101 1435 :
3102 1435 : par_fsync::par_fsync_async(&all_paths)
3103 171 : .await
3104 1435 : .context("fsync of newly created layer files")?;
3105 :
3106 1435 : par_fsync::par_fsync_async(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
3107 1435 : .await
3108 1435 : .context("fsync of timeline dir")?;
3109 :
3110 1435 : let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
3111 :
3112 1435 : let mut guard = self.layers.write().await;
3113 1435 : let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
3114 :
3115 2547 : for l in &image_layers {
3116 1112 : let path = l.filename();
3117 1112 : let metadata = timeline_path
3118 1112 : .join(path.file_name())
3119 1112 : .metadata()
3120 1112 : .with_context(|| format!("reading metadata of layer file {}", path.file_name()))?;
3121 :
3122 1112 : layer_paths_to_upload.insert(
3123 1112 : path,
3124 1112 : LayerFileMetadata::new(metadata.len(), self.generation),
3125 1112 : );
3126 1112 :
3127 1112 : self.metrics
3128 1112 : .resident_physical_size_gauge
3129 1112 : .add(metadata.len());
3130 1112 : let l = Arc::new(l);
3131 1112 : l.access_stats().record_residence_event(
3132 1112 : LayerResidenceStatus::Resident,
3133 1112 : LayerResidenceEventReason::LayerCreate,
3134 1112 : );
3135 : }
3136 1435 : guard.track_new_image_layers(image_layers);
3137 1435 : drop_wlock(guard);
3138 1435 : timer.stop_and_record();
3139 1435 :
3140 1435 : Ok(layer_paths_to_upload)
3141 1435 : }
3142 : }
3143 :
3144 1069 : #[derive(Default)]
3145 : struct CompactLevel0Phase1Result {
3146 : new_layers: Vec<Arc<DeltaLayer>>,
3147 : deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
3148 : }
3149 :
3150 : /// Top-level failure to compact.
3151 0 : #[derive(Debug)]
3152 : enum CompactionError {
3153 : /// L0 compaction requires layers to be downloaded.
3154 : ///
3155 : /// This should not happen repeatedly, but will be retried once by top-level
3156 : /// `Timeline::compact`.
3157 : DownloadRequired(Vec<Arc<RemoteLayer>>),
3158 : /// The timeline or pageserver is shutting down
3159 : ShuttingDown,
3160 : /// Compaction could not be done for some other reason, e.g. a page reconstruction error.
3161 : Other(anyhow::Error),
3162 : }
3163 :
3164 : impl From<anyhow::Error> for CompactionError {
3165 1 : fn from(value: anyhow::Error) -> Self {
3166 1 : CompactionError::Other(value)
3167 1 : }
3168 : }
3169 :
3170 : #[serde_as]
3171 3990 : #[derive(serde::Serialize)]
3172 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
3173 :
3174 9786 : #[derive(Default)]
3175 : enum DurationRecorder {
3176 : #[default]
3177 : NotStarted,
3178 : Recorded(RecordedDuration, tokio::time::Instant),
3179 : }
3180 :
3181 : impl DurationRecorder {
3182 2844 : pub fn till_now(&self) -> DurationRecorder {
3183 2844 : match self {
3184 : DurationRecorder::NotStarted => {
3185 0 : panic!("must only call on recorded measurements")
3186 : }
3187 2844 : DurationRecorder::Recorded(_, ended) => {
3188 2844 : let now = tokio::time::Instant::now();
3189 2844 : DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
3190 2844 : }
3191 2844 : }
3192 2844 : }
3193 1995 : pub fn into_recorded(self) -> Option<RecordedDuration> {
3194 1995 : match self {
3195 0 : DurationRecorder::NotStarted => None,
3196 1995 : DurationRecorder::Recorded(recorded, _) => Some(recorded),
3197 : }
3198 1995 : }
3199 : }
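// (Added usage note: the compaction stats builder below chains 'till_now()' calls, so
// each recorded duration measures the time elapsed since the previously recorded
// phase rather than since the start of compaction.)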
3200 :
3201 1398 : #[derive(Default)]
3202 : struct CompactLevel0Phase1StatsBuilder {
3203 : version: Option<u64>,
3204 : tenant_id: Option<TenantId>,
3205 : timeline_id: Option<TimelineId>,
3206 : read_lock_acquisition_micros: DurationRecorder,
3207 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
3208 : read_lock_held_key_sort_micros: DurationRecorder,
3209 : read_lock_held_prerequisites_micros: DurationRecorder,
3210 : read_lock_held_compute_holes_micros: DurationRecorder,
3211 : read_lock_drop_micros: DurationRecorder,
3212 : write_layer_files_micros: DurationRecorder,
3213 : level0_deltas_count: Option<usize>,
3214 : new_deltas_count: Option<usize>,
3215 : new_deltas_size: Option<u64>,
3216 : }
3217 :
3218 : #[serde_as]
3219 285 : #[derive(serde::Serialize)]
3220 : struct CompactLevel0Phase1Stats {
3221 : version: u64,
3222 : #[serde_as(as = "serde_with::DisplayFromStr")]
3223 : tenant_id: TenantId,
3224 : #[serde_as(as = "serde_with::DisplayFromStr")]
3225 : timeline_id: TimelineId,
3226 : read_lock_acquisition_micros: RecordedDuration,
3227 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
3228 : read_lock_held_key_sort_micros: RecordedDuration,
3229 : read_lock_held_prerequisites_micros: RecordedDuration,
3230 : read_lock_held_compute_holes_micros: RecordedDuration,
3231 : read_lock_drop_micros: RecordedDuration,
3232 : write_layer_files_micros: RecordedDuration,
3233 : level0_deltas_count: usize,
3234 : new_deltas_count: usize,
3235 : new_deltas_size: u64,
3236 : }
3237 :
3238 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
3239 : type Error = anyhow::Error;
3240 :
3241 285 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
3242 285 : Ok(Self {
3243 285 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
3244 285 : tenant_id: value
3245 285 : .tenant_id
3246 285 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
3247 285 : timeline_id: value
3248 285 : .timeline_id
3249 285 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
3250 285 : read_lock_acquisition_micros: value
3251 285 : .read_lock_acquisition_micros
3252 285 : .into_recorded()
3253 285 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
3254 285 : read_lock_held_spawn_blocking_startup_micros: value
3255 285 : .read_lock_held_spawn_blocking_startup_micros
3256 285 : .into_recorded()
3257 285 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
3258 285 : read_lock_held_key_sort_micros: value
3259 285 : .read_lock_held_key_sort_micros
3260 285 : .into_recorded()
3261 285 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
3262 285 : read_lock_held_prerequisites_micros: value
3263 285 : .read_lock_held_prerequisites_micros
3264 285 : .into_recorded()
3265 285 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
3266 285 : read_lock_held_compute_holes_micros: value
3267 285 : .read_lock_held_compute_holes_micros
3268 285 : .into_recorded()
3269 285 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
3270 285 : read_lock_drop_micros: value
3271 285 : .read_lock_drop_micros
3272 285 : .into_recorded()
3273 285 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
3274 285 : write_layer_files_micros: value
3275 285 : .write_layer_files_micros
3276 285 : .into_recorded()
3277 285 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
3278 285 : level0_deltas_count: value
3279 285 : .level0_deltas_count
3280 285 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
3281 285 : new_deltas_count: value
3282 285 : .new_deltas_count
3283 285 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
3284 285 : new_deltas_size: value
3285 285 : .new_deltas_size
3286 285 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
3287 : })
3288 285 : }
3289 : }
3290 :
3291 : impl Timeline {
3292 : /// First phase of Level 0 compaction, explained in the [`compact_inner`] comment.
3293 : ///
3294 : /// This method takes the `_layer_removal_cs` guard to highlight that required downloads are
3295 : /// returned as an error. If the `layer_removal_cs` boundary is changed so that it is no longer taken at the
3296 : /// start of Level 0 compaction, the on-demand download should be revisited as well.
3297 : ///
3298 : /// [`compact_inner`]: Self::compact_inner
3299 1398 : async fn compact_level0_phase1(
3300 1398 : self: &Arc<Self>,
3301 1398 : _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
3302 1398 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
3303 1398 : mut stats: CompactLevel0Phase1StatsBuilder,
3304 1398 : target_file_size: u64,
3305 1398 : ctx: &RequestContext,
3306 1398 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
3307 1398 : stats.read_lock_held_spawn_blocking_startup_micros =
3308 1398 : stats.read_lock_acquisition_micros.till_now(); // set by caller
3309 1398 : let layers = guard.layer_map();
3310 1398 : let level0_deltas = layers.get_level0_deltas()?;
3311 1398 : let mut level0_deltas = level0_deltas
3312 1398 : .into_iter()
3313 10762 : .map(|x| guard.get_from_desc(&x))
3314 1398 : .collect_vec();
3315 1398 : stats.level0_deltas_count = Some(level0_deltas.len());
3316 1398 : // Only compact if enough layers have accumulated.
3317 1398 : let threshold = self.get_compaction_threshold();
3318 1398 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
3319 0 : debug!(
3320 0 : level0_deltas = level0_deltas.len(),
3321 0 : threshold, "too few deltas to compact"
3322 0 : );
3323 1069 : return Ok(CompactLevel0Phase1Result::default());
3324 329 : }
3325 329 :
3326 329 : // This failpoint is used together with the `test_duplicate_layers` integration test.
3327 329 : // It makes the compaction return exactly the same layers that were given as input.
3328 329 : // We want to ensure that this will not cause any problems when updating the layer map
3329 329 : // after the compaction is finished.
3330 329 : //
3331 329 : // Currently, there are two rare edge cases that will cause duplicated layers to be
3332 329 : // inserted.
3333 329 : // 1. The compaction job is interrupted / does not finish successfully. Assume we have files 1, 2, 3, 4, which
3334 329 : // are compacted into 5, but the page server is shut down; the next time we start the page server we will get a layer
3335 329 : // map containing 1, 2, 3, 4, and 5, where 5 has the same content as 4. If we trigger L0 compaction at this
3336 329 : // point again, it is likely that we will get a file 6 which has the same content and key range as 5,
3337 329 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
3338 329 : // layer replace instead of the normal remove / upload process.
3339 329 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and of target file
3340 329 : // size. Compaction will likely create the same set of n files afterwards.
3341 329 : //
3342 329 : // This failpoint is a superset of both of the cases.
3343 329 : fail_point!("compact-level0-phase1-return-same", |_| {
3344 37 : println!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
3345 37 : Ok(CompactLevel0Phase1Result {
3346 37 : new_layers: level0_deltas
3347 37 : .iter()
3348 2839 : .map(|x| x.clone().downcast_delta_layer().unwrap())
3349 37 : .collect(),
3350 37 : deltas_to_compact: level0_deltas
3351 37 : .iter()
3352 2839 : .map(|x| x.layer_desc().clone().into())
3353 37 : .collect(),
3354 37 : })
3355 329 : });
3356 :
3357 : // Gather the files to compact in this iteration.
3358 : //
3359 : // Start with the oldest Level 0 delta file, and collect any other
3360 : // level 0 files that form a contiguous sequence, such that the end
3361 : // LSN of the previous file matches the start LSN of the next file.
3362 : //
3363 : // Note that if the files don't form such a sequence, we might
3364 : // "compact" just a single file. That's a bit pointless, but it allows
3365 : // us to get rid of the level 0 file, and compact the other files on
3366 : // the next iteration. This could probably made smarter, but such
3367 : // "gaps" in the sequence of level 0 files should only happen in case
3368 : // of a crash, partial download from cloud storage, or something like
3369 : // that, so it's not a big deal in practice.
3370 14666 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
3371 292 : let mut level0_deltas_iter = level0_deltas.iter();
3372 292 :
3373 292 : let first_level0_delta = level0_deltas_iter.next().unwrap();
3374 292 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
3375 292 : let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
3376 4956 : for l in level0_deltas_iter {
3377 4664 : let lsn_range = &l.layer_desc().lsn_range;
3378 4664 :
3379 4664 : if lsn_range.start != prev_lsn_end {
3380 0 : break;
3381 4664 : }
3382 4664 : deltas_to_compact.push(Arc::clone(l));
3383 4664 : prev_lsn_end = lsn_range.end;
3384 : }
3385 292 : let lsn_range = Range {
3386 292 : start: deltas_to_compact
3387 292 : .first()
3388 292 : .unwrap()
3389 292 : .layer_desc()
3390 292 : .lsn_range
3391 292 : .start,
3392 292 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
3393 292 : };
3394 292 :
3395 292 : let remotes = deltas_to_compact
3396 292 : .iter()
3397 4956 : .filter(|l| l.is_remote_layer())
3398 292 : .inspect(|l| info!("compact requires download of {l}"))
3399 292 : .map(|l| {
3400 3 : l.clone()
3401 3 : .downcast_remote_layer()
3402 3 : .expect("just checked it is remote layer")
3403 292 : })
3404 292 : .collect::<Vec<_>>();
3405 292 :
3406 292 : if !remotes.is_empty() {
3407 : // The caller is holding the layer_removal_cs lock, and we don't want to download while
3408 : // holding it; in the future, download_remote_layer might take it as well. This applies
3409 : // regardless of earlier image creation downloading on-demand while holding the lock.
3410 1 : return Err(CompactionError::DownloadRequired(remotes));
3411 291 : }
3412 :
3413 291 : info!(
3414 291 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
3415 291 : lsn_range.start,
3416 291 : lsn_range.end,
3417 291 : deltas_to_compact.len(),
3418 291 : level0_deltas.len()
3419 291 : );
3420 :
3421 4953 : for l in deltas_to_compact.iter() {
3422 4953 : info!("compact includes {l}");
3423 : }
3424 :
3425 : // We don't need the original list of layers anymore. Drop it so that
3426 : // we don't accidentally use it later in the function.
3427 291 : drop(level0_deltas);
3428 291 :
3429 291 : stats.read_lock_held_prerequisites_micros = stats
3430 291 : .read_lock_held_spawn_blocking_startup_micros
3431 291 : .till_now();
3432 291 :
3433 291 : // Determine N largest holes where N is number of compacted layers.
3434 291 : let max_holes = deltas_to_compact.len();
3435 291 : let last_record_lsn = self.get_last_record_lsn();
3436 291 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
3437 291 : let min_hole_coverage_size = 3; // TODO: something more flexible?
3438 291 :
3439 291 : // min-heap (reserve space for one more element added before eviction)
3440 291 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
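// (Added note: each candidate hole found below is pushed onto this heap, and whenever
// the heap grows past 'max_holes' the smallest hole is popped again, so at the end of
// the scan the heap holds the 'max_holes' largest holes by 'coverage_size'.)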
3441 291 : let mut prev: Option<Key> = None;
3442 291 :
3443 291 : let mut all_keys = Vec::new();
3444 291 :
3445 291 : let downcast_deltas: Vec<_> = deltas_to_compact
3446 291 : .iter()
3447 4953 : .map(|l| l.clone().downcast_delta_layer().expect("delta layer"))
3448 291 : .collect();
3449 4953 : for dl in downcast_deltas.iter() {
3450 : // TODO: replace this with an await once we fully go async
3451 4953 : all_keys.extend(DeltaLayer::load_keys(dl, ctx).await?);
3452 : }
3453 :
3454 : // The current stdlib sorting implementation is designed in a way where it is
3455 : // particularly fast when the slice is made up of sorted sub-ranges.
3456 536602652 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3457 291 :
3458 291 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
3459 :
3460 40589793 : for DeltaEntry { key: next_key, .. } in all_keys.iter() {
3461 40589793 : let next_key = *next_key;
3462 40589793 : if let Some(prev_key) = prev {
3463 : // just first fast filter
3464 40589503 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
3465 196839 : let key_range = prev_key..next_key;
3466 : // Measuring a hole by just subtracting the i128 representations of the key range boundaries
3467 : // doesn't make much sense, because the largest holes will correspond to field1/field2 changes.
3468 : // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
3469 : // That is why it is better to measure the size of a hole as the number of covering image layers.
3470 196839 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
3471 196839 : if coverage_size >= min_hole_coverage_size {
3472 15 : heap.push(Hole {
3473 15 : key_range,
3474 15 : coverage_size,
3475 15 : });
3476 15 : if heap.len() > max_holes {
3477 0 : heap.pop(); // remove smallest hole
3478 15 : }
3479 196824 : }
3480 40392664 : }
3481 290 : }
3482 40589793 : prev = Some(next_key.next());
3483 : }
3484 290 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
3485 290 : drop_rlock(guard);
3486 290 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
3487 290 : let mut holes = heap.into_vec();
3488 290 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
3489 290 : let mut next_hole = 0; // index of next hole in holes vector
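// (Added note: 'holes' is now sorted by start key, matching the key order of the
// sorted entries below, so the single 'next_hole' cursor is enough: when the current
// key reaches a hole's end, the output layer is cut and the cursor advances.)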
3490 290 :
3491 290 : // This iterator walks through all key-value pairs from all the layers
3492 290 : // we're compacting, in key, LSN order.
3493 290 : let all_values_iter = all_keys.iter();
3494 290 :
3495 290 : // This iterator walks through all keys and is needed to calculate size used by each key
3496 290 : let mut all_keys_iter = all_keys
3497 290 : .iter()
3498 32498816 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
3499 32498526 : .coalesce(|mut prev, cur| {
3500 32498526 : // Coalesce entries that belong to the same key.
3501 32498526 : // This ensures that compaction doesn't put them
3502 32498526 : // into different layer files.
3503 32498526 : // Still limit this by the target file size,
3504 32498526 : // so that we keep the size of the files in
3505 32498526 : // check.
3506 32498526 : if prev.0 == cur.0 && prev.2 < target_file_size {
3507 30746597 : prev.2 += cur.2;
3508 30746597 : Ok(prev)
3509 : } else {
3510 1751929 : Err((prev, cur))
3511 : }
3512 32498526 : });
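// (Added illustrative sketch of the coalescing above, with made-up values: given
// target_file_size = 100 and the entries
//     (K, 0/10, 40), (K, 0/20, 40), (K, 0/30, 40), (L, 0/10, 40)
// the iterator yields
//     (K, 0/10, 120), (L, 0/10, 40)
// i.e. same-key entries are merged until the size accumulated *before* adding the
// next entry reaches the target, so a single element may overshoot the target by at
// most one entry, while a change of key always starts a new element.)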
3513 290 :
3514 290 : // Merge the contents of all the input delta layers into a new set
3515 290 : // of delta layers, based on the current partitioning.
3516 290 : //
3517 290 : // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key in the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start a new one.
3518 290 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
3519 290 : // would be too large. In that case, we also split on the LSN dimension.
3520 290 : //
3521 290 : // LSN
3522 290 : // ^
3523 290 : // |
3524 290 : // | +-----------+ +--+--+--+--+
3525 290 : // | | | | | | | |
3526 290 : // | +-----------+ | | | | |
3527 290 : // | | | | | | | |
3528 290 : // | +-----------+ ==> | | | | |
3529 290 : // | | | | | | | |
3530 290 : // | +-----------+ | | | | |
3531 290 : // | | | | | | | |
3532 290 : // | +-----------+ +--+--+--+--+
3533 290 : // |
3534 290 : // +--------------> key
3535 290 : //
3536 290 : //
3537 290 : // If one key (X) has a lot of page versions:
3538 290 : //
3539 290 : // LSN
3540 290 : // ^
3541 290 : // | (X)
3542 290 : // | +-----------+ +--+--+--+--+
3543 290 : // | | | | | | | |
3544 290 : // | +-----------+ | | +--+ |
3545 290 : // | | | | | | | |
3546 290 : // | +-----------+ ==> | | | | |
3547 290 : // | | | | | +--+ |
3548 290 : // | +-----------+ | | | | |
3549 290 : // | | | | | | | |
3550 290 : // | +-----------+ +--+--+--+--+
3551 290 : // |
3552 290 : // +--------------> key
3553 290 : // TODO: this actually divides the layers into fixed-size chunks, not
3554 290 : // based on the partitioning.
3555 290 : //
3556 290 : // TODO: we should also opportunistically materialize and
3557 290 : // garbage collect what we can.
3558 290 : let mut new_layers = Vec::new();
3559 290 : let mut prev_key: Option<Key> = None;
3560 290 : let mut writer: Option<DeltaLayerWriter> = None;
3561 290 : let mut key_values_total_size = 0u64;
3562 290 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
3563 290 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
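// (Added note: 'dup_start_lsn..dup_end_lsn' only becomes valid when one key has so
// many versions that they exceed 'target_file_size'; in that case the key's versions
// are split along the LSN axis into several single-key layers, the "(X)" case in the
// diagram above, and this range describes the LSN slice covered by the layer
// currently being written.)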
3564 :
3565 : for &DeltaEntry {
3566 32493263 : key, lsn, ref val, ..
3567 32493548 : } in all_values_iter
3568 : {
3569 32493263 : let value = val.load().await?;
3570 32493259 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
3571 32493259 : // We need to check key boundaries once we reach the next key or the end of a layer with the same key
3572 32493259 : if !same_key || lsn == dup_end_lsn {
3573 1752209 : let mut next_key_size = 0u64;
3574 1752209 : let is_dup_layer = dup_end_lsn.is_valid();
3575 1752209 : dup_start_lsn = Lsn::INVALID;
3576 1752209 : if !same_key {
3577 1752149 : dup_end_lsn = Lsn::INVALID;
3578 1752149 : }
3579 : // Determine the size occupied by this key. We stop at the next key or when the size becomes larger than target_file_size
3580 1752214 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
3581 1752214 : next_key_size = next_size;
3582 1752214 : if key != next_key {
3583 1751864 : if dup_end_lsn.is_valid() {
3584 23 : // We are writting segment with duplicates:
3585 23 : // place all remaining values of this key in separate segment
3586 23 : dup_start_lsn = dup_end_lsn; // new segments starts where old stops
3587 23 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
3588 1751841 : }
3589 1751864 : break;
3590 350 : }
3591 350 : key_values_total_size += next_size;
3592 350 : // Check if it is time to split the segment: i.e. if the total size of the key's values is larger than the target file size.
3593 350 : // We need to avoid generating empty segments if next_size > target_file_size.
3594 350 : if key_values_total_size > target_file_size && lsn != next_lsn {
3595 : // Split the key across multiple layers: such a layer can contain only a single key
3596 60 : dup_start_lsn = if dup_end_lsn.is_valid() {
3597 37 : dup_end_lsn // new segment with duplicates starts where old one stops
3598 : } else {
3599 23 : lsn // start with the first LSN for this key
3600 : };
3601 60 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
3602 60 : break;
3603 290 : }
3604 : }
3605 : // Handle the case when the loop reaches the last key: in this case dup_end is non-zero but dup_start is not set.
3606 1752209 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
3607 0 : dup_start_lsn = dup_end_lsn;
3608 0 : dup_end_lsn = lsn_range.end;
3609 1752209 : }
3610 1752209 : if writer.is_some() {
3611 1751919 : let written_size = writer.as_mut().unwrap().size();
3612 1751919 : let contains_hole =
3613 1751919 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
3614 : // check if the key causes layer overflow or contains a hole...
3615 1751919 : if is_dup_layer
3616 1751836 : || dup_end_lsn.is_valid()
3617 1751818 : || written_size + key_values_total_size > target_file_size
3618 1743185 : || contains_hole
3619 : {
3620 : // ... if so, flush the previous layer and prepare to write a new one
3621 : new_layers.push(Arc::new(
3622 8747 : writer
3623 8747 : .take()
3624 8747 : .unwrap()
3625 8747 : .finish(prev_key.unwrap().next())
3626 0 : .await?,
3627 : ));
3628 8747 : writer = None;
3629 8747 :
3630 8747 : if contains_hole {
3631 15 : // skip hole
3632 15 : next_hole += 1;
3633 8732 : }
3634 1743172 : }
3635 290 : }
3636 : // Remember the size of the key's values, because at the next iteration we will access the next item
3637 1752209 : key_values_total_size = next_key_size;
3638 30741049 : }
3639 32493258 : if writer.is_none() {
3640 9037 : // Create a writer if not initialized yet
3641 9037 : writer = Some(
3642 : DeltaLayerWriter::new(
3643 9037 : self.conf,
3644 9037 : self.timeline_id,
3645 9037 : self.tenant_id,
3646 9037 : key,
3647 9037 : if dup_end_lsn.is_valid() {
3648 : // this is a layer containing a slice of values of the same key
3649 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
3650 83 : dup_start_lsn..dup_end_lsn
3651 : } else {
3652 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3653 8954 : lsn_range.clone()
3654 : },
3655 : )
3656 0 : .await?,
3657 : );
3658 32484221 : }
3659 :
3660 32493258 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3661 0 : Err(CompactionError::Other(anyhow::anyhow!(
3662 0 : "failpoint delta-layer-writer-fail-before-finish"
3663 0 : )))
3664 32493258 : });
3665 :
3666 32493258 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
3667 32493258 : prev_key = Some(key);
3668 : }
3669 285 : if let Some(writer) = writer {
3670 285 : new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next()).await?));
3671 0 : }
3672 :
3673 : // Sync layers
3674 285 : if !new_layers.is_empty() {
3675 : // Print a warning if the created layer is larger than double the target size
3676 : // Add two pages for potential overhead. This should in theory be already
3677 : // accounted for in the target calculation, but for very small targets,
3678 : // we still might easily hit the limit otherwise.
3679 285 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
3680 8738 : for layer in new_layers.iter() {
3681 8738 : if layer.layer_desc().file_size > warn_limit {
3682 0 : warn!(
3683 0 : %layer,
3684 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
3685 0 : );
3686 8738 : }
3687 : }
3688 8738 : let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
3689 285 :
3690 285 : // Fsync all the layer files and directory using multiple threads to
3691 285 : // minimize latency.
3692 285 : par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
3693 :
3694 285 : par_fsync::par_fsync(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
3695 285 : .context("fsync of timeline dir")?;
3696 :
3697 285 : layer_paths.pop().unwrap();
3698 0 : }
3699 :
3700 285 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
3701 285 : stats.new_deltas_count = Some(new_layers.len());
3702 8738 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
3703 285 :
3704 285 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
3705 285 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
3706 : {
3707 285 : Ok(stats_json) => {
3708 285 : info!(
3709 285 : stats_json = stats_json.as_str(),
3710 285 : "compact_level0_phase1 stats available"
3711 285 : )
3712 : }
3713 0 : Err(e) => {
3714 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
3715 : }
3716 : }
3717 :
3718 285 : Ok(CompactLevel0Phase1Result {
3719 285 : new_layers,
3720 285 : deltas_to_compact: deltas_to_compact
3721 285 : .into_iter()
3722 4405 : .map(|x| Arc::new(x.layer_desc().clone()))
3723 285 : .collect(),
3724 285 : })
3725 1392 : }
3726 :
3727 : ///
3728 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
3729 : /// as Level 1 files.
3730 : ///
3731 1398 : async fn compact_level0(
3732 1398 : self: &Arc<Self>,
3733 1398 : layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
3734 1398 : target_file_size: u64,
3735 1398 : ctx: &RequestContext,
3736 1398 : ) -> Result<(), CompactionError> {
3737 : let CompactLevel0Phase1Result {
3738 1391 : new_layers,
3739 1391 : deltas_to_compact,
3740 : } = {
3741 1398 : let phase1_span = info_span!("compact_level0_phase1");
3742 1398 : let ctx = ctx.attached_child();
3743 1398 : let mut stats = CompactLevel0Phase1StatsBuilder {
3744 1398 : version: Some(2),
3745 1398 : tenant_id: Some(self.tenant_id),
3746 1398 : timeline_id: Some(self.timeline_id),
3747 1398 : ..Default::default()
3748 1398 : };
3749 1398 :
3750 1398 : let begin = tokio::time::Instant::now();
3751 1398 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
3752 1398 : let now = tokio::time::Instant::now();
3753 1398 : stats.read_lock_acquisition_micros =
3754 1398 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
3755 1398 : let layer_removal_cs = layer_removal_cs.clone();
3756 1398 : self.compact_level0_phase1(
3757 1398 : layer_removal_cs,
3758 1398 : phase1_layers_locked,
3759 1398 : stats,
3760 1398 : target_file_size,
3761 1398 : &ctx,
3762 1398 : )
3763 1398 : .instrument(phase1_span)
3764 265395 : .await?
3765 : };
3766 :
3767 1391 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
3768 : // nothing to do
3769 1069 : return Ok(());
3770 322 : }
3771 :
3772 : // Before deleting any layers, we need to wait for their upload ops to finish.
3773 : // See remote_timeline_client module level comment on consistency.
3774 : // Do it here because we don't want to hold self.layers.write() while waiting.
3775 322 : if let Some(remote_client) = &self.remote_client {
3776 0 : debug!("waiting for upload ops to complete");
3777 203 : remote_client
3778 203 : .wait_completion()
3779 29 : .await
3780 203 : .context("wait for layer upload ops to complete")?;
3781 119 : }
3782 :
3783 321 : let mut guard = self.layers.write().await;
3784 321 : let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
3785 321 :
3786 321 : // In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction.
3787 321 : // We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
3788 321 : // now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map.
3789 321 : let mut duplicated_layers = HashSet::new();
3790 321 :
3791 321 : let mut insert_layers = Vec::new();
3792 321 : let mut remove_layers = Vec::new();
3793 :
3794 11362 : for l in new_layers {
3795 11041 : let new_delta_path = l.path();
3796 :
3797 11041 : let metadata = new_delta_path.metadata().with_context(|| {
3798 0 : format!(
3799 0 : "read file metadata for new created layer {}",
3800 0 : new_delta_path.display()
3801 0 : )
3802 11041 : })?;
3803 :
3804 11041 : if let Some(remote_client) = &self.remote_client {
3805 6879 : remote_client.schedule_layer_file_upload(
3806 6879 : &l.filename(),
3807 6879 : &LayerFileMetadata::new(metadata.len(), self.generation),
3808 6879 : )?;
3809 4162 : }
3810 :
3811 : // update the timeline's physical size
3812 11041 : self.metrics
3813 11041 : .resident_physical_size_gauge
3814 11041 : .add(metadata.len());
3815 11041 :
3816 11041 : new_layer_paths.insert(
3817 11041 : new_delta_path,
3818 11041 : LayerFileMetadata::new(metadata.len(), self.generation),
3819 11041 : );
3820 11041 : l.access_stats().record_residence_event(
3821 11041 : LayerResidenceStatus::Resident,
3822 11041 : LayerResidenceEventReason::LayerCreate,
3823 11041 : );
3824 11041 : let l = l as Arc<dyn PersistentLayer>;
3825 11041 : if guard.contains(&l) {
3826 2839 : duplicated_layers.insert(l.layer_desc().key());
3827 2839 : } else {
3828 8202 : if LayerMap::is_l0(l.layer_desc()) {
3829 0 : return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
3830 8202 : }
3831 8202 : insert_layers.push(l);
3832 : }
3833 : }
3834 :
3835 : // Now that we have reshuffled the data to set of new delta layers, we can
3836 : // delete the old ones
3837 321 : let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
3838 7556 : for ldesc in deltas_to_compact {
3839 7235 : if duplicated_layers.contains(&ldesc.key()) {
3840 : // skip duplicated layers, they will not be removed; we have already overwritten them
3841 : // with new layers in the compaction phase 1.
3842 2839 : continue;
3843 4396 : }
3844 4396 : layer_names_to_delete.push(ldesc.filename());
3845 4396 : remove_layers.push(guard.get_from_desc(&ldesc));
3846 : }
3847 :
3848 321 : guard.finish_compact_l0(
3849 321 : layer_removal_cs,
3850 321 : remove_layers,
3851 321 : insert_layers,
3852 321 : &self.metrics,
3853 321 : )?;
3854 :
3855 321 : drop_wlock(guard);
3856 :
3857 : // Also schedule the deletions in remote storage
3858 321 : if let Some(remote_client) = &self.remote_client {
3859 202 : remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
3860 119 : }
3861 :
3862 321 : Ok(())
3863 1392 : }
3864 :
3865 : /// Update information about which layer files need to be retained on
3866 : /// garbage collection. This is separate from actually performing the GC,
3867 : /// and is updated more frequently, so that compaction can remove obsolete
3868 : /// page versions more aggressively.
3869 : ///
3870 : /// TODO: that's wishful thinking, compaction doesn't actually do that
3871 : /// currently.
3872 : ///
3873 : /// The caller specifies how much history is needed with the 3 arguments:
3874 : ///
3875 : /// retain_lsns: keep a version of each page at these LSNs
3876 : /// cutoff_horizon: also keep everything newer than this LSN
3877 : /// pitr: the time duration required to keep data for PITR
3878 : ///
3879 : /// The 'retain_lsns' list is currently used to prevent removing files that
3880 : /// are needed by child timelines. In the future, the user might be able to
3881 : /// name additional points in time to retain. The caller is responsible for
3882 : /// collecting that information.
3883 : ///
3884 : /// The 'cutoff_horizon' point is used to retain recent versions that might still be
3885 : /// needed by read-only nodes. (As of this writing, the caller just passes
3886 : /// the latest LSN subtracted by a constant, and doesn't do anything smart
3887 : /// to figure out what read-only nodes might actually need.)
3888 : ///
3889 : /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
3890 : /// whether a record is needed for PITR.
3891 : ///
3892 : /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
3893 : /// field, so that the three values passed as argument are stored
3894 : /// atomically. But the caller is responsible for ensuring that no new
3895 : /// branches are created that would need to be included in 'retain_lsns',
3896 : /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
3897 : /// that.
3898 : ///
3899 2679 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
3900 : pub(super) async fn update_gc_info(
3901 : &self,
3902 : retain_lsns: Vec<Lsn>,
3903 : cutoff_horizon: Lsn,
3904 : pitr: Duration,
3905 : ctx: &RequestContext,
3906 : ) -> anyhow::Result<()> {
3907 : // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
3908 : //
3909 : // Some unit tests depend on garbage-collection working even when
3910 : // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
3911 : // work, so avoid calling it altogether if time-based retention is not
3912 : // configured. It would be pointless anyway.
3913 : let pitr_cutoff = if pitr != Duration::ZERO {
3914 : let now = SystemTime::now();
3915 : if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
3916 : let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
3917 :
3918 : match self.find_lsn_for_timestamp(pitr_timestamp, ctx).await? {
3919 : LsnForTimestamp::Present(lsn) => lsn,
3920 : LsnForTimestamp::Future(lsn) => {
3921 : // The timestamp is in the future. That sounds impossible,
3922 : // but what it really means is that there haven't been
3923 : // any commits since the cutoff timestamp.
3924 0 : debug!("future({})", lsn);
3925 : cutoff_horizon
3926 : }
3927 : LsnForTimestamp::Past(lsn) => {
3928 0 : debug!("past({})", lsn);
3929 : // conservative, safe default is to remove nothing, when we
3930 : // have no commit timestamp data available
3931 : *self.get_latest_gc_cutoff_lsn()
3932 : }
3933 : LsnForTimestamp::NoData(lsn) => {
3934 0 : debug!("nodata({})", lsn);
3935 : // conservative, safe default is to remove nothing, when we
3936 : // have no commit timestamp data available
3937 : *self.get_latest_gc_cutoff_lsn()
3938 : }
3939 : }
3940 : } else {
3941 : // If we don't have enough data to convert to LSN,
3942 : // play safe and don't remove any layers.
3943 : *self.get_latest_gc_cutoff_lsn()
3944 : }
3945 : } else {
3946 : // No time-based retention was configured. Set time-based cutoff to
3947 : // same as LSN based.
3948 : cutoff_horizon
3949 : };
3950 :
3951 : // Grab the lock and update the values
3952 : *self.gc_info.write().unwrap() = GcInfo {
3953 : retain_lsns,
3954 : horizon_cutoff: cutoff_horizon,
3955 : pitr_cutoff,
3956 : };
3957 :
3958 : Ok(())
3959 : }
3960 :
3961 : ///
3962 : /// Garbage collect layer files on a timeline that are no longer needed.
3963 : ///
3964 : /// Currently, we don't make any attempt at removing unneeded page versions
3965 : /// within a layer file. We can only remove the whole file if it's fully
3966 : /// obsolete.
3967 : ///
3968 836 : pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
3969 836 : let timer = self.metrics.garbage_collect_histo.start_timer();
3970 836 :
3971 836 : fail_point!("before-timeline-gc");
3972 :
3973 836 : let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
3974 : // Is the timeline being deleted?
3975 836 : if self.is_stopping() {
3976 0 : anyhow::bail!("timeline is Stopping");
3977 836 : }
3978 836 :
3979 836 : let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
3980 836 : let gc_info = self.gc_info.read().unwrap();
3981 836 :
3982 836 : let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
3983 836 : let pitr_cutoff = gc_info.pitr_cutoff;
3984 836 : let retain_lsns = gc_info.retain_lsns.clone();
3985 836 : (horizon_cutoff, pitr_cutoff, retain_lsns)
3986 836 : };
3987 836 :
3988 836 : let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
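// (Added note: taking the minimum picks the older, i.e. more conservative, of the two
// cutoffs; gc_timeline only removes layers that fall below both the horizon and the
// PITR cutoff.)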
3989 :
3990 836 : let res = self
3991 836 : .gc_timeline(
3992 836 : layer_removal_cs.clone(),
3993 836 : horizon_cutoff,
3994 836 : pitr_cutoff,
3995 836 : retain_lsns,
3996 836 : new_gc_cutoff,
3997 836 : )
3998 836 : .instrument(
3999 836 : info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
4000 : )
4001 261 : .await?;
4002 :
4003 : // only record successes
4004 831 : timer.stop_and_record();
4005 831 :
4006 831 : Ok(res)
4007 831 : }
4008 :
4009 836 : async fn gc_timeline(
4010 836 : &self,
4011 836 : layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
4012 836 : horizon_cutoff: Lsn,
4013 836 : pitr_cutoff: Lsn,
4014 836 : retain_lsns: Vec<Lsn>,
4015 836 : new_gc_cutoff: Lsn,
4016 836 : ) -> anyhow::Result<GcResult> {
4017 836 : let now = SystemTime::now();
4018 836 : let mut result: GcResult = GcResult::default();
4019 836 :
4020 836 : // Nothing to GC. Return early.
4021 836 : let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
4022 836 : if latest_gc_cutoff >= new_gc_cutoff {
4023 105 : info!(
4024 105 : "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
4025 105 : );
4026 105 : return Ok(result);
4027 731 : }
4028 731 :
4029 731 : // We need to ensure that no one tries to read page versions or create
4030 731 : // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
4031 731 : // for details. This will block until the old value is no longer in use.
4032 731 : //
4033 731 : // The GC cutoff should only ever move forwards.
4034 731 : {
4035 731 : let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
4036 731 : ensure!(
4037 731 : *write_guard <= new_gc_cutoff,
4038 0 : "Cannot move GC cutoff LSN backwards (was {}, new {})",
4039 0 : *write_guard,
4040 : new_gc_cutoff
4041 : );
4042 731 : write_guard.store_and_unlock(new_gc_cutoff).wait();
4043 : }
4044 :
4045 731 : info!("GC starting");
4046 :
4047 0 : debug!("retain_lsns: {:?}", retain_lsns);
4048 :
4049 : // Before deleting any layers, we need to wait for their upload ops to finish.
4050 : // See storage_sync module level comment on consistency.
4051 : // Do it here because we don't want to hold self.layers.write() while waiting.
4052 731 : if let Some(remote_client) = &self.remote_client {
4053 0 : debug!("waiting for upload ops to complete");
4054 231 : remote_client
4055 231 : .wait_completion()
4056 158 : .await
4057 231 : .context("wait for layer upload ops to complete")?;
4058 500 : }
4059 :
4060 731 : let mut layers_to_remove = Vec::new();
4061 731 : let mut wanted_image_layers = KeySpaceRandomAccum::default();
4062 :
4063 : // Scan all layers in the timeline (remote or on-disk).
4064 : //
4065 : // Garbage collect the layer if all conditions are satisfied:
4066 : // 1. it is older than cutoff LSN;
4067 : // 2. it is older than PITR interval;
4068 : // 3. it doesn't need to be retained for 'retain_lsns';
4069 : // 4. newer on-disk image layers cover the layer's whole key range
4070 : //
4071 : // TODO holding a write lock is too aggressive and avoidable
4072 731 : let mut guard = self.layers.write().await;
4073 731 : let layers = guard.layer_map();
4074 15537 : 'outer: for l in layers.iter_historic_layers() {
4075 15537 : result.layers_total += 1;
4076 15537 :
4077 15537 : // 1. Is it newer than GC horizon cutoff point?
4078 15537 : if l.get_lsn_range().end > horizon_cutoff {
4079 0 : debug!(
4080 0 : "keeping {} because it's newer than horizon_cutoff {}",
4081 0 : l.filename(),
4082 0 : horizon_cutoff,
4083 0 : );
4084 1887 : result.layers_needed_by_cutoff += 1;
4085 1887 : continue 'outer;
4086 13650 : }
4087 13650 :
4088 13650 : // 2. It is newer than PiTR cutoff point?
4089 13650 : if l.get_lsn_range().end > pitr_cutoff {
4090 0 : debug!(
4091 0 : "keeping {} because it's newer than pitr_cutoff {}",
4092 0 : l.filename(),
4093 0 : pitr_cutoff,
4094 0 : );
4095 152 : result.layers_needed_by_pitr += 1;
4096 152 : continue 'outer;
4097 13498 : }
4098 :
4099 : // 3. Is it needed by a child branch?
4100 : // NOTE With that we would keep data that
4101 : // might be referenced by child branches forever.
4102 : // We can track this in child timeline GC and delete parent layers when
4103 : // they are no longer needed. This might be complicated with long inheritance chains.
4104 : //
4105 : // TODO Vec is not a great choice for `retain_lsns`
4106 13542 : for retain_lsn in &retain_lsns {
4107 : // start_lsn is inclusive
4108 537 : if &l.get_lsn_range().start <= retain_lsn {
4109 0 : debug!(
4110 0 : "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
4111 0 : l.filename(),
4112 0 : retain_lsn,
4113 0 : l.is_incremental(),
4114 0 : );
4115 493 : result.layers_needed_by_branches += 1;
4116 493 : continue 'outer;
4117 44 : }
4118 : }
4119 :
4120 : // 4. Is there a later on-disk layer for this relation?
4121 : //
4122 : // The end-LSN is exclusive, while disk_consistent_lsn is
4123 : // inclusive. For example, if disk_consistent_lsn is 100, it is
4124 : // OK for a delta layer to have end LSN 101, but if the end LSN
4125 : // is 102, then it might not have been fully flushed to disk
4126 : // before crash.
4127 : //
4128 : // For example, imagine that the following layers exist:
4129 : //
4130 : // 1000 - image (A)
4131 : // 1000-2000 - delta (B)
4132 : // 2000 - image (C)
4133 : // 2000-3000 - delta (D)
4134 : // 3000 - image (E)
4135 : //
4136 : // If GC horizon is at 2500, we can remove layers A and B, but
4137 : // we cannot remove C, even though it's older than 2500, because
4138 : // the delta layer 2000-3000 depends on it.
4139 13005 : if !layers
4140 13005 : .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
4141 : {
4142 0 : debug!("keeping {} because it is the latest layer", l.filename());
4143 : // Collect delta key ranges that need image layers to allow garbage
4144 : // collecting the layers.
4145 : // It is not so obvious whether we need to propagate information only about
4146 : // delta layers. Image layers can form "stairs" preventing old image from been deleted.
4147 : // But image layers are in any case less sparse than delta layers. Also we need some
4148 : // protection from replacing recent image layers with new one after each GC iteration.
4149 12272 : if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
4150 0 : wanted_image_layers.add_range(l.get_key_range());
4151 12272 : }
4152 12272 : result.layers_not_updated += 1;
4153 12272 : continue 'outer;
4154 733 : }
4155 :
4156 : // We didn't find any reason to keep this file, so remove it.
4157 0 : debug!(
4158 0 : "garbage collecting {} is_dropped: xx is_incremental: {}",
4159 0 : l.filename(),
4160 0 : l.is_incremental(),
4161 0 : );
4162 733 : layers_to_remove.push(Arc::clone(&l));
4163 : }
4164 731 : self.wanted_image_layers
4165 731 : .lock()
4166 731 : .unwrap()
4167 731 : .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
4168 731 :
4169 731 : if !layers_to_remove.is_empty() {
4170 : // Persist the new GC cutoff value in the metadata file, before
4171 : // we actually remove anything.
4172 12 : self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())
4173 0 : .await?;
4174 :
4175 : // Actually delete the layers from disk and remove them from the map.
4176 : // (couldn't do this in the loop above, because you cannot modify a collection
4177 : // while iterating it. BTreeMap::retain() would be another option)
4178 12 : let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
4179 12 : let gc_layers = layers_to_remove
4180 12 : .iter()
4181 733 : .map(|x| guard.get_from_desc(x))
4182 12 : .collect();
4183 745 : for doomed_layer in layers_to_remove {
4184 733 : layer_names_to_delete.push(doomed_layer.filename());
4185 733 : result.layers_removed += 1;
4186 733 : }
4187 12 : let apply = guard.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)?;
4188 :
4189 12 : if result.layers_removed != 0 {
4190 12 : fail_point!("after-timeline-gc-removed-layers");
4191 12 : }
4192 :
4193 12 : if let Some(remote_client) = &self.remote_client {
4194 10 : remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
4195 2 : }
4196 :
4197 7 : apply.flush();
4198 719 : }
4199 :
4200 726 : info!(
4201 726 : "GC completed removing {} layers, cutoff {}",
4202 726 : result.layers_removed, new_gc_cutoff
4203 726 : );
4204 :
4205 726 : result.elapsed = now.elapsed()?;
4206 726 : Ok(result)
4207 831 : }
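
The four retention checks above reduce to a small decision function over a layer's LSN range. Below is a minimal, self-contained restatement of that logic, using plain u64 LSNs, a hypothetical SimpleLayer type, and a precomputed later_image_exists flag in place of the LayerMap::image_layer_exists query; it is an illustrative sketch, not the pageserver's actual GC code.

// Simplified layer with an LSN range; `lsn_end` is exclusive.
struct SimpleLayer {
    lsn_start: u64,
    lsn_end: u64,
}

// Why a layer is kept, or that it can be removed.
enum Verdict {
    KeepNewerThanHorizon,
    KeepNewerThanPitr,
    KeepNeededByBranch,
    KeepNoLaterImage,
    Remove,
}

fn gc_verdict(
    layer: &SimpleLayer,
    horizon_cutoff: u64,
    pitr_cutoff: u64,
    retain_lsns: &[u64],
    later_image_exists: bool,
) -> Verdict {
    // 1. Newer than the GC horizon cutoff point: keep.
    if layer.lsn_end > horizon_cutoff {
        return Verdict::KeepNewerThanHorizon;
    }
    // 2. Newer than the PiTR cutoff point: keep.
    if layer.lsn_end > pitr_cutoff {
        return Verdict::KeepNewerThanPitr;
    }
    // 3. Possibly visible to a child branch forked at a retained LSN
    //    (start_lsn is inclusive): keep.
    if retain_lsns.iter().any(|retain_lsn| layer.lsn_start <= *retain_lsn) {
        return Verdict::KeepNeededByBranch;
    }
    // 4. No later image layer covers this key range up to the new cutoff: keep,
    //    because a later delta layer may still depend on it.
    if !later_image_exists {
        return Verdict::KeepNoLaterImage;
    }
    Verdict::Remove
}
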
4208 :
4209 : ///
4210 : /// Reconstruct a value, using the given base image and WAL records in 'data'.
4211 : ///
4212 7262202 : async fn reconstruct_value(
4213 7262202 : &self,
4214 7262202 : key: Key,
4215 7262202 : request_lsn: Lsn,
4216 7262202 : mut data: ValueReconstructState,
4217 7262205 : ) -> Result<Bytes, PageReconstructError> {
4218 7262205 : // Perform WAL redo if needed
4219 7262205 : data.records.reverse();
4220 7262205 :
4221 7262205 : // If we have a page image, and no WAL, we're all set
4222 7262205 : if data.records.is_empty() {
4223 4500032 : if let Some((img_lsn, img)) = &data.img {
4224 0 : trace!(
4225 0 : "found page image for key {} at {}, no WAL redo required, req LSN {}",
4226 0 : key,
4227 0 : img_lsn,
4228 0 : request_lsn,
4229 0 : );
4230 4500032 : Ok(img.clone())
4231 : } else {
4232 0 : Err(PageReconstructError::from(anyhow!(
4233 0 : "base image for {key} at {request_lsn} not found"
4234 0 : )))
4235 : }
4236 : } else {
4237 : // We need to do WAL redo.
4238 : //
4239 :             // If we don't have a base image, then the oldest WAL record must initialize
4240 :             // the page
4241 2762173 : if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
4242 0 : Err(PageReconstructError::from(anyhow!(
4243 0 : "Base image for {} at {} not found, but got {} WAL records",
4244 0 : key,
4245 0 : request_lsn,
4246 0 : data.records.len()
4247 0 : )))
4248 : } else {
4249 2762173 : if data.img.is_some() {
4250 0 : trace!(
4251 0 : "found {} WAL records and a base image for {} at {}, performing WAL redo",
4252 0 : data.records.len(),
4253 0 : key,
4254 0 : request_lsn
4255 0 : );
4256 : } else {
4257 0 : trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
4258 : };
4259 :
4260 2762173 : let last_rec_lsn = data.records.last().unwrap().0;
4261 :
4262 2762173 : let img = match self
4263 2762173 : .walredo_mgr
4264 2762173 : .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
4265 2762173 : .context("Failed to reconstruct a page image:")
4266 : {
4267 2762173 : Ok(img) => img,
4268 0 : Err(e) => return Err(PageReconstructError::from(e)),
4269 : };
4270 :
4271 2762173 : if img.len() == page_cache::PAGE_SZ {
4272 2758635 : let cache = page_cache::get();
4273 2758635 : if let Err(e) = cache
4274 2758635 : .memorize_materialized_page(
4275 2758635 : self.tenant_id,
4276 2758635 : self.timeline_id,
4277 2758635 : key,
4278 2758635 : last_rec_lsn,
4279 2758635 : &img,
4280 2758635 : )
4281 734 : .await
4282 2758635 : .context("Materialized page memoization failed")
4283 : {
4284 0 : return Err(PageReconstructError::from(e));
4285 2758635 : }
4286 3538 : }
4287 :
4288 2762173 : Ok(img)
4289 : }
4290 : }
4291 7262205 : }
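
The branching in reconstruct_value boils down to a handful of cases: a page image with no WAL, WAL with either a base image or an initializing first record, or an unreconstructable key. A stripped-down sketch of that decision, with hypothetical types (Vec<u8> standing in for a page image, a will_init flag per record) and a text description in place of the actual WAL redo call:

// Stripped-down reconstruction input: an optional base image and WAL records,
// oldest first, each flagged with whether it can initialize the page.
struct RedoInput {
    img: Option<Vec<u8>>,
    records: Vec<(bool, Vec<u8>)>,
}

fn plan_reconstruction(data: &RedoInput) -> Result<&'static str, String> {
    if data.records.is_empty() {
        // No WAL to apply: the base image alone must satisfy the request.
        return match &data.img {
            Some(_) => Ok("return the page image as-is"),
            None => Err("no base image and no WAL records".to_string()),
        };
    }
    // WAL redo is needed. Without a base image, the oldest record must
    // initialize the page (e.g. a full-page image record).
    let oldest_will_init = data.records.first().map(|(w, _)| *w).unwrap_or(false);
    if data.img.is_none() && !oldest_will_init {
        return Err("WAL records exist but none initializes the page".to_string());
    }
    Ok("run WAL redo over the (optional) base image, then memoize the result if page-sized")
}
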
4292 :
4293 : /// Download a layer file from remote storage and insert it into the layer map.
4294 : ///
4295 : /// It's safe to call this function for the same layer concurrently. In that case:
4296 : /// - If the layer has already been downloaded, `OK(...)` is returned.
4297 :     /// - If the layer is currently being downloaded, we wait until that download succeeds or fails.
4298 : /// - If it succeeded, we return `Ok(...)`.
4299 : /// - If it failed, we or another concurrent caller will initiate a new download attempt.
4300 : ///
4301 : /// Download errors are classified and retried if appropriate by the underlying RemoteTimelineClient function.
4302 : /// It has an internal limit for the maximum number of retries and prints appropriate log messages.
4303 : /// If we exceed the limit, it returns an error, and this function passes it through.
4304 :     /// The caller _could_ retry by calling this function again, but _should not_ do so.
4305 : /// The reason is that they cannot distinguish permanent errors from temporary ones, whereas
4306 : /// the underlying RemoteTimelineClient can.
4307 : ///
4308 : /// There is no internal timeout or slowness detection.
4309 : /// If the caller has a deadline or needs a timeout, they can simply stop polling:
4310 : /// we're **cancellation-safe** because the download happens in a separate task_mgr task.
4311 : /// So, the current download attempt will run to completion even if we stop polling.
4312 3189 : #[instrument(skip_all, fields(layer=%remote_layer))]
4313 : pub async fn download_remote_layer(
4314 : &self,
4315 : remote_layer: Arc<RemoteLayer>,
4316 : ) -> anyhow::Result<()> {
4317 : span::debug_assert_current_span_has_tenant_and_timeline_id();
4318 :
4319 : use std::sync::atomic::Ordering::Relaxed;
4320 :
4321 : let permit = match Arc::clone(&remote_layer.ongoing_download)
4322 : .acquire_owned()
4323 : .await
4324 : {
4325 : Ok(permit) => permit,
4326 : Err(_closed) => {
4327 : if remote_layer.download_replacement_failure.load(Relaxed) {
4328 :                     // this path will be hit often when there are retries at higher levels. however,
4329 :                     // hitting this error prevents a busy loop between get_reconstruct_data and
4330 :                     // download, so an error is preferred.
4331 : //
4332 : // TODO: we really should poison the timeline, but panicking is not yet
4333 : // supported. Related: https://github.com/neondatabase/neon/issues/3621
4334 : anyhow::bail!("an earlier download succeeded but LayerMap::replace failed")
4335 : } else {
4336 18 : info!("download of layer has already finished");
4337 : return Ok(());
4338 : }
4339 : }
4340 : };
4341 :
4342 : let (sender, receiver) = tokio::sync::oneshot::channel();
4343 :         // Spawn a task so that the download does not outlive the timeline when we detach the tenant / delete the timeline.
4344 : let self_clone = self.myself.upgrade().expect("timeline is gone");
4345 : task_mgr::spawn(
4346 : &tokio::runtime::Handle::current(),
4347 : TaskKind::RemoteDownloadTask,
4348 : Some(self.tenant_id),
4349 : Some(self.timeline_id),
4350 : &format!("download layer {}", remote_layer),
4351 : false,
4352 1043 : async move {
4353 1043 : let remote_client = self_clone.remote_client.as_ref().unwrap();
4354 :
4355 : // Does retries + exponential back-off internally.
4356 : // When this fails, don't layer further retry attempts here.
4357 1043 : let result = remote_client
4358 1043 : .download_layer_file(&remote_layer.filename(), &remote_layer.layer_metadata)
4359 360525 : .await;
4360 :
4361 1040 : if let Ok(size) = &result {
4362 1008 : info!("layer file download finished");
4363 :
4364 : // XXX the temp file is still around in Err() case
4365 : // and consumes space until we clean up upon pageserver restart.
4366 1008 : self_clone.metrics.resident_physical_size_gauge.add(*size);
4367 :
4368 : // Download complete. Replace the RemoteLayer with the corresponding
4369 : // Delta- or ImageLayer in the layer map.
4370 1008 : let mut guard = self_clone.layers.write().await;
4371 1008 : let new_layer =
4372 1008 : remote_layer.create_downloaded_layer(&guard, self_clone.conf, *size);
4373 1008 : {
4374 1008 : let l: Arc<dyn PersistentLayer> = remote_layer.clone();
4375 1008 : let failure = match guard.replace_and_verify(l, new_layer) {
4376 1007 : Ok(()) => false,
4377 1 : Err(e) => {
4378 1 :                             // this is a precondition failure: the attributes derived from the
4379 1 :                             // layer filename didn't match up, which doesn't seem likely.
4380 1 : error!("replacing downloaded layer into layermap failed: {e:#?}");
4381 1 : true
4382 : }
4383 : };
4384 :
4385 1008 : if failure {
4386 1 : // mark the remote layer permanently failed; the timeline is most
4387 1 : // likely unusable after this. sadly we cannot just poison the layermap
4388 1 : // lock with panic, because that would create an issue with shutdown.
4389 1 : //
4390 1 : // this does not change the retry semantics on failed downloads.
4391 1 : //
4392 1 :                         // use of Relaxed is valid because closing the semaphore provides a
4393 1 :                         // happens-before edge and wakes up any waiters; we store this value
4394 1 :                         // before closing the semaphore, and any waiters (or would-be waiters)
4395 1 :                         // load it afterwards.
4396 1 : //
4397 1 : // See: https://github.com/neondatabase/neon/issues/3533
4398 1 : remote_layer
4399 1 : .download_replacement_failure
4400 1 : .store(true, Relaxed);
4401 1007 : }
4402 : }
4403 1008 : drop_wlock(guard);
4404 1008 :
4405 1008 : info!("on-demand download successful");
4406 :
4407 : // Now that we've inserted the download into the layer map,
4408 : // close the semaphore. This will make other waiters for
4409 : // this download return Ok(()).
4410 1008 : assert!(!remote_layer.ongoing_download.is_closed());
4411 1008 : remote_layer.ongoing_download.close();
4412 : } else {
4413 : // Keep semaphore open. We'll drop the permit at the end of the function.
4414 32 : error!(
4415 32 : "layer file download failed: {:?}",
4416 32 : result.as_ref().unwrap_err()
4417 32 : );
4418 : }
4419 :
4420 : // Don't treat it as an error if the task that triggered the download
4421 : // is no longer interested in the result.
4422 1040 : sender.send(result.map(|_sz| ())).ok();
4423 1040 :
4424 1040 : // In case we failed and there are other waiters, this will make one
4425 1040 : // of them retry the download in a new task.
4426 1040 : // XXX: This resets the exponential backoff because it's a new call to
4427 1040 : // download_layer file.
4428 1040 :                 // download_layer_file.
4429 1040 :
4430 1040 : Ok(())
4431 1040 : }
4432 : .in_current_span(),
4433 : );
4434 :
4435 : receiver.await.context("download task cancelled")?
4436 : }
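
The concurrency semantics documented above hinge on a one-permit semaphore per remote layer that is closed once a finished download has been installed into the layer map. A minimal sketch of that single-flight pattern, assuming the tokio and anyhow crates, with the real download and layer-map replacement elided:

use std::sync::Arc;

use tokio::sync::Semaphore;

// One `gate` (created with a single permit) is shared by every task interested
// in the same layer; whichever task wins the permit performs the work.
async fn single_flight_download(gate: Arc<Semaphore>) -> anyhow::Result<()> {
    let permit = match Arc::clone(&gate).acquire_owned().await {
        Ok(permit) => permit,
        // A closed semaphore means a previous caller already installed the download.
        Err(_closed) => return Ok(()),
    };

    // ... the actual download and layer-map replacement would happen here ...
    let download_ok = true; // stand-in for the real outcome

    if download_ok {
        // Success: close the semaphore so current and future waiters return Ok(())
        // without re-downloading.
        gate.close();
    }
    // On failure the permit is simply dropped, letting one waiter retry.
    drop(permit);
    Ok(())
}
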
4437 :
4438 3 : pub async fn spawn_download_all_remote_layers(
4439 3 : self: Arc<Self>,
4440 3 : request: DownloadRemoteLayersTaskSpawnRequest,
4441 3 : ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
4442 3 : let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
4443 3 : if let Some(st) = &*status_guard {
4444 1 : match &st.state {
4445 : DownloadRemoteLayersTaskState::Running => {
4446 0 : return Err(st.clone());
4447 : }
4448 : DownloadRemoteLayersTaskState::ShutDown
4449 1 : | DownloadRemoteLayersTaskState::Completed => {
4450 1 : *status_guard = None;
4451 1 : }
4452 : }
4453 2 : }
4454 :
4455 3 : let self_clone = Arc::clone(&self);
4456 3 : let task_id = task_mgr::spawn(
4457 3 : task_mgr::BACKGROUND_RUNTIME.handle(),
4458 3 : task_mgr::TaskKind::DownloadAllRemoteLayers,
4459 3 : Some(self.tenant_id),
4460 3 : Some(self.timeline_id),
4461 3 : "download all remote layers task",
4462 : false,
4463 3 : async move {
4464 36 : self_clone.download_all_remote_layers(request).await;
4465 3 : let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
4466 3 : match &mut *status_guard {
4467 : None => {
4468 0 : warn!("tasks status is supposed to be Some(), since we are running");
4469 : }
4470 3 : Some(st) => {
4471 3 : let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
4472 3 : if st.task_id != exp_task_id {
4473 0 : warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
4474 3 : } else {
4475 3 : st.state = DownloadRemoteLayersTaskState::Completed;
4476 3 : }
4477 : }
4478 : };
4479 3 : Ok(())
4480 3 : }
4481 3 : .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))
4482 : );
4483 :
4484 3 : let initial_info = DownloadRemoteLayersTaskInfo {
4485 3 : task_id: format!("{task_id}"),
4486 3 : state: DownloadRemoteLayersTaskState::Running,
4487 3 : total_layer_count: 0,
4488 3 : successful_download_count: 0,
4489 3 : failed_download_count: 0,
4490 3 : };
4491 3 : *status_guard = Some(initial_info.clone());
4492 3 :
4493 3 : Ok(initial_info)
4494 3 : }
4495 :
4496 3 : async fn download_all_remote_layers(
4497 3 : self: &Arc<Self>,
4498 3 : request: DownloadRemoteLayersTaskSpawnRequest,
4499 3 : ) {
4500 3 : let mut downloads = Vec::new();
4501 : {
4502 3 : let guard = self.layers.read().await;
4503 3 : let layers = guard.layer_map();
4504 3 : layers
4505 3 : .iter_historic_layers()
4506 64 : .map(|l| guard.get_from_desc(&l))
4507 64 : .filter_map(|l| l.downcast_remote_layer())
4508 60 : .map(|l| self.download_remote_layer(l))
4509 60 : .for_each(|dl| downloads.push(dl))
4510 3 : }
4511 3 : let total_layer_count = downloads.len();
4512 3 : // limit download concurrency as specified in request
4513 3 : let downloads = futures::stream::iter(downloads);
4514 3 : let mut downloads = downloads.buffer_unordered(request.max_concurrent_downloads.get());
4515 3 :
4516 3 : macro_rules! lock_status {
4517 3 : ($st:ident) => {
4518 3 : let mut st = self.download_all_remote_layers_task_info.write().unwrap();
4519 3 : let st = st
4520 3 : .as_mut()
4521 3 : .expect("this function is only called after the task has been spawned");
4522 3 : assert_eq!(
4523 3 : st.task_id,
4524 3 : format!(
4525 3 : "{}",
4526 3 : task_mgr::current_task_id().expect("we run inside a task_mgr task")
4527 3 : )
4528 3 : );
4529 3 : let $st = st;
4530 3 : };
4531 3 : }
4532 3 :
4533 3 : {
4534 3 : lock_status!(st);
4535 3 : st.total_layer_count = total_layer_count as u64;
4536 : }
4537 63 : loop {
4538 99 : tokio::select! {
4539 63 : dl = downloads.next() => {
4540 : lock_status!(st);
4541 : match dl {
4542 : None => break,
4543 : Some(Ok(())) => {
4544 : st.successful_download_count += 1;
4545 : },
4546 : Some(Err(e)) => {
4547 30 : error!(error = %e, "layer download failed");
4548 : st.failed_download_count += 1;
4549 : }
4550 : }
4551 : }
4552 : _ = task_mgr::shutdown_watcher() => {
4553 : // Kind of pointless to watch for shutdowns here,
4554 : // as download_remote_layer spawns other task_mgr tasks internally.
4555 : lock_status!(st);
4556 : st.state = DownloadRemoteLayersTaskState::ShutDown;
4557 : }
4558 63 : }
4559 63 : }
4560 : {
4561 3 : lock_status!(st);
4562 3 : st.state = DownloadRemoteLayersTaskState::Completed;
4563 3 : }
4564 3 : }
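
download_all_remote_layers bounds concurrency by turning the per-layer download futures into a stream and polling at most request.max_concurrent_downloads of them at once via buffer_unordered. A generic sketch of the same pattern, assuming the futures crate; the Result<(), String> job type and the counters are placeholders for the task-status bookkeeping above:

use futures::stream::{self, StreamExt};

// Run `jobs` with at most `max_concurrent` in flight; returns (ok, failed) counts.
async fn run_bounded<F>(jobs: Vec<F>, max_concurrent: usize) -> (u64, u64)
where
    F: std::future::Future<Output = Result<(), String>>,
{
    let mut ok = 0u64;
    let mut failed = 0u64;
    // Pin the combined stream locally so `next()` can be called regardless of
    // whether the individual futures are Unpin.
    let mut in_flight = std::pin::pin!(stream::iter(jobs).buffer_unordered(max_concurrent));
    while let Some(res) = in_flight.next().await {
        match res {
            Ok(()) => ok += 1,
            Err(_) => failed += 1,
        }
    }
    (ok, failed)
}
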
4565 :
4566 21 : pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
4567 21 : self.download_all_remote_layers_task_info
4568 21 : .read()
4569 21 : .unwrap()
4570 21 : .clone()
4571 21 : }
4572 : }
4573 :
4574 : pub struct DiskUsageEvictionInfo {
4575 : /// Timeline's largest layer (remote or resident)
4576 : pub max_layer_size: Option<u64>,
4577 : /// Timeline's resident layers
4578 : pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
4579 : }
4580 :
4581 : pub struct LocalLayerInfoForDiskUsageEviction {
4582 : pub layer: Arc<dyn PersistentLayer>,
4583 : pub last_activity_ts: SystemTime,
4584 : }
4585 :
4586 : impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
4587 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4588 0 :         // format the tv_sec, tv_nsec as RFC 3339 in case someone is looking at it.
4589 0 :         // having to allocate a string for this is unfortunate, but it is rarely formatted
4590 0 : let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
4591 0 : let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
4592 0 : f.debug_struct("LocalLayerInfoForDiskUsageEviction")
4593 0 : .field("layer", &self.layer)
4594 0 : .field("last_activity", &ts)
4595 0 : .finish()
4596 0 : }
4597 : }
4598 :
4599 : impl LocalLayerInfoForDiskUsageEviction {
4600 250 : pub fn file_size(&self) -> u64 {
4601 250 : self.layer.layer_desc().file_size
4602 250 : }
4603 : }
4604 :
4605 : impl Timeline {
4606 : /// Returns non-remote layers for eviction.
4607 13 : pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
4608 13 : let guard = self.layers.read().await;
4609 13 : let layers = guard.layer_map();
4610 13 :
4611 13 : let mut max_layer_size: Option<u64> = None;
4612 13 : let mut resident_layers = Vec::new();
4613 :
4614 250 : for l in layers.iter_historic_layers() {
4615 250 : let file_size = l.file_size();
4616 250 : max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
4617 250 :
4618 250 : let l = guard.get_from_desc(&l);
4619 250 :
4620 250 : if l.is_remote_layer() {
4621 0 : continue;
4622 250 : }
4623 250 :
4624 250 : let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
4625 0 : // We only use this fallback if there's an implementation error.
4626 0 : // `latest_activity` already does rate-limited warn!() log.
4627 0 :                 // `latest_activity` already emits a rate-limited warn!() log.
4628 0 : SystemTime::now()
4629 250 : });
4630 250 :
4631 250 : resident_layers.push(LocalLayerInfoForDiskUsageEviction {
4632 250 : layer: l,
4633 250 : last_activity_ts,
4634 250 : });
4635 : }
4636 :
4637 13 : DiskUsageEvictionInfo {
4638 13 : max_layer_size,
4639 13 : resident_layers,
4640 13 : }
4641 13 : }
4642 : }
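
DiskUsageEvictionInfo only reports resident layers together with their sizes and last-activity timestamps; choosing which ones to evict is up to the caller. One plausible caller-side policy, sketched below with hypothetical types, is to sort least-recently-used first and accumulate file sizes until the requested number of bytes would be freed. This policy is an assumption for illustration, not the pageserver's actual disk-usage eviction logic.

use std::time::SystemTime;

// `layers` holds (last_activity_ts, file_size) pairs; returns indices of the
// layers to evict, least recently used first, until `want_freed` bytes are covered.
fn pick_eviction_victims(layers: &[(SystemTime, u64)], want_freed: u64) -> Vec<usize> {
    let mut order: Vec<usize> = (0..layers.len()).collect();
    // Least recently used first.
    order.sort_by_key(|&i| layers[i].0);

    let mut freed = 0u64;
    let mut victims = Vec::new();
    for i in order {
        if freed >= want_freed {
            break;
        }
        freed += layers[i].1;
        victims.push(i);
    }
    victims
}
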
4643 :
4644 : type TraversalPathItem = (
4645 : ValueReconstructResult,
4646 : Lsn,
4647 : Box<dyn Send + FnOnce() -> TraversalId>,
4648 : );
4649 :
4650 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
4651 : /// to an error, as anyhow context information.
4652 2 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
4653 2 : // We want the original 'msg' to be the outermost context. The outermost context
4654 2 : // is the most high-level information, which also gets propagated to the client.
4655 2 : let mut msg_iter = path
4656 2 : .into_iter()
4657 4 : .map(|(r, c, l)| {
4658 4 : format!(
4659 4 : "layer traversal: result {:?}, cont_lsn {}, layer: {}",
4660 4 : r,
4661 4 : c,
4662 4 : l(),
4663 4 : )
4664 4 : })
4665 2 : .chain(std::iter::once(msg));
4666 2 : // Construct initial message from the first traversed layer
4667 2 : let err = anyhow!(msg_iter.next().unwrap());
4668 2 :
4669 2 : // Append all subsequent traversals, and the error message 'msg', as contexts.
4670 4 : let msg = msg_iter.fold(err, |err, msg| err.context(msg));
4671 2 : PageReconstructError::from(msg)
4672 2 : }
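
The fold above works because each anyhow context() call wraps the previous error, so the last message folded in (the caller's msg) ends up as the outermost context. A small standalone sketch of the same chaining, assuming the anyhow crate:

use anyhow::anyhow;

// Fold `msgs` into one error; the first message becomes the innermost (root)
// error and the last one the outermost context, mirroring layer_traversal_error.
fn chain_messages(msgs: Vec<String>) -> anyhow::Error {
    let mut iter = msgs.into_iter();
    let err = anyhow!(iter.next().expect("at least one message"));
    iter.fold(err, |err, msg| err.context(msg))
}
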
4673 :
4674 : /// Various functions to mutate the timeline.
4675 : // TODO Currently, Deref is used to allow easy access to the read methods of the underlying Timeline.
4676 : // This is probably considered bad practice in Rust and should be fixed eventually,
4677 : // but doing so would cause large code changes.
4678 : pub struct TimelineWriter<'a> {
4679 : tl: &'a Timeline,
4680 : _write_guard: tokio::sync::MutexGuard<'a, ()>,
4681 : }
4682 :
4683 : impl Deref for TimelineWriter<'_> {
4684 : type Target = Timeline;
4685 :
4686 0 : fn deref(&self) -> &Self::Target {
4687 0 : self.tl
4688 0 : }
4689 : }
4690 :
4691 : impl<'a> TimelineWriter<'a> {
4692 : /// Put a new page version that can be constructed from a WAL record
4693 : ///
4694 : /// This will implicitly extend the relation, if the page is beyond the
4695 : /// current end-of-file.
4696 82545991 : pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
4697 82546079 : self.tl.put_value(key, lsn, value).await
4698 82546068 : }
4699 :
4700 17794 : pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
4701 17794 : self.tl.put_tombstone(key_range, lsn).await
4702 17794 : }
4703 :
4704 : /// Track the end of the latest digested WAL record.
4705 : /// Remember the (end of) last valid WAL record remembered in the timeline.
4706 :     /// Remembers the end of the last valid WAL record for the timeline.
4707 : /// Call this after you have finished writing all the WAL up to 'lsn'.
4708 : ///
4709 : /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
4710 : /// the 'lsn' or anything older. The previous last record LSN is stored alongside
4711 : /// the latest and can be read.
4712 74268468 : pub fn finish_write(&self, new_lsn: Lsn) {
4713 74268468 : self.tl.finish_write(new_lsn);
4714 74268468 : }
4715 :
4716 1094354 : pub fn update_current_logical_size(&self, delta: i64) {
4717 1094354 : self.tl.update_current_logical_size(delta)
4718 1094354 : }
4719 : }
4720 :
4721 : // We need TimelineWriter to be send in upcoming conversion of
4722 : // Timeline::layers to tokio::sync::RwLock.
4723 1 : #[test]
4724 1 : fn is_send() {
4725 1 : fn _assert_send<T: Send>() {}
4726 1 : _assert_send::<TimelineWriter<'_>>();
4727 1 : }
4728 :
4729 : /// Add a suffix to a layer file's name: .{num}.old
4730 : /// Uses the first available num (starts at 0)
4731 2 : fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
4732 2 : let filename = path
4733 2 : .file_name()
4734 2 :         .ok_or_else(|| anyhow!("Path {} doesn't have a file name", path.display()))?
4735 2 : .to_string_lossy();
4736 2 : let mut new_path = path.to_owned();
4737 :
4738 2 : for i in 0u32.. {
4739 2 : new_path.set_file_name(format!("{filename}.{i}.old"));
4740 2 : if !new_path.exists() {
4741 2 : std::fs::rename(path, &new_path)
4742 2 : .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
4743 2 : return Ok(());
4744 0 : }
4745 : }
4746 :
4747 0 : bail!("couldn't find an unused backup number for {:?}", path)
4748 2 : }
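
A small usage sketch of rename_to_backup, assuming it is in scope, the anyhow crate is available, and the directory is writable: backing up the same path twice produces the .0.old and .1.old suffixes in order.

use std::path::Path;

fn backup_twice_demo(dir: &Path) -> anyhow::Result<()> {
    let target = dir.join("layerfile");

    std::fs::write(&target, b"v1")?;
    rename_to_backup(&target)?; // -> layerfile.0.old
    std::fs::write(&target, b"v2")?;
    rename_to_backup(&target)?; // -> layerfile.1.old

    assert!(dir.join("layerfile.0.old").exists());
    assert!(dir.join("layerfile.1.old").exists());
    Ok(())
}
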
4749 :
4750 : /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
4751 : ///
4752 : /// Returns `true` if the two `Arc` point to the same layer, false otherwise.
4753 : ///
4754 : /// If comparing persistent layers, ALWAYS compare the layer descriptor key.
4755 : #[inline(always)]
4756 7732 : pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
4757 7732 : // "dyn Trait" objects are "fat pointers" in that they have two components:
4758 7732 : // - pointer to the object
4759 7732 : // - pointer to the vtable
4760 7732 : //
4761 7732 :     // rust does not guarantee that these vtables are unique; however,
4762 7732 :     // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
4763 7732 :     // pointer and the vtable need to be equal.
4764 7732 : //
4765 7732 : // See: https://github.com/rust-lang/rust/issues/103763
4766 7732 : //
4767 7732 : // A future version of rust will most likely use this form below, where we cast each
4768 7732 :     // pointer into a pointer to unit, which drops the inaccessible vtable pointer so that it
4769 7732 :     // does not affect the comparison.
4770 7732 : //
4771 7732 : // See: https://github.com/rust-lang/rust/pull/106450
4772 7732 : let left = Arc::as_ptr(left) as *const ();
4773 7732 : let right = Arc::as_ptr(right) as *const ();
4774 7732 :
4775 7732 : left == right
4776 7732 : }
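
A self-contained illustration of the thin-pointer comparison above; the Layer trait and Dummy type here are hypothetical, but the cast is the same one compare_arced_layers performs.

use std::sync::Arc;

trait Layer {}
struct Dummy;
impl Layer for Dummy {}

// Compare only the object pointers of two trait-object Arcs, ignoring vtables.
fn same_object(left: &Arc<dyn Layer>, right: &Arc<dyn Layer>) -> bool {
    Arc::as_ptr(left) as *const () == Arc::as_ptr(right) as *const ()
}

fn demo() {
    let a: Arc<dyn Layer> = Arc::new(Dummy);
    let b = Arc::clone(&a);
    // Two clones of the same allocation always compare equal under the thin-pointer cast.
    assert!(same_object(&a, &b));
}
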
4777 :
4778 : #[cfg(test)]
4779 : mod tests {
4780 : use std::sync::Arc;
4781 :
4782 : use utils::{id::TimelineId, lsn::Lsn};
4783 :
4784 : use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
4785 :
4786 : use super::{EvictionError, Timeline};
4787 :
4788 1 : #[tokio::test]
4789 1 : async fn two_layer_eviction_attempts_at_the_same_time() {
4790 1 : let harness =
4791 1 : TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
4792 1 :
4793 1 : let ctx = any_context();
4794 1 : let tenant = harness.try_load(&ctx).await.unwrap();
4795 1 : let timeline = tenant
4796 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
4797 2 : .await
4798 1 : .unwrap();
4799 1 :
4800 1 : let rc = timeline
4801 1 : .remote_client
4802 1 : .clone()
4803 1 : .expect("just configured this");
4804 :
4805 1 : let layer = find_some_layer(&timeline).await;
4806 :
4807 1 : let cancel = tokio_util::sync::CancellationToken::new();
4808 1 : let batch = [layer];
4809 1 :
4810 1 : let first = {
4811 1 : let cancel = cancel.clone();
4812 1 : async {
4813 1 : timeline
4814 1 : .evict_layer_batch(&rc, &batch, cancel)
4815 0 : .await
4816 1 : .unwrap()
4817 1 : }
4818 : };
4819 1 : let second = async {
4820 1 : timeline
4821 1 : .evict_layer_batch(&rc, &batch, cancel)
4822 0 : .await
4823 1 : .unwrap()
4824 1 : };
4825 :
4826 1 : let (first, second) = tokio::join!(first, second);
4827 :
4828 1 : let (first, second) = (only_one(first), only_one(second));
4829 1 :
4830 1 : match (first, second) {
4831 : (Ok(()), Err(EvictionError::FileNotFound))
4832 1 : | (Err(EvictionError::FileNotFound), Ok(())) => {
4833 1 : // one of the evictions gets to do it,
4834 1 : // other one gets FileNotFound. all is good.
4835 1 : }
4836 0 : other => unreachable!("unexpected {:?}", other),
4837 : }
4838 : }
4839 :
4840 1 : #[tokio::test]
4841 1 : async fn layer_eviction_aba_fails() {
4842 1 : let harness = TenantHarness::create("layer_eviction_aba_fails").unwrap();
4843 1 :
4844 1 : let ctx = any_context();
4845 1 : let tenant = harness.try_load(&ctx).await.unwrap();
4846 1 : let timeline = tenant
4847 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
4848 2 : .await
4849 1 : .unwrap();
4850 :
4851 1 : let _e = tracing::info_span!("foobar", tenant_id = %tenant.tenant_id, timeline_id = %timeline.timeline_id).entered();
4852 1 :
4853 1 : let rc = timeline.remote_client.clone().unwrap();
4854 :
4855 : // TenantHarness allows uploads to happen given GenericRemoteStorage is configured
4856 1 : let layer = find_some_layer(&timeline).await;
4857 :
4858 1 : let cancel = tokio_util::sync::CancellationToken::new();
4859 1 : let batch = [layer];
4860 1 :
4861 1 : let first = {
4862 1 : let cancel = cancel.clone();
4863 1 : async {
4864 1 : timeline
4865 1 : .evict_layer_batch(&rc, &batch, cancel)
4866 0 : .await
4867 1 : .unwrap()
4868 1 : }
4869 : };
4870 :
4871 :         // let's imagine this is stuck somehow, still referencing the original `Arc<dyn PersistentLayer>`
4872 1 : let second = {
4873 1 : let cancel = cancel.clone();
4874 1 : async {
4875 1 : timeline
4876 1 : .evict_layer_batch(&rc, &batch, cancel)
4877 0 : .await
4878 1 : .unwrap()
4879 1 : }
4880 : };
4881 :
4882 : // while it's stuck, we evict and end up redownloading it
4883 1 : only_one(first.await).expect("eviction succeeded");
4884 :
4885 1 : let layer = find_some_layer(&timeline).await;
4886 1 : let layer = layer.downcast_remote_layer().unwrap();
4887 1 : timeline.download_remote_layer(layer).await.unwrap();
4888 :
4889 1 : let res = only_one(second.await);
4890 :
4891 1 : assert!(
4892 1 : matches!(res, Err(EvictionError::LayerNotFound(_))),
4893 0 : "{res:?}"
4894 : );
4895 :
4896 :         // no more specific assertions; outside of precondition failures this is the only valid
4897 :         // replacement failure
4898 : }
4899 :
4900 2 : fn any_context() -> crate::context::RequestContext {
4901 2 : use crate::context::*;
4902 2 : use crate::task_mgr::*;
4903 2 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
4904 2 : }
4905 :
4906 4 : fn only_one<T>(mut input: Vec<Option<T>>) -> T {
4907 4 : assert_eq!(1, input.len());
4908 4 : input
4909 4 : .pop()
4910 4 : .expect("length just checked")
4911 4 : .expect("no cancellation")
4912 4 : }
4913 :
4914 3 : async fn find_some_layer(timeline: &Timeline) -> Arc<dyn PersistentLayer> {
4915 3 : let layers = timeline.layers.read().await;
4916 3 : let desc = layers
4917 3 : .layer_map()
4918 3 : .iter_historic_layers()
4919 3 : .next()
4920 3 : .expect("must find one layer to evict");
4921 3 :
4922 3 : layers.get_from_desc(&desc)
4923 3 : }
4924 : }
|