TLA Line data Source code
1 : pub mod delete;
2 : mod eviction_task;
3 : mod init;
4 : pub mod layer_manager;
5 : pub(crate) mod logical_size;
6 : pub mod span;
7 : pub mod uninit;
8 : mod walreceiver;
9 :
10 : use anyhow::{anyhow, bail, ensure, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use enumset::EnumSet;
14 : use fail::fail_point;
15 : use itertools::Itertools;
16 : use pageserver_api::{
17 : models::{
18 : DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo,
19 : TimelineState,
20 : },
21 : shard::{ShardIdentity, TenantShardId},
22 : };
23 : use rand::Rng;
24 : use serde_with::serde_as;
25 : use storage_broker::BrokerClientChannel;
26 : use tokio::{
27 : runtime::Handle,
28 : sync::{oneshot, watch},
29 : };
30 : use tokio_util::sync::CancellationToken;
31 : use tracing::*;
32 : use utils::sync::gate::Gate;
33 :
34 : use std::collections::{BinaryHeap, HashMap, HashSet};
35 : use std::ops::{Deref, Range};
36 : use std::pin::pin;
37 : use std::sync::atomic::Ordering as AtomicOrdering;
38 : use std::sync::{Arc, Mutex, RwLock, Weak};
39 : use std::time::{Duration, Instant, SystemTime};
40 : use std::{
41 : cmp::{max, min, Ordering},
42 : ops::ControlFlow,
43 : };
44 :
45 : use crate::context::{
46 : AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
47 : };
48 : use crate::tenant::storage_layer::delta_layer::DeltaEntry;
49 : use crate::tenant::storage_layer::{
50 : AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
51 : LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
52 : ValueReconstructState,
53 : };
54 : use crate::tenant::tasks::BackgroundLoopKind;
55 : use crate::tenant::timeline::logical_size::CurrentLogicalSize;
56 : use crate::tenant::{
57 : layer_map::{LayerMap, SearchResult},
58 : metadata::{save_metadata, TimelineMetadata},
59 : par_fsync,
60 : };
61 : use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
62 :
63 : use crate::config::PageServerConf;
64 : use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
65 : use crate::metrics::{
66 : TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
67 : };
68 : use crate::pgdatadir_mapping::LsnForTimestamp;
69 : use crate::pgdatadir_mapping::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
70 : use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
71 : use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
72 : use pageserver_api::reltag::RelTag;
73 : use pageserver_api::shard::ShardIndex;
74 :
75 : use postgres_connection::PgConnectionConfig;
76 : use postgres_ffi::to_pg_timestamp;
77 : use utils::{
78 : completion,
79 : generation::Generation,
80 : id::TimelineId,
81 : lsn::{AtomicLsn, Lsn, RecordLsn},
82 : seqwait::SeqWait,
83 : simple_rcu::{Rcu, RcuReadGuard},
84 : };
85 :
86 : use crate::page_cache;
87 : use crate::repository::GcResult;
88 : use crate::repository::{Key, Value};
89 : use crate::task_mgr;
90 : use crate::task_mgr::TaskKind;
91 : use crate::ZERO_PAGE;
92 :
93 : use self::delete::DeleteTimelineFlow;
94 : pub(super) use self::eviction_task::EvictionTaskTenantState;
95 : use self::eviction_task::EvictionTaskTimelineState;
96 : use self::layer_manager::LayerManager;
97 : use self::logical_size::LogicalSize;
98 : use self::walreceiver::{WalReceiver, WalReceiverConf};
99 :
100 : use super::config::TenantConf;
101 : use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
102 : use super::remote_timeline_client::RemoteTimelineClient;
103 : use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
104 : use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
105 :
106 CBC 42 : #[derive(Debug, PartialEq, Eq, Clone, Copy)]
107 : pub(super) enum FlushLoopState {
108 : NotStarted,
109 : Running {
110 : #[cfg(test)]
111 : expect_initdb_optimization: bool,
112 : #[cfg(test)]
113 : initdb_optimization_count: usize,
114 : },
115 : Exited,
116 : }
117 :
118 : /// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
119 UBC 0 : #[derive(Debug, Clone, PartialEq, Eq)]
120 : pub struct Hole {
121 : key_range: Range<Key>,
122 : coverage_size: usize,
123 : }
124 :
125 : impl Ord for Hole {
126 CBC 228 : fn cmp(&self, other: &Self) -> Ordering {
127 228 : other.coverage_size.cmp(&self.coverage_size) // inverse order
128 228 : }
129 : }
130 :
131 : impl PartialOrd for Hole {
132 228 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
133 228 : Some(self.cmp(other))
134 228 : }
135 : }
136 :
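// --- Illustrative sketch (not part of the original file) ---------------------
// The inverted `Ord` above turns `BinaryHeap`, which is a max-heap, into a
// min-heap on `coverage_size`: `pop()` removes the *smallest* hole, so a
// size-capped heap ends up keeping only the largest holes. `SketchHole` and
// the plain integer key range are simplified stand-ins for `Hole` and `Key`.
use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(Debug, PartialEq, Eq)]
struct SketchHole {
    #[allow(dead_code)] // kept only to mirror the original struct
    key_range: std::ops::Range<u32>,
    coverage_size: usize,
}

impl Ord for SketchHole {
    fn cmp(&self, other: &Self) -> Ordering {
        other.coverage_size.cmp(&self.coverage_size) // inverse order, as above
    }
}

impl PartialOrd for SketchHole {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let max_holes = 2;
    let mut heap = BinaryHeap::new();
    for (key_range, coverage_size) in [(0..10u32, 10usize), (10..15, 5), (15..35, 20)] {
        heap.push(SketchHole { key_range, coverage_size });
        if heap.len() > max_holes {
            // Discards the hole with the smallest coverage_size.
            heap.pop();
        }
    }
    let mut kept: Vec<usize> = heap.into_iter().map(|h| h.coverage_size).collect();
    kept.sort_unstable();
    assert_eq!(kept, vec![10, 20]); // the two largest holes survive
}
// -----------------------------------------------------------------------------
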
137 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
138 : /// Can be removed after all refactors are done.
139 283 : fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
140 283 : drop(rlock)
141 283 : }
142 :
143 : /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
144 : /// Can be removed after all refactors are done.
145 1690 : fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
146 1690 : drop(rlock)
147 1690 : }
148 :
149 : /// The outward-facing resources required to build a Timeline
150 : pub struct TimelineResources {
151 : pub remote_client: Option<RemoteTimelineClient>,
152 : pub deletion_queue_client: DeletionQueueClient,
153 : }
154 :
155 : pub struct Timeline {
156 : conf: &'static PageServerConf,
157 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
158 :
159 : myself: Weak<Self>,
160 :
161 : pub(crate) tenant_shard_id: TenantShardId,
162 : pub timeline_id: TimelineId,
163 :
164 : /// The generation of the tenant that instantiated us: this is used for safety when writing remote objects.
165 : /// Never changes for the lifetime of this [`Timeline`] object.
166 : ///
167 : /// This duplicates the generation stored in LocationConf, but that structure is mutable:
168 :     /// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
169 : pub(crate) generation: Generation,
170 :
171 : /// The detailed sharding information from our parent Tenant. This enables us to map keys
172 : /// to shards, and is constant through the lifetime of this Timeline.
173 : shard_identity: ShardIdentity,
174 :
175 : pub pg_version: u32,
176 :
177 : /// The tuple has two elements.
178 : /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote).
179 : /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`.
180 : ///
181 : /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles.
182 : /// We describe these rectangles through the `PersistentLayerDesc` struct.
183 : ///
184 : /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction,
185 : /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the
186 : /// `PersistentLayerDesc`'s.
187 : ///
188 : /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all
189 : /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at
190 : /// runtime, e.g., during page reconstruction.
191 : ///
192 : /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
193 : /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
194 : pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
195 :
196 : /// Set of key ranges which should be covered by image layers to
197 : /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
198 :     /// It is used by the compaction task when it checks whether a new image layer should be created.
199 :     /// A newly created image layer doesn't help to remove delta layers until the
200 :     /// newly created image layer falls off the PITR horizon. So on the next GC cycle,
201 :     /// gc_timeline may still want the new image layer to be created. To avoid redundant
202 :     /// image layer creation we should check whether an image layer already exists but is beyond the PITR horizon.
203 :     /// This is why we need to remember the GC cutoff LSN.
204 : ///
205 : wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
206 :
207 : last_freeze_at: AtomicLsn,
208 : // Atomic would be more appropriate here.
209 : last_freeze_ts: RwLock<Instant>,
210 :
211 : // WAL redo manager
212 : walredo_mgr: Arc<super::WalRedoManager>,
213 :
214 : /// Remote storage client.
215 : /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
216 : pub remote_client: Option<Arc<RemoteTimelineClient>>,
217 :
218 : // What page versions do we hold in the repository? If we get a
219 : // request > last_record_lsn, we need to wait until we receive all
220 : // the WAL up to the request. The SeqWait provides functions for
221 : // that. TODO: If we get a request for an old LSN, such that the
222 : // versions have already been garbage collected away, we should
223 : // throw an error, but we don't track that currently.
224 : //
225 : // last_record_lsn.load().last points to the end of last processed WAL record.
226 : //
227 : // We also remember the starting point of the previous record in
228 : // 'last_record_lsn.load().prev'. It's used to set the xl_prev pointer of the
229 : // first WAL record when the node is started up. But here, we just
230 : // keep track of it.
231 : last_record_lsn: SeqWait<RecordLsn, Lsn>,
232 :
233 : // All WAL records have been processed and stored durably on files on
234 : // local disk, up to this LSN. On crash and restart, we need to re-process
235 : // the WAL starting from this point.
236 : //
237 : // Some later WAL records might have been processed and also flushed to disk
238 : // already, so don't be surprised to see some, but there's no guarantee on
239 : // them yet.
240 : disk_consistent_lsn: AtomicLsn,
241 :
242 : // Parent timeline that this timeline was branched from, and the LSN
243 : // of the branch point.
244 : ancestor_timeline: Option<Arc<Timeline>>,
245 : ancestor_lsn: Lsn,
246 :
247 : pub(super) metrics: TimelineMetrics,
248 :
249 : /// Ensures layers aren't frozen by checkpointer between
250 : /// [`Timeline::get_layer_for_write`] and layer reads.
251 : /// Locked automatically by [`TimelineWriter`] and checkpointer.
252 : /// Must always be acquired before the layer map/individual layer lock
253 : /// to avoid deadlock.
254 : write_lock: tokio::sync::Mutex<()>,
255 :
256 : /// Used to avoid multiple `flush_loop` tasks running
257 : pub(super) flush_loop_state: Mutex<FlushLoopState>,
258 :
259 : /// layer_flush_start_tx can be used to wake up the layer-flushing task.
260 : /// The value is a counter, incremented every time a new flush cycle is requested.
261 : /// The flush cycle counter is sent back on the layer_flush_done channel when
262 : /// the flush finishes. You can use that to wait for the flush to finish.
263 : layer_flush_start_tx: tokio::sync::watch::Sender<u64>,
264 :     /// To be notified when layer flushing has finished, subscribe to the layer_flush_done channel (see the illustrative sketch after this struct).
265 : layer_flush_done_tx: tokio::sync::watch::Sender<(u64, Result<(), FlushLayerError>)>,
266 :
267 : // Needed to ensure that we can't create a branch at a point that was already garbage collected
268 : pub latest_gc_cutoff_lsn: Rcu<Lsn>,
269 :
270 : // List of child timelines and their branch points. This is needed to avoid
271 : // garbage collecting data that is still needed by the child timelines.
272 : pub gc_info: std::sync::RwLock<GcInfo>,
273 :
274 : // It may change across major versions so for simplicity
275 : // keep it after running initdb for a timeline.
276 : // It is needed in checks when we want to error on some operations
277 : // when they are requested for pre-initdb lsn.
278 : // It can be unified with latest_gc_cutoff_lsn under some "first_valid_lsn",
279 : // though let's keep them both for better error visibility.
280 : pub initdb_lsn: Lsn,
281 :
282 : /// When did we last calculate the partitioning?
283 : partitioning: Mutex<(KeyPartitioning, Lsn)>,
284 :
285 : /// Configuration: how often should the partitioning be recalculated.
286 : repartition_threshold: u64,
287 :
288 : /// Current logical size of the "datadir", at the last LSN.
289 : current_logical_size: LogicalSize,
290 :
291 : /// Information about the last processed message by the WAL receiver,
292 : /// or None if WAL receiver has not received anything for this timeline
293 : /// yet.
294 : pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
295 : pub walreceiver: Mutex<Option<WalReceiver>>,
296 :
297 : /// Relation size cache
298 : pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
299 :
300 : download_all_remote_layers_task_info: RwLock<Option<DownloadRemoteLayersTaskInfo>>,
301 :
302 : state: watch::Sender<TimelineState>,
303 :
304 : /// Prevent two tasks from deleting the timeline at the same time. If held, the
305 : /// timeline is being deleted. If 'true', the timeline has already been deleted.
306 : pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
307 :
308 : eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
309 :
310 : /// Load or creation time information about the disk_consistent_lsn and when the loading
311 : /// happened. Used for consumption metrics.
312 : pub(crate) loaded_at: (Lsn, SystemTime),
313 :
314 : /// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
315 : pub(crate) gate: Gate,
316 :
317 : /// Cancellation token scoped to this timeline: anything doing long-running work relating
318 : /// to the timeline should drop out when this token fires.
319 : pub(crate) cancel: CancellationToken,
320 :
321 : /// Make sure we only have one running compaction at a time in tests.
322 : ///
323 : /// Must only be taken in two places:
324 : /// - [`Timeline::compact`] (this file)
325 : /// - [`delete::delete_local_layer_files`]
326 : ///
327 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
328 : compaction_lock: tokio::sync::Mutex<()>,
329 :
330 : /// Make sure we only have one running gc at a time.
331 : ///
332 : /// Must only be taken in two places:
333 : /// - [`Timeline::gc`] (this file)
334 : /// - [`delete::delete_local_layer_files`]
335 : ///
336 : /// Timeline deletion will acquire both compaction and gc locks in whatever order.
337 : gc_lock: tokio::sync::Mutex<()>,
338 : }
339 :
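// --- Illustrative sketch (not part of the original file) ---------------------
// A minimal model of the `layer_flush_start_tx` / `layer_flush_done_tx`
// handshake described in the struct above: a requester bumps a counter on the
// "start" watch channel, the flush loop echoes the counter it has completed on
// the "done" channel, and the requester waits until the done counter catches
// up. The names and the trivial "flush" body are assumptions of the sketch.
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (start_tx, mut start_rx) = watch::channel(0u64);
    let (done_tx, mut done_rx) = watch::channel(0u64);

    // Stand-in for the layer flush loop: acknowledge each requested cycle.
    tokio::spawn(async move {
        while start_rx.changed().await.is_ok() {
            let requested = *start_rx.borrow();
            // ... freezing and flushing frozen layers would happen here ...
            done_tx.send(requested).ok();
        }
    });

    // Stand-in for a flush request that waits for completion: bump the start
    // counter, then wait until the done counter reaches our cycle.
    let my_cycle = 1u64; // the start channel was initialized to 0 above
    start_tx.send(my_cycle).expect("flush loop holds the receiver");
    while *done_rx.borrow_and_update() < my_cycle {
        done_rx.changed().await.expect("flush loop is alive");
    }
    println!("flush cycle {my_cycle} completed");
}
// -----------------------------------------------------------------------------
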
340 : pub struct WalReceiverInfo {
341 : pub wal_source_connconf: PgConnectionConfig,
342 : pub last_received_msg_lsn: Lsn,
343 : pub last_received_msg_ts: u128,
344 : }
345 :
346 : ///
347 : /// Information about how much history needs to be retained, needed by
348 : /// Garbage Collection.
349 : ///
350 : pub struct GcInfo {
351 : /// Specific LSNs that are needed.
352 : ///
353 : /// Currently, this includes all points where child branches have
354 : /// been forked off from. In the future, could also include
355 : /// explicit user-defined snapshot points.
356 : pub retain_lsns: Vec<Lsn>,
357 :
358 : /// In addition to 'retain_lsns', keep everything newer than this
359 : /// point.
360 : ///
361 : /// This is calculated by subtracting 'gc_horizon' setting from
362 : /// last-record LSN
363 : ///
364 : /// FIXME: is this inclusive or exclusive?
365 : pub horizon_cutoff: Lsn,
366 :
367 : /// In addition to 'retain_lsns' and 'horizon_cutoff', keep everything newer than this
368 : /// point.
369 : ///
370 : /// This is calculated by finding a number such that a record is needed for PITR
371 :     /// if and only if its LSN is larger than 'pitr_cutoff'.
372 : pub pitr_cutoff: Lsn,
373 : }
374 :
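// --- Illustrative sketch (not part of the original file) ---------------------
// Both cutoffs above are "keep everything newer than this point" constraints,
// so the effective GC cutoff is the older (minimum) of the two, with
// `retain_lsns` adding specific points (branch LSNs) that must stay readable.
// The u64 LSNs and this rough per-LSN view are simplifications for the sketch;
// real GC reasons about whole layer files, not individual LSNs.
fn effective_gc_cutoff(horizon_cutoff: u64, pitr_cutoff: u64) -> u64 {
    // Satisfying both "keep newer than X" constraints means keeping everything
    // newer than the smaller of the two.
    horizon_cutoff.min(pitr_cutoff)
}

fn must_retain(lsn: u64, retain_lsns: &[u64], horizon_cutoff: u64, pitr_cutoff: u64) -> bool {
    lsn >= effective_gc_cutoff(horizon_cutoff, pitr_cutoff) || retain_lsns.contains(&lsn)
}

fn main() {
    let retain_lsns = [0x1000]; // e.g. a child branch forked at this LSN
    let (horizon_cutoff, pitr_cutoff) = (0x6000, 0x4000);
    assert!(must_retain(0x5000, &retain_lsns, horizon_cutoff, pitr_cutoff)); // newer than the PITR cutoff
    assert!(must_retain(0x1000, &retain_lsns, horizon_cutoff, pitr_cutoff)); // branch point
    assert!(!must_retain(0x2000, &retain_lsns, horizon_cutoff, pitr_cutoff)); // old and not retained
}
// -----------------------------------------------------------------------------
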
375 : /// An error happened in a get() operation.
376 31 : #[derive(thiserror::Error, Debug)]
377 : pub(crate) enum PageReconstructError {
378 : #[error(transparent)]
379 : Other(#[from] anyhow::Error),
380 :
381 : #[error("Ancestor LSN wait error: {0}")]
382 : AncestorLsnTimeout(#[from] WaitLsnError),
383 :
384 : /// The operation was cancelled
385 : #[error("Cancelled")]
386 : Cancelled,
387 :
388 : /// The ancestor of this is being stopped
389 : #[error("ancestor timeline {0} is being stopped")]
390 : AncestorStopping(TimelineId),
391 :
392 : /// An error happened replaying WAL records
393 : #[error(transparent)]
394 : WalRedo(anyhow::Error),
395 : }
396 :
397 UBC 0 : #[derive(thiserror::Error, Debug)]
398 : enum FlushLayerError {
399 : /// Timeline cancellation token was cancelled
400 : #[error("timeline shutting down")]
401 : Cancelled,
402 :
403 : #[error(transparent)]
404 : PageReconstructError(#[from] PageReconstructError),
405 :
406 : #[error(transparent)]
407 : Other(#[from] anyhow::Error),
408 : }
409 :
410 0 : #[derive(Clone, Copy)]
411 : pub enum LogicalSizeCalculationCause {
412 : Initial,
413 : ConsumptionMetricsSyntheticSize,
414 : EvictionTaskImitation,
415 : TenantSizeHandler,
416 : }
417 :
418 : pub enum GetLogicalSizePriority {
419 : User,
420 : Background,
421 : }
422 :
423 CBC 711 : #[derive(enumset::EnumSetType)]
424 : pub(crate) enum CompactFlags {
425 : ForceRepartition,
426 : }
427 :
428 : impl std::fmt::Debug for Timeline {
429 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
430 0 : write!(f, "Timeline<{}>", self.timeline_id)
431 0 : }
432 : }
433 :
434 CBC 8 : #[derive(thiserror::Error, Debug)]
435 : pub(crate) enum WaitLsnError {
436 : // Called on a timeline which is shutting down
437 : #[error("Shutdown")]
438 : Shutdown,
439 :
440 :     // Called on a timeline not in active state or shutting down
441 : #[error("Bad state (not active)")]
442 : BadState,
443 :
444 : // Timeout expired while waiting for LSN to catch up with goal.
445 : #[error("{0}")]
446 : Timeout(String),
447 : }
448 :
449 : /// Public interface functions
450 : impl Timeline {
451 : /// Get the LSN where this branch was created
452 3643 : pub fn get_ancestor_lsn(&self) -> Lsn {
453 3643 : self.ancestor_lsn
454 3643 : }
455 :
456 : /// Get the ancestor's timeline id
457 5046 : pub fn get_ancestor_timeline_id(&self) -> Option<TimelineId> {
458 5046 : self.ancestor_timeline
459 5046 : .as_ref()
460 5046 : .map(|ancestor| ancestor.timeline_id)
461 5046 : }
462 :
463 : /// Lock and get timeline's GC cutoff
464 3646466 : pub fn get_latest_gc_cutoff_lsn(&self) -> RcuReadGuard<Lsn> {
465 3646466 : self.latest_gc_cutoff_lsn.read()
466 3646466 : }
467 :
468 : /// Look up given page version.
469 : ///
470 : /// If a remote layer file is needed, it is downloaded as part of this
471 : /// call.
472 : ///
473 : /// NOTE: It is considered an error to 'get' a key that doesn't exist. The
474 : /// abstraction above this needs to store suitable metadata to track what
475 : /// data exists with what keys, in separate metadata entries. If a
476 : /// non-existent key is requested, we may incorrectly return a value from
477 : /// an ancestor branch, for example, or waste a lot of cycles chasing the
478 : /// non-existing key.
479 : ///
480 : /// # Cancel-Safety
481 : ///
482 : /// This method is cancellation-safe.
483 5939237 : pub(crate) async fn get(
484 5939237 : &self,
485 5939237 : key: Key,
486 5939237 : lsn: Lsn,
487 5939237 : ctx: &RequestContext,
488 5939237 : ) -> Result<Bytes, PageReconstructError> {
489 5939223 : if !lsn.is_valid() {
490 1 : return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN")));
491 5939222 : }
492 :
493 : // This check is debug-only because of the cost of hashing, and because it's a double-check: we
494 : // already checked the key against the shard_identity when looking up the Timeline from
495 : // page_service.
496 5939222 : debug_assert!(!self.shard_identity.is_key_disposable(&key));
497 :
498 : // XXX: structured stats collection for layer eviction here.
499 UBC 0 : trace!(
500 0 : "get page request for {}@{} from task kind {:?}",
501 0 : key,
502 0 : lsn,
503 0 : ctx.task_kind()
504 0 : );
505 :
506 : // Check the page cache. We will get back the most recent page with lsn <= `lsn`.
507 : // The cached image can be returned directly if there is no WAL between the cached image
508 : // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
509 : // for redo.
510 CBC 5939222 : let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
511 1342450 : Some((cached_lsn, cached_img)) => {
512 1342450 : match cached_lsn.cmp(&lsn) {
513 1265902 : Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
514 : Ordering::Equal => {
515 76548 : MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
516 76548 : return Ok(cached_img); // exact LSN match, return the image
517 : }
518 : Ordering::Greater => {
519 UBC 0 : unreachable!("the returned lsn should never be after the requested lsn")
520 : }
521 : }
522 CBC 1265902 : Some((cached_lsn, cached_img))
523 : }
524 4596772 : None => None,
525 : };
526 :
527 5862674 : let mut reconstruct_state = ValueReconstructState {
528 5862674 : records: Vec::new(),
529 5862674 : img: cached_page_img,
530 5862674 : };
531 5862674 :
532 5862674 : let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
533 5862674 : let path = self
534 5862674 : .get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
535 2017647 : .await?;
536 5861444 : timer.stop_and_record();
537 5861444 :
538 5861444 : let start = Instant::now();
539 5861444 : let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
540 5861444 : let elapsed = start.elapsed();
541 5861444 : crate::metrics::RECONSTRUCT_TIME
542 5861444 : .for_result(&res)
543 5861444 : .observe(elapsed.as_secs_f64());
544 5861444 :
545 5861444 : if cfg!(feature = "testing") && res.is_err() {
546 : // it can only be walredo issue
547 : use std::fmt::Write;
548 :
549 UBC 0 : let mut msg = String::new();
550 0 :
551 0 : path.into_iter().for_each(|(res, cont_lsn, layer)| {
552 0 : writeln!(
553 0 : msg,
554 0 : "- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
555 0 : layer(),
556 0 : )
557 0 : .expect("string grows")
558 0 : });
559 :
560 : // this is to rule out or provide evidence that we could in some cases read a duplicate
561 : // walrecord
562 0 : tracing::info!("walredo failed, path:\n{msg}");
563 CBC 5861444 : }
564 :
565 5861444 : res
566 5939180 : }
567 :
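// --- Illustrative sketch (not part of the original file) ---------------------
// `get()` above asks the materialized page cache for the most recent cached
// image with `cached_lsn <= lsn`, then either returns it on an exact match or
// uses it as a base for WAL redo. The BTreeMap below is a simplified stand-in
// for that per-page cache; it is not the pageserver's actual API.
use std::collections::BTreeMap;

/// Returns the newest cached (lsn, image) at or before `lsn`, if any.
fn lookup_cached_page(cache: &BTreeMap<u64, Vec<u8>>, lsn: u64) -> Option<(u64, &Vec<u8>)> {
    cache
        .range(..=lsn)
        .next_back()
        .map(|(cached_lsn, img)| (*cached_lsn, img))
}

fn main() {
    let mut cache = BTreeMap::new();
    cache.insert(10, b"image@10".to_vec());
    cache.insert(20, b"image@20".to_vec());

    // Exact LSN match: the image can be returned directly.
    assert_eq!(lookup_cached_page(&cache, 20).unwrap().0, 20);
    // Older match: WAL between LSN 10 and 15 may still need to be replayed.
    assert_eq!(lookup_cached_page(&cache, 15).unwrap().0, 10);
    // Nothing cached at or before LSN 5.
    assert!(lookup_cached_page(&cache, 5).is_none());
}
// -----------------------------------------------------------------------------
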
568 : /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
569 6341670 : pub fn get_last_record_lsn(&self) -> Lsn {
570 6341670 : self.last_record_lsn.load().last
571 6341670 : }
572 :
573 2993 : pub fn get_prev_record_lsn(&self) -> Lsn {
574 2993 : self.last_record_lsn.load().prev
575 2993 : }
576 :
577 : /// Atomically get both last and prev.
578 917 : pub fn get_last_record_rlsn(&self) -> RecordLsn {
579 917 : self.last_record_lsn.load()
580 917 : }
581 :
582 570784 : pub fn get_disk_consistent_lsn(&self) -> Lsn {
583 570784 : self.disk_consistent_lsn.load()
584 570784 : }
585 :
586 : /// remote_consistent_lsn from the perspective of the tenant's current generation,
587 : /// not validated with control plane yet.
588 : /// See [`Self::get_remote_consistent_lsn_visible`].
589 2993 : pub fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
590 2993 : if let Some(remote_client) = &self.remote_client {
591 2993 : remote_client.remote_consistent_lsn_projected()
592 : } else {
593 UBC 0 : None
594 : }
595 CBC 2993 : }
596 :
597 : /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
598 : /// i.e. a value of remote_consistent_lsn_projected which has undergone
599 : /// generation validation in the deletion queue.
600 568864 : pub fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
601 568864 : if let Some(remote_client) = &self.remote_client {
602 568864 : remote_client.remote_consistent_lsn_visible()
603 : } else {
604 UBC 0 : None
605 : }
606 CBC 568864 : }
607 :
608 : /// The sum of the file size of all historic layers in the layer map.
609 : /// This method makes no distinction between local and remote layers.
610 : /// Hence, the result **does not represent local filesystem usage**.
611 3443 : pub async fn layer_size_sum(&self) -> u64 {
612 3443 : let guard = self.layers.read().await;
613 3443 : let layer_map = guard.layer_map();
614 3443 : let mut size = 0;
615 363345 : for l in layer_map.iter_historic_layers() {
616 363345 : size += l.file_size();
617 363345 : }
618 3443 : size
619 3443 : }
620 :
621 18 : pub fn resident_physical_size(&self) -> u64 {
622 18 : self.metrics.resident_physical_size_get()
623 18 : }
624 :
625 : ///
626 : /// Wait until WAL has been received and processed up to this LSN.
627 : ///
628 : /// You should call this before any of the other get_* or list_* functions. Calling
629 :     /// those functions with an LSN that has not been processed yet is an error.
630 : ///
631 1419461 : pub(crate) async fn wait_lsn(
632 1419461 : &self,
633 1419461 : lsn: Lsn,
634 1419461 : _ctx: &RequestContext, /* Prepare for use by cancellation */
635 1419461 : ) -> Result<(), WaitLsnError> {
636 1419426 : if self.cancel.is_cancelled() {
637 UBC 0 : return Err(WaitLsnError::Shutdown);
638 CBC 1419426 : } else if !self.is_active() {
639 UBC 0 : return Err(WaitLsnError::BadState);
640 CBC 1419426 : }
641 :
642 : // This should never be called from the WAL receiver, because that could lead
643 : // to a deadlock.
644 : debug_assert!(
645 1419426 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverManager),
646 UBC 0 : "wait_lsn cannot be called in WAL receiver"
647 : );
648 : debug_assert!(
649 CBC 1419426 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionHandler),
650 UBC 0 : "wait_lsn cannot be called in WAL receiver"
651 : );
652 : debug_assert!(
653 CBC 1419426 : task_mgr::current_task_kind() != Some(TaskKind::WalReceiverConnectionPoller),
654 UBC 0 : "wait_lsn cannot be called in WAL receiver"
655 : );
656 :
657 CBC 1419426 : let _timer = crate::metrics::WAIT_LSN_TIME.start_timer();
658 1419426 :
659 1419426 : match self
660 1419426 : .last_record_lsn
661 1419426 : .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
662 103451 : .await
663 : {
664 1419422 : Ok(()) => Ok(()),
665 4 : Err(e) => {
666 4 : use utils::seqwait::SeqWaitError::*;
667 4 : match e {
668 UBC 0 : Shutdown => Err(WaitLsnError::Shutdown),
669 : Timeout => {
670 : // don't count the time spent waiting for lock below, and also in walreceiver.status(), towards the wait_lsn_time_histo
671 CBC 4 : drop(_timer);
672 4 : let walreceiver_status = self.walreceiver_status();
673 4 : Err(WaitLsnError::Timeout(format!(
674 4 : "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}, WalReceiver status: {}",
675 4 : lsn,
676 4 : self.get_last_record_lsn(),
677 4 : self.get_disk_consistent_lsn(),
678 4 : walreceiver_status,
679 4 : )))
680 : }
681 : }
682 : }
683 : }
684 1419426 : }
685 :
686 2997 : pub(crate) fn walreceiver_status(&self) -> String {
687 2997 : match &*self.walreceiver.lock().unwrap() {
688 110 : None => "stopping or stopped".to_string(),
689 2887 : Some(walreceiver) => match walreceiver.status() {
690 1812 : Some(status) => status.to_human_readable_string(),
691 1075 : None => "Not active".to_string(),
692 : },
693 : }
694 2997 : }
695 :
696 : /// Check that it is valid to request operations with that lsn.
697 538 : pub fn check_lsn_is_in_scope(
698 538 : &self,
699 538 : lsn: Lsn,
700 538 : latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
701 538 : ) -> anyhow::Result<()> {
702 538 : ensure!(
703 538 : lsn >= **latest_gc_cutoff_lsn,
704 9 : "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
705 9 : lsn,
706 9 : **latest_gc_cutoff_lsn,
707 : );
708 529 : Ok(())
709 538 : }
710 :
711 : /// Flush to disk all data that was written with the put_* functions
712 1561 : #[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
713 : pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
714 : self.freeze_inmem_layer(false).await;
715 : self.flush_frozen_layers_and_wait().await
716 : }
717 :
718 : /// Outermost timeline compaction operation; downloads needed layers.
719 1375 : pub(crate) async fn compact(
720 1375 : self: &Arc<Self>,
721 1375 : cancel: &CancellationToken,
722 1375 : flags: EnumSet<CompactFlags>,
723 1375 : ctx: &RequestContext,
724 1375 : ) -> Result<(), CompactionError> {
725 1375 : // most likely the cancellation token is from background task, but in tests it could be the
726 1375 : // request task as well.
727 1375 :
728 1375 : let prepare = async move {
729 1375 : let guard = self.compaction_lock.lock().await;
730 :
731 1375 : let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
732 1375 : BackgroundLoopKind::Compaction,
733 1375 : ctx,
734 1375 : )
735 UBC 0 : .await;
736 :
737 CBC 1375 : (guard, permit)
738 1375 : };
739 :
740 : // this wait probably never needs any "long time spent" logging, because we already nag if
741 : // compaction task goes over it's period (20s) which is quite often in production.
742 1375 : let (_guard, _permit) = tokio::select! {
743 1375 : tuple = prepare => { tuple },
744 : _ = self.cancel.cancelled() => return Ok(()),
745 : _ = cancel.cancelled() => return Ok(()),
746 : };
747 :
748 1375 : let last_record_lsn = self.get_last_record_lsn();
749 1375 :
750 1375 : // Last record Lsn could be zero in case the timeline was just created
751 1375 : if !last_record_lsn.is_valid() {
752 UBC 0 : warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
753 0 : return Ok(());
754 CBC 1375 : }
755 1375 :
756 1375 : // High level strategy for compaction / image creation:
757 1375 : //
758 1375 : // 1. First, calculate the desired "partitioning" of the
759 1375 : // currently in-use key space. The goal is to partition the
760 1375 : // key space into roughly fixed-size chunks, but also take into
761 1375 : // account any existing image layers, and try to align the
762 1375 : // chunk boundaries with the existing image layers to avoid
763 1375 : // too much churn. Also try to align chunk boundaries with
764 1375 : // relation boundaries. In principle, we don't know about
765 1375 : // relation boundaries here, we just deal with key-value
766 1375 : // pairs, and the code in pgdatadir_mapping.rs knows how to
767 1375 : // map relations into key-value pairs. But in practice we know
768 1375 : // that 'field6' is the block number, and the fields 1-5
769 1375 : // identify a relation. This is just an optimization,
770 1375 : // though.
771 1375 : //
772 1375 : // 2. Once we know the partitioning, for each partition,
773 1375 : // decide if it's time to create a new image layer. The
774 1375 : // criteria is: there has been too much "churn" since the last
775 1375 : // image layer? The "churn" is fuzzy concept, it's a
776 1375 : // combination of too many delta files, or too much WAL in
777 1375 : // total in the delta file. Or perhaps: if creating an image
778 1375 : // file would allow to delete some older files.
779 1375 : //
780 1375 : // 3. After that, we compact all level0 delta files if there
781 1375 : // are too many of them. While compacting, we also garbage
782 1375 : // collect any page versions that are no longer needed because
783 1375 : // of the new image layers we created in step 2.
784 1375 : //
785 1375 : // TODO: This high level strategy hasn't been implemented yet.
786 1375 : // Below are functions compact_level0() and create_image_layers()
787 1375 : // but they are a bit ad hoc and don't quite work like it's explained
788 1375 : // above. Rewrite it.
789 1375 :
790 1375 : // Is the timeline being deleted?
791 1375 : if self.is_stopping() {
792 UBC 0 : trace!("Dropping out of compaction on timeline shutdown");
793 0 : return Err(CompactionError::ShuttingDown);
794 CBC 1375 : }
795 1375 :
796 1375 : let target_file_size = self.get_checkpoint_distance();
797 1375 :
798 1375 : // Define partitioning schema if needed
799 1375 :
800 1375 : // FIXME: the match should only cover repartitioning, not the next steps
801 1375 : match self
802 1375 : .repartition(
803 1375 : self.get_last_record_lsn(),
804 1375 : self.get_compaction_target_size(),
805 1375 : flags,
806 1375 : ctx,
807 1375 : )
808 139100 : .await
809 : {
810 1372 : Ok((partitioning, lsn)) => {
811 1372 : // Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
812 1372 : let image_ctx = RequestContextBuilder::extend(ctx)
813 1372 : .access_stats_behavior(AccessStatsBehavior::Skip)
814 1372 : .build();
815 1372 :
816 1372 : // 2. Compact
817 1372 : let timer = self.metrics.compact_time_histo.start_timer();
818 270040 : self.compact_level0(target_file_size, ctx).await?;
819 1368 : timer.stop_and_record();
820 :
821 : // 3. Create new image layers for partitions that have been modified
822 : // "enough".
823 1368 : let layers = self
824 1368 : .create_image_layers(&partitioning, lsn, false, &image_ctx)
825 49448 : .await
826 1364 : .map_err(anyhow::Error::from)?;
827 1364 : if let Some(remote_client) = &self.remote_client {
828 6958 : for layer in layers {
829 5594 : remote_client.schedule_layer_file_upload(layer)?;
830 : }
831 UBC 0 : }
832 :
833 CBC 1364 : if let Some(remote_client) = &self.remote_client {
834 : // should any new image layer been created, not uploading index_part will
835 : // result in a mismatch between remote_physical_size and layermap calculated
836 : // size, which will fail some tests, but should not be an issue otherwise.
837 1364 : remote_client.schedule_index_upload_for_file_changes()?;
838 UBC 0 : }
839 : }
840 CBC 1 : Err(err) => {
841 1 : // no partitioning? This is normal, if the timeline was just created
842 1 : // as an empty timeline. Also in unit tests, when we use the timeline
843 1 : // as a simple key-value store, ignoring the datadir layout. Log the
844 1 : // error but continue.
845 1 : //
846 1 : // Suppress error when it's due to cancellation
847 1 : if !self.cancel.is_cancelled() {
848 LBC (2) : error!("could not compact, repartitioning keyspace failed: {err:?}");
849 CBC 1 : }
850 : }
851 : };
852 :
853 1365 : Ok(())
854 1365 : }
855 :
856 : /// Mutate the timeline with a [`TimelineWriter`].
857 1940253 : pub async fn writer(&self) -> TimelineWriter<'_> {
858 1940252 : TimelineWriter {
859 1940252 : tl: self,
860 1940252 : _write_guard: self.write_lock.lock().await,
861 : }
862 1940252 : }
863 :
864 : /// Check if more than 'checkpoint_distance' of WAL has been accumulated in
865 : /// the in-memory layer, and initiate flushing it if so.
866 : ///
867 : /// Also flush after a period of time without new data -- it helps
868 : /// safekeepers to regard pageserver as caught up and suspend activity.
869 565871 : pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
870 565871 : let last_lsn = self.get_last_record_lsn();
871 565114 : let open_layer_size = {
872 565871 : let guard = self.layers.read().await;
873 565871 : let layers = guard.layer_map();
874 565871 : let Some(open_layer) = layers.open_layer.as_ref() else {
875 757 : return Ok(());
876 : };
877 565114 : open_layer.size().await?
878 : };
879 565114 : let last_freeze_at = self.last_freeze_at.load();
880 565114 : let last_freeze_ts = *(self.last_freeze_ts.read().unwrap());
881 565114 : let distance = last_lsn.widening_sub(last_freeze_at);
882 565114 : // Checkpointing the open layer can be triggered by layer size or LSN range.
883 565114 : // S3 has a 5 GB limit on the size of one upload (without multi-part upload), and
884 565114 : // we want to stay below that with a big margin. The LSN distance determines how
885 565114 : // much WAL the safekeepers need to store.
886 565114 : if distance >= self.get_checkpoint_distance().into()
887 563672 : || open_layer_size > self.get_checkpoint_distance()
888 562012 : || (distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout())
889 : {
890 3102 : info!(
891 3102 : "check_checkpoint_distance {}, layer size {}, elapsed since last flush {:?}",
892 3102 : distance,
893 3102 : open_layer_size,
894 3102 : last_freeze_ts.elapsed()
895 3102 : );
896 :
897 3102 : self.freeze_inmem_layer(true).await;
898 3102 : self.last_freeze_at.store(last_lsn);
899 3102 : *(self.last_freeze_ts.write().unwrap()) = Instant::now();
900 3102 :
901 3102 : // Wake up the layer flusher
902 3102 : self.flush_frozen_layers();
903 562012 : }
904 565114 : Ok(())
905 565871 : }
906 :
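// --- Illustrative sketch (not part of the original file) ---------------------
// The freeze decision in check_checkpoint_distance() above reduces to a pure
// predicate: freeze the open in-memory layer when enough WAL has accumulated
// since the last freeze (by LSN distance or by layer size), or when unflushed
// data has been sitting past the checkpoint timeout. The free function below
// restates that condition; it is a simplification, not the pageserver's API.
use std::time::Duration;

fn should_freeze(
    lsn_distance: i128,          // last_lsn.widening_sub(last_freeze_at)
    open_layer_size: u64,
    checkpoint_distance: u64,
    since_last_freeze: Duration, // last_freeze_ts.elapsed()
    checkpoint_timeout: Duration,
) -> bool {
    lsn_distance >= checkpoint_distance as i128
        || open_layer_size > checkpoint_distance
        || (lsn_distance > 0 && since_last_freeze >= checkpoint_timeout)
}

fn main() {
    let distance = 256 * 1024 * 1024; // e.g. a 256 MiB checkpoint_distance
    let timeout = Duration::from_secs(600);
    // Plenty of WAL since the last freeze: freeze by LSN distance.
    assert!(should_freeze(distance as i128, 0, distance, Duration::ZERO, timeout));
    // Little WAL, but it has been sitting for longer than the timeout.
    assert!(should_freeze(1, 1, distance, Duration::from_secs(601), timeout));
    // Nothing new at all: keep the open layer as is.
    assert!(!should_freeze(0, 0, distance, Duration::from_secs(3600), timeout));
}
// -----------------------------------------------------------------------------
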
907 1102 : pub fn activate(
908 1102 : self: &Arc<Self>,
909 1102 : broker_client: BrokerClientChannel,
910 1102 : background_jobs_can_start: Option<&completion::Barrier>,
911 1102 : ctx: &RequestContext,
912 1102 : ) {
913 1102 : self.spawn_initial_logical_size_computation_task(ctx);
914 1102 : self.launch_wal_receiver(ctx, broker_client);
915 1102 : self.set_state(TimelineState::Active);
916 1102 : self.launch_eviction_task(background_jobs_can_start);
917 1102 : }
918 :
919 : /// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
920 : /// also to remote storage. This method can easily take multiple seconds for a busy timeline.
921 : ///
922 : /// While we are flushing, we continue to accept read I/O.
923 243 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
924 : pub(crate) async fn flush_and_shutdown(&self) {
925 : debug_assert_current_span_has_tenant_and_timeline_id();
926 :
927 : // Stop ingesting data, so that we are not still writing to an InMemoryLayer while
928 : // trying to flush
929 UBC 0 : tracing::debug!("Waiting for WalReceiverManager...");
930 : task_mgr::shutdown_tasks(
931 : Some(TaskKind::WalReceiverManager),
932 : Some(self.tenant_shard_id),
933 : Some(self.timeline_id),
934 : )
935 : .await;
936 :
937 : // Since we have shut down WAL ingest, we should not let anyone start waiting for the LSN to advance
938 : self.last_record_lsn.shutdown();
939 :
940 : // now all writers to InMemory layer are gone, do the final flush if requested
941 : match self.freeze_and_flush().await {
942 : Ok(_) => {
943 : // drain the upload queue
944 : if let Some(client) = self.remote_client.as_ref() {
945 : // if we did not wait for completion here, it might be our shutdown process
946 : // didn't wait for remote uploads to complete at all, as new tasks can forever
947 : // be spawned.
948 : //
949 : // what is problematic is the shutting down of RemoteTimelineClient, because
950 : // obviously it does not make sense to stop while we wait for it, but what
951 : // about corner cases like s3 suddenly hanging up?
952 : if let Err(e) = client.shutdown().await {
953 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
954 : // we have some extra WAL replay to do next time the timeline starts.
955 0 : warn!("failed to flush to remote storage: {e:#}");
956 : }
957 : }
958 : }
959 : Err(e) => {
960 : // Non-fatal. Shutdown is infallible. Failures to flush just mean that
961 : // we have some extra WAL replay to do next time the timeline starts.
962 CBC 42 : warn!("failed to freeze and flush: {e:#}");
963 : }
964 : }
965 :
966 : self.shutdown().await;
967 : }
968 :
969 : /// Shut down immediately, without waiting for any open layers to flush to disk. This is a subset of
970 : /// the graceful [`Timeline::flush_and_shutdown`] function.
971 500 : pub(crate) async fn shutdown(&self) {
972 : // Signal any subscribers to our cancellation token to drop out
973 UBC 0 : tracing::debug!("Cancelling CancellationToken");
974 CBC 500 : self.cancel.cancel();
975 500 :
976 500 : // Page request handlers might be waiting for LSN to advance: they do not respect Timeline::cancel
977 500 : // while doing so.
978 500 : self.last_record_lsn.shutdown();
979 500 :
980 500 : // Shut down the layer flush task before the remote client, as one depends on the other
981 500 : task_mgr::shutdown_tasks(
982 500 : Some(TaskKind::LayerFlushTask),
983 500 : Some(self.tenant_shard_id),
984 500 : Some(self.timeline_id),
985 500 : )
986 425 : .await;
987 :
988 : // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
989 : // case our caller wants to use that for a deletion
990 500 : if let Some(remote_client) = self.remote_client.as_ref() {
991 500 : match remote_client.stop() {
992 500 : Ok(()) => {}
993 UBC 0 : Err(StopError::QueueUninitialized) => {
994 0 : // Shutting down during initialization is legal
995 0 : }
996 : }
997 0 : }
998 :
999 0 : tracing::debug!("Waiting for tasks...");
1000 :
1001 CBC 598 : task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
1002 :
1003 : // Finally wait until any gate-holders are complete
1004 500 : self.gate.close().await;
1005 500 : }
1006 :
1007 1872 : pub fn set_state(&self, new_state: TimelineState) {
1008 1872 : match (self.current_state(), new_state) {
1009 1872 : (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => {
1010 109 : info!("Ignoring new state, equal to the existing one: {equal_state_2:?}");
1011 : }
1012 UBC 0 : (st, TimelineState::Loading) => {
1013 0 : error!("ignoring transition from {st:?} into Loading state");
1014 : }
1015 CBC 7 : (TimelineState::Broken { .. }, new_state) => {
1016 7 : error!("Ignoring state update {new_state:?} for broken timeline");
1017 : }
1018 : (TimelineState::Stopping, TimelineState::Active) => {
1019 UBC 0 : error!("Not activating a Stopping timeline");
1020 : }
1021 CBC 1756 : (_, new_state) => {
1022 1756 : self.state.send_replace(new_state);
1023 1756 : }
1024 : }
1025 1872 : }
1026 :
1027 18 : pub fn set_broken(&self, reason: String) {
1028 18 : let backtrace_str: String = format!("{}", std::backtrace::Backtrace::force_capture());
1029 18 : let broken_state = TimelineState::Broken {
1030 18 : reason,
1031 18 : backtrace: backtrace_str,
1032 18 : };
1033 18 : self.set_state(broken_state);
1034 18 :
1035 18 : // Although the Broken state is not equivalent to shutdown() (shutdown will be called
1036 18 : // later when this tenant is detach or the process shuts down), firing the cancellation token
1037 18 : // here avoids the need for other tasks to watch for the Broken state explicitly.
1038 18 : self.cancel.cancel();
1039 18 : }
1040 :
1041 1959846 : pub fn current_state(&self) -> TimelineState {
1042 1959846 : self.state.borrow().clone()
1043 1959846 : }
1044 :
1045 780 : pub fn is_broken(&self) -> bool {
1046 780 : matches!(&*self.state.borrow(), TimelineState::Broken { .. })
1047 780 : }
1048 :
1049 1431185 : pub fn is_active(&self) -> bool {
1050 1431185 : self.current_state() == TimelineState::Active
1051 1431185 : }
1052 :
1053 2363 : pub fn is_stopping(&self) -> bool {
1054 2363 : self.current_state() == TimelineState::Stopping
1055 2363 : }
1056 :
1057 1102 : pub fn subscribe_for_state_updates(&self) -> watch::Receiver<TimelineState> {
1058 1102 : self.state.subscribe()
1059 1102 : }
1060 :
1061 1199007 : pub async fn wait_to_become_active(
1062 1199007 : &self,
1063 1199007 : _ctx: &RequestContext, // Prepare for use by cancellation
1064 1199007 : ) -> Result<(), TimelineState> {
1065 1198972 : let mut receiver = self.state.subscribe();
1066 1198999 : loop {
1067 1198999 : let current_state = receiver.borrow().clone();
1068 1198999 : match current_state {
1069 : TimelineState::Loading => {
1070 27 : receiver
1071 27 : .changed()
1072 27 : .await
1073 27 : .expect("holding a reference to self");
1074 : }
1075 : TimelineState::Active { .. } => {
1076 1198968 : return Ok(());
1077 : }
1078 : TimelineState::Broken { .. } | TimelineState::Stopping => {
1079 : // There's no chance the timeline can transition back into ::Active
1080 4 : return Err(current_state);
1081 : }
1082 : }
1083 : }
1084 1198972 : }
1085 :
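// --- Illustrative sketch (not part of the original file) ---------------------
// `wait_to_become_active` above is the usual tokio `watch` pattern: snapshot
// the current value, return if it is terminal, otherwise await `changed()` and
// re-check. The minimal state machine below mirrors that loop with a local
// enum standing in for `TimelineState`; it is not the pageserver's type.
use tokio::sync::watch;

#[derive(Clone, Debug, PartialEq, Eq)]
enum SketchState {
    Loading,
    Active,
    Stopping,
}

async fn wait_until_active(mut rx: watch::Receiver<SketchState>) -> Result<(), SketchState> {
    loop {
        let current = rx.borrow().clone();
        match current {
            SketchState::Loading => {
                // Not there yet: wait for the sender to publish a new state.
                rx.changed().await.expect("sender still alive");
            }
            SketchState::Active => return Ok(()),
            SketchState::Stopping => return Err(current),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(SketchState::Loading);
    let waiter = tokio::spawn(wait_until_active(rx));
    tx.send(SketchState::Active).unwrap();
    assert!(waiter.await.unwrap().is_ok());
}
// -----------------------------------------------------------------------------
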
1086 83 : pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
1087 83 : let guard = self.layers.read().await;
1088 83 : let layer_map = guard.layer_map();
1089 83 : let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
1090 83 : if let Some(open_layer) = &layer_map.open_layer {
1091 8 : in_memory_layers.push(open_layer.info());
1092 75 : }
1093 83 : for frozen_layer in &layer_map.frozen_layers {
1094 UBC 0 : in_memory_layers.push(frozen_layer.info());
1095 0 : }
1096 :
1097 CBC 83 : let mut historic_layers = Vec::new();
1098 2974 : for historic_layer in layer_map.iter_historic_layers() {
1099 2974 : let historic_layer = guard.get_from_desc(&historic_layer);
1100 2974 : historic_layers.push(historic_layer.info(reset));
1101 2974 : }
1102 :
1103 83 : LayerMapInfo {
1104 83 : in_memory_layers,
1105 83 : historic_layers,
1106 83 : }
1107 83 : }
1108 :
1109 2 : #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
1110 : pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1111 : let Some(layer) = self.find_layer(layer_file_name).await else {
1112 : return Ok(None);
1113 : };
1114 :
1115 : if self.remote_client.is_none() {
1116 : return Ok(Some(false));
1117 : }
1118 :
1119 : layer.download().await?;
1120 :
1121 : Ok(Some(true))
1122 : }
1123 :
1124 : /// Evict just one layer.
1125 : ///
1126 : /// Returns `Ok(None)` in the case where the layer could not be found by its `layer_file_name`.
1127 2245 : pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
1128 2245 : let _gate = self
1129 2245 : .gate
1130 2245 : .enter()
1131 2245 : .map_err(|_| anyhow::anyhow!("Shutting down"))?;
1132 :
1133 2245 : let Some(local_layer) = self.find_layer(layer_file_name).await else {
1134 UBC 0 : return Ok(None);
1135 : };
1136 :
1137 CBC 2245 : let rtc = self
1138 2245 : .remote_client
1139 2245 : .as_ref()
1140 2245 : .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?;
1141 :
1142 2245 : match local_layer.evict_and_wait(rtc).await {
1143 2245 : Ok(()) => Ok(Some(true)),
1144 UBC 0 : Err(EvictionError::NotFound) => Ok(Some(false)),
1145 0 : Err(EvictionError::Downloaded) => Ok(Some(false)),
1146 : }
1147 CBC 2245 : }
1148 : }
1149 :
1150 : /// Number of times we will compute partition within a checkpoint distance.
1151 : const REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE: u64 = 10;
1152 :
1153 : // Private functions
1154 : impl Timeline {
1155 1131451 : fn get_checkpoint_distance(&self) -> u64 {
1156 1131451 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1157 1131451 : tenant_conf
1158 1131451 : .checkpoint_distance
1159 1131451 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_distance)
1160 1131451 : }
1161 :
1162 562012 : fn get_checkpoint_timeout(&self) -> Duration {
1163 562012 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1164 562012 : tenant_conf
1165 562012 : .checkpoint_timeout
1166 562012 : .unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
1167 562012 : }
1168 :
1169 1419 : fn get_compaction_target_size(&self) -> u64 {
1170 1419 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1171 1419 : tenant_conf
1172 1419 : .compaction_target_size
1173 1419 : .unwrap_or(self.conf.default_tenant_conf.compaction_target_size)
1174 1419 : }
1175 :
1176 1372 : fn get_compaction_threshold(&self) -> usize {
1177 1372 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1178 1372 : tenant_conf
1179 1372 : .compaction_threshold
1180 1372 : .unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
1181 1372 : }
1182 :
1183 35656 : fn get_image_creation_threshold(&self) -> usize {
1184 35656 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1185 35656 : tenant_conf
1186 35656 : .image_creation_threshold
1187 35656 : .unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
1188 35656 : }
1189 :
1190 2484 : fn get_eviction_policy(&self) -> EvictionPolicy {
1191 2484 : let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf;
1192 2484 : tenant_conf
1193 2484 : .eviction_policy
1194 2484 : .unwrap_or(self.conf.default_tenant_conf.eviction_policy)
1195 2484 : }
1196 :
1197 1342 : fn get_evictions_low_residence_duration_metric_threshold(
1198 1342 : tenant_conf: &TenantConfOpt,
1199 1342 : default_tenant_conf: &TenantConf,
1200 1342 : ) -> Duration {
1201 1342 : tenant_conf
1202 1342 : .evictions_low_residence_duration_metric_threshold
1203 1342 : .unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
1204 1342 : }
1205 :
1206 13131 : fn get_gc_feedback(&self) -> bool {
1207 13131 : let tenant_conf = &self.tenant_conf.read().unwrap().tenant_conf;
1208 13131 : tenant_conf
1209 13131 : .gc_feedback
1210 13131 : .unwrap_or(self.conf.default_tenant_conf.gc_feedback)
1211 13131 : }
1212 :
1213 52 : pub(super) fn tenant_conf_updated(&self) {
1214 52 : // NB: Most tenant conf options are read by background loops, so,
1215 52 : // changes will automatically be picked up.
1216 52 :
1217 52 : // The threshold is embedded in the metric. So, we need to update it.
1218 52 : {
1219 52 : let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
1220 52 : &self.tenant_conf.read().unwrap().tenant_conf,
1221 52 : &self.conf.default_tenant_conf,
1222 52 : );
1223 52 :
1224 52 : let tenant_id_str = self.tenant_shard_id.tenant_id.to_string();
1225 52 : let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
1226 52 :
1227 52 : let timeline_id_str = self.timeline_id.to_string();
1228 52 : self.metrics
1229 52 : .evictions_with_low_residence_duration
1230 52 : .write()
1231 52 : .unwrap()
1232 52 : .change_threshold(
1233 52 : &tenant_id_str,
1234 52 : &shard_id_str,
1235 52 : &timeline_id_str,
1236 52 : new_threshold,
1237 52 : );
1238 52 : }
1239 52 : }
1240 :
1241 : /// Open a Timeline handle.
1242 : ///
1243 : /// Loads the metadata for the timeline into memory, but not the layer map.
1244 : #[allow(clippy::too_many_arguments)]
1245 1290 : pub(super) fn new(
1246 1290 : conf: &'static PageServerConf,
1247 1290 : tenant_conf: Arc<RwLock<AttachedTenantConf>>,
1248 1290 : metadata: &TimelineMetadata,
1249 1290 : ancestor: Option<Arc<Timeline>>,
1250 1290 : timeline_id: TimelineId,
1251 1290 : tenant_shard_id: TenantShardId,
1252 1290 : generation: Generation,
1253 1290 : shard_identity: ShardIdentity,
1254 1290 : walredo_mgr: Arc<super::WalRedoManager>,
1255 1290 : resources: TimelineResources,
1256 1290 : pg_version: u32,
1257 1290 : state: TimelineState,
1258 1290 : cancel: CancellationToken,
1259 1290 : ) -> Arc<Self> {
1260 1290 : let disk_consistent_lsn = metadata.disk_consistent_lsn();
1261 1290 : let (state, _) = watch::channel(state);
1262 1290 :
1263 1290 : let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
1264 1290 : let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
1265 1290 :
1266 1290 : let tenant_conf_guard = tenant_conf.read().unwrap();
1267 1290 :
1268 1290 : let evictions_low_residence_duration_metric_threshold =
1269 1290 : Self::get_evictions_low_residence_duration_metric_threshold(
1270 1290 : &tenant_conf_guard.tenant_conf,
1271 1290 : &conf.default_tenant_conf,
1272 1290 : );
1273 1290 : drop(tenant_conf_guard);
1274 1290 :
1275 1290 : Arc::new_cyclic(|myself| {
1276 1290 : let mut result = Timeline {
1277 1290 : conf,
1278 1290 : tenant_conf,
1279 1290 : myself: myself.clone(),
1280 1290 : timeline_id,
1281 1290 : tenant_shard_id,
1282 1290 : generation,
1283 1290 : shard_identity,
1284 1290 : pg_version,
1285 1290 : layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
1286 1290 : wanted_image_layers: Mutex::new(None),
1287 1290 :
1288 1290 : walredo_mgr,
1289 1290 : walreceiver: Mutex::new(None),
1290 1290 :
1291 1290 : remote_client: resources.remote_client.map(Arc::new),
1292 1290 :
1293 1290 : // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
1294 1290 : last_record_lsn: SeqWait::new(RecordLsn {
1295 1290 : last: disk_consistent_lsn,
1296 1290 : prev: metadata.prev_record_lsn().unwrap_or(Lsn(0)),
1297 1290 : }),
1298 1290 : disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
1299 1290 :
1300 1290 : last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
1301 1290 : last_freeze_ts: RwLock::new(Instant::now()),
1302 1290 :
1303 1290 : loaded_at: (disk_consistent_lsn, SystemTime::now()),
1304 1290 :
1305 1290 : ancestor_timeline: ancestor,
1306 1290 : ancestor_lsn: metadata.ancestor_lsn(),
1307 1290 :
1308 1290 : metrics: TimelineMetrics::new(
1309 1290 : &tenant_shard_id,
1310 1290 : &timeline_id,
1311 1290 : crate::metrics::EvictionsWithLowResidenceDurationBuilder::new(
1312 1290 : "mtime",
1313 1290 : evictions_low_residence_duration_metric_threshold,
1314 1290 : ),
1315 1290 : ),
1316 1290 :
1317 1290 : flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
1318 1290 :
1319 1290 : layer_flush_start_tx,
1320 1290 : layer_flush_done_tx,
1321 1290 :
1322 1290 : write_lock: tokio::sync::Mutex::new(()),
1323 1290 :
1324 1290 : gc_info: std::sync::RwLock::new(GcInfo {
1325 1290 : retain_lsns: Vec::new(),
1326 1290 : horizon_cutoff: Lsn(0),
1327 1290 : pitr_cutoff: Lsn(0),
1328 1290 : }),
1329 1290 :
1330 1290 : latest_gc_cutoff_lsn: Rcu::new(metadata.latest_gc_cutoff_lsn()),
1331 1290 : initdb_lsn: metadata.initdb_lsn(),
1332 1290 :
1333 1290 : current_logical_size: if disk_consistent_lsn.is_valid() {
1334 : // we're creating timeline data with some layer files existing locally,
1335 : // need to recalculate timeline's logical size based on data in the layers.
1336 718 : LogicalSize::deferred_initial(disk_consistent_lsn)
1337 : } else {
1338 : // we're creating timeline data without any layers existing locally,
1339 : // initial logical size is 0.
1340 572 : LogicalSize::empty_initial()
1341 : },
1342 1290 : partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
1343 1290 : repartition_threshold: 0,
1344 1290 :
1345 1290 : last_received_wal: Mutex::new(None),
1346 1290 : rel_size_cache: RwLock::new(HashMap::new()),
1347 1290 :
1348 1290 : download_all_remote_layers_task_info: RwLock::new(None),
1349 1290 :
1350 1290 : state,
1351 1290 :
1352 1290 : eviction_task_timeline_state: tokio::sync::Mutex::new(
1353 1290 : EvictionTaskTimelineState::default(),
1354 1290 : ),
1355 1290 : delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),
1356 1290 :
1357 1290 : cancel,
1358 1290 : gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
1359 1290 :
1360 1290 : compaction_lock: tokio::sync::Mutex::default(),
1361 1290 : gc_lock: tokio::sync::Mutex::default(),
1362 1290 : };
1363 1290 : result.repartition_threshold =
1364 1290 : result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
1365 1290 : result
1366 1290 : .metrics
1367 1290 : .last_record_gauge
1368 1290 : .set(disk_consistent_lsn.0 as i64);
1369 1290 : result
1370 1290 : })
1371 1290 : }
1372 :
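// --- Illustrative sketch (not part of the original file) ---------------------
// `Timeline::new` above uses `Arc::new_cyclic` so the struct can hold a `Weak`
// pointer to itself (the `myself` field) without creating a reference cycle.
// The toy type below shows just that construction pattern; it is not the
// pageserver's code.
use std::sync::{Arc, Weak};

struct SelfReferential {
    myself: Weak<Self>,
    name: String,
}

impl SelfReferential {
    fn new(name: String) -> Arc<Self> {
        Arc::new_cyclic(|myself| SelfReferential {
            // The closure receives the Weak before the Arc is fully built.
            myself: myself.clone(),
            name,
        })
    }

    /// Upgrade the weak self-pointer, e.g. to hand an owned handle to a task.
    fn strong_handle(&self) -> Option<Arc<Self>> {
        self.myself.upgrade()
    }
}

fn main() {
    let t = SelfReferential::new("timeline".to_string());
    let handle = t.strong_handle().expect("Arc still alive");
    assert!(Arc::ptr_eq(&t, &handle));
    assert_eq!(Arc::strong_count(&t), 2); // `t` and `handle`
    println!("{} has {} strong refs", t.name, Arc::strong_count(&t));
}
// -----------------------------------------------------------------------------
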
1373 1837 : pub(super) fn maybe_spawn_flush_loop(self: &Arc<Self>) {
1374 1837 : let Ok(guard) = self.gate.enter() else {
1375 UBC 0 : info!("cannot start flush loop when the timeline gate has already been closed");
1376 0 : return;
1377 : };
1378 CBC 1837 : let mut flush_loop_state = self.flush_loop_state.lock().unwrap();
1379 1837 : match *flush_loop_state {
1380 1274 : FlushLoopState::NotStarted => (),
1381 : FlushLoopState::Running { .. } => {
1382 563 : info!(
1383 563 : "skipping attempt to start flush_loop twice {}/{}",
1384 563 : self.tenant_shard_id, self.timeline_id
1385 563 : );
1386 563 : return;
1387 : }
1388 : FlushLoopState::Exited => {
1389 UBC 0 : warn!(
1390 0 : "ignoring attempt to restart exited flush_loop {}/{}",
1391 0 : self.tenant_shard_id, self.timeline_id
1392 0 : );
1393 0 : return;
1394 : }
1395 : }
1396 :
1397 CBC 1274 : let layer_flush_start_rx = self.layer_flush_start_tx.subscribe();
1398 1274 : let self_clone = Arc::clone(self);
1399 1274 :
1400 1274 : debug!("spawning flush loop");
1401 1274 : *flush_loop_state = FlushLoopState::Running {
1402 1274 : #[cfg(test)]
1403 1274 : expect_initdb_optimization: false,
1404 1274 : #[cfg(test)]
1405 1274 : initdb_optimization_count: 0,
1406 1274 : };
1407 1274 : task_mgr::spawn(
1408 1274 : task_mgr::BACKGROUND_RUNTIME.handle(),
1409 1274 : task_mgr::TaskKind::LayerFlushTask,
1410 1274 : Some(self.tenant_shard_id),
1411 1274 : Some(self.timeline_id),
1412 1274 : "layer flush task",
1413 : false,
1414 1274 : async move {
1415 1274 : let _guard = guard;
1416 1274 : let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error);
1417 22229 : self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await;
1418 497 : let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap();
1419 497 : assert!(matches!(*flush_loop_state, FlushLoopState::Running{ ..}));
1420 497 : *flush_loop_state = FlushLoopState::Exited;
1421 497 : Ok(())
1422 497 : }
1423 1274 : .instrument(info_span!(parent: None, "layer flush task", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
1424 : );
1425 1837 : }
1426 :
1427 : /// Creates and starts the wal receiver.
1428 : ///
1429 : /// This function is expected to be called at most once per Timeline's lifecycle
1430 : /// when the timeline is activated.
1431 1102 : fn launch_wal_receiver(
1432 1102 : self: &Arc<Self>,
1433 1102 : ctx: &RequestContext,
1434 1102 : broker_client: BrokerClientChannel,
1435 1102 : ) {
1436 1102 : info!(
1437 1102 : "launching WAL receiver for timeline {} of tenant {}",
1438 1102 : self.timeline_id, self.tenant_shard_id
1439 1102 : );
1440 :
1441 1102 : let tenant_conf_guard = self.tenant_conf.read().unwrap();
1442 1102 : let wal_connect_timeout = tenant_conf_guard
1443 1102 : .tenant_conf
1444 1102 : .walreceiver_connect_timeout
1445 1102 : .unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
1446 1102 : let lagging_wal_timeout = tenant_conf_guard
1447 1102 : .tenant_conf
1448 1102 : .lagging_wal_timeout
1449 1102 : .unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
1450 1102 : let max_lsn_wal_lag = tenant_conf_guard
1451 1102 : .tenant_conf
1452 1102 : .max_lsn_wal_lag
1453 1102 : .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
1454 1102 : drop(tenant_conf_guard);
1455 1102 :
1456 1102 : let mut guard = self.walreceiver.lock().unwrap();
1457 1102 : assert!(
1458 1102 : guard.is_none(),
1459 UBC 0 : "multiple launches / re-launches of WAL receiver are not supported"
1460 : );
1461 CBC 1102 : *guard = Some(WalReceiver::start(
1462 1102 : Arc::clone(self),
1463 1102 : WalReceiverConf {
1464 1102 : wal_connect_timeout,
1465 1102 : lagging_wal_timeout,
1466 1102 : max_lsn_wal_lag,
1467 1102 : auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
1468 1102 : availability_zone: self.conf.availability_zone.clone(),
1469 1102 : ingest_batch_size: self.conf.ingest_batch_size,
1470 1102 : },
1471 1102 : broker_client,
1472 1102 : ctx,
1473 1102 : ));
1474 1102 : }
1475 :
1476 : /// Initialize with an empty layer map. Used when creating a new timeline.
1477 921 : pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
1478 921 : let mut layers = self.layers.try_write().expect(
1479 921 : "in the context where we call this function, no other task has access to the object",
1480 921 : );
1481 921 : layers.initialize_empty(Lsn(start_lsn.0));
1482 921 : }
1483 :
1484 : /// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
1485 : /// files.
1486 357 : pub(super) async fn load_layer_map(
1487 357 : &self,
1488 357 : disk_consistent_lsn: Lsn,
1489 357 : index_part: Option<IndexPart>,
1490 357 : ) -> anyhow::Result<()> {
1491 : use init::{Decision::*, Discovered, DismissedLayer};
1492 : use LayerFileName::*;
1493 :
1494 357 : let mut guard = self.layers.write().await;
1495 :
1496 357 : let timer = self.metrics.load_layer_map_histo.start_timer();
1497 357 :
1498 357 : // Scan timeline directory and create ImageFileName and DeltaFilename
1499 357 : // structs representing all files on disk
1500 357 : let timeline_path = self
1501 357 : .conf
1502 357 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
1503 357 : let conf = self.conf;
1504 357 : let span = tracing::Span::current();
1505 357 :
1506 357 : // Copy to move into the task we're about to spawn
1507 357 : let generation = self.generation;
1508 357 : let shard = self.get_shard_index();
1509 357 : let this = self.myself.upgrade().expect("&self method holds the arc");
1510 :
1511 357 : let (loaded_layers, needs_cleanup, total_physical_size) = tokio::task::spawn_blocking({
1512 357 : move || {
1513 357 : let _g = span.entered();
1514 357 : let discovered = init::scan_timeline_dir(&timeline_path)?;
1515 357 : let mut discovered_layers = Vec::with_capacity(discovered.len());
1516 357 : let mut unrecognized_files = Vec::new();
1517 357 :
1518 357 : let mut path = timeline_path;
1519 :
1520 15490 : for discovered in discovered {
1521 15133 : let (name, kind) = match discovered {
1522 14717 : Discovered::Layer(file_name, file_size) => {
1523 14717 : discovered_layers.push((file_name, file_size));
1524 14717 : continue;
1525 : }
1526 : Discovered::Metadata | Discovered::IgnoredBackup => {
1527 357 : continue;
1528 : }
1529 UBC 0 : Discovered::Unknown(file_name) => {
1530 0 : // we will later error if there are any
1531 0 : unrecognized_files.push(file_name);
1532 0 : continue;
1533 : }
1534 CBC 51 : Discovered::Ephemeral(name) => (name, "old ephemeral file"),
1535 6 : Discovered::Temporary(name) => (name, "temporary timeline file"),
1536 2 : Discovered::TemporaryDownload(name) => (name, "temporary download"),
1537 : };
1538 59 : path.push(Utf8Path::new(&name));
1539 59 : init::cleanup(&path, kind)?;
1540 59 : path.pop();
1541 : }
1542 :
1543 357 : if !unrecognized_files.is_empty() {
1544 : // assume that if there are any, there are many.
1545 UBC 0 : let n = unrecognized_files.len();
1546 0 : let first = &unrecognized_files[..n.min(10)];
1547 0 : anyhow::bail!(
1548 0 : "unrecognized files in timeline dir (total {n}), first 10: {first:?}"
1549 0 : );
1550 CBC 357 : }
1551 357 :
1552 357 : let decided = init::reconcile(
1553 357 : discovered_layers,
1554 357 : index_part.as_ref(),
1555 357 : disk_consistent_lsn,
1556 357 : generation,
1557 357 : shard,
1558 357 : );
1559 357 :
1560 357 : let mut loaded_layers = Vec::new();
1561 357 : let mut needs_cleanup = Vec::new();
1562 357 : let mut total_physical_size = 0;
1563 :
1564 58794 : for (name, decision) in decided {
1565 58437 : let decision = match decision {
1566 13559 : Ok(UseRemote { local, remote }) => {
1567 13559 : // Remote is authoritative, but we may still choose to retain
1568 13559 : // the local file if the contents appear to match
1569 13559 : if local.file_size() == remote.file_size() {
1570 : // Use the local file, but take the remote metadata so that we pick up
1571 : // the correct generation.
1572 13558 : UseLocal(remote)
1573 : } else {
1574 1 : path.push(name.file_name());
1575 1 : init::cleanup_local_file_for_remote(&path, &local, &remote)?;
1576 1 : path.pop();
1577 1 : UseRemote { local, remote }
1578 : }
1579 : }
1580 44242 : Ok(decision) => decision,
1581 3 : Err(DismissedLayer::Future { local }) => {
1582 3 : if local.is_some() {
1583 2 : path.push(name.file_name());
1584 2 : init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
1585 2 : path.pop();
1586 1 : }
1587 3 : needs_cleanup.push(name);
1588 3 : continue;
1589 : }
1590 633 : Err(DismissedLayer::LocalOnly(local)) => {
1591 633 : path.push(name.file_name());
1592 633 : init::cleanup_local_only_file(&path, &name, &local)?;
1593 633 : path.pop();
1594 633 : // this file never existed remotely; the work it represents will have to be redone
1595 633 : continue;
1596 : }
1597 : };
1598 :
1599 57801 : match &name {
1600 24440 : Delta(d) => assert!(d.lsn_range.end <= disk_consistent_lsn + 1),
1601 33361 : Image(i) => assert!(i.lsn <= disk_consistent_lsn),
1602 : }
1603 :
1604 57801 : tracing::debug!(layer=%name, ?decision, "applied");
1605 :
1606 57801 : let layer = match decision {
1607 14081 : UseLocal(m) => {
1608 14081 : total_physical_size += m.file_size();
1609 14081 : Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
1610 : }
1611 43719 : Evicted(remote) | UseRemote { remote, .. } => {
1612 43720 : Layer::for_evicted(conf, &this, name, remote)
1613 : }
1614 : };
1615 :
1616 57801 : loaded_layers.push(layer);
1617 : }
1618 357 : Ok((loaded_layers, needs_cleanup, total_physical_size))
1619 357 : }
1620 357 : })
1621 351 : .await
1622 357 : .map_err(anyhow::Error::new)
1623 357 : .and_then(|x| x)?;
1624 :
1625 357 : let num_layers = loaded_layers.len();
1626 357 :
1627 357 : guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
1628 :
1629 357 : if let Some(rtc) = self.remote_client.as_ref() {
1630 357 : rtc.schedule_layer_file_deletion(&needs_cleanup)?;
1631 357 : rtc.schedule_index_upload_for_file_changes()?;
1632 : // This barrier orders the above DELETEs before any later operations.
1633 : // This is critical because code executing after the barrier might
1634 : // re-create objects with the same key that we just scheduled for deletion.
1635 : // For example, if we just scheduled deletion of an image layer "from the future",
1636 : // later compaction might run again and re-create the same image layer.
1637 : // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
1638 : // "same" here means same key range and LSN.
1639 : //
1640 : // Without a barrier between the above DELETEs and the re-creation's PUTs,
1641 : // the upload queue may execute the PUT first, then the DELETE.
1642 : // In our example, we will end up with an IndexPart referencing a non-existent object.
1643 : //
1644 : // 1. a future image layer is created and uploaded
1645 : // 2. ps restart
1646 : // 3. the future layer from (1) is deleted during load layer map
1647 : // 4. image layer is re-created and uploaded
1648 : // 5. deletion queue would like to delete (1) but actually deletes (4)
1649 : // 6. delete by name works as expected, but it now deletes the wrong (later) version
1650 : //
1651 : // See https://github.com/neondatabase/neon/issues/5878
1652 : //
1653 : // NB: generation numbers naturally protect against this because they disambiguate
1654 : // (1) and (4)
1655 357 : rtc.schedule_barrier()?;
1656 : // Tenant::create_timeline will wait for these uploads to happen before returning, or
1657 : // on retry.
1658 UBC 0 : }
1659 :
1660 CBC 357 : info!(
1661 357 : "loaded layer map with {} layers at {}, total physical size: {}",
1662 357 : num_layers, disk_consistent_lsn, total_physical_size
1663 357 : );
1664 :
1665 357 : timer.stop_and_record();
1666 357 : Ok(())
1667 357 : }
1668 :
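// Editor's sketch -- not part of this source file. It illustrates, with a toy
// model, why the barrier scheduled in load_layer_map above matters: operations
// queued between barriers carry no mutual ordering guarantee, so a DELETE and a
// later PUT of the same object name can execute in either order unless a
// Barrier separates them. All names below are hypothetical.
#[derive(Debug)]
enum ToyUploadOp {
    Delete(&'static str),
    Put(&'static str),
    Barrier,
}

/// Split a queue into batches; ops inside one batch may run in any order,
/// but a batch only starts after the previous batch has fully completed.
fn execution_batches(queue: &[ToyUploadOp]) -> Vec<Vec<&ToyUploadOp>> {
    let mut batches = vec![Vec::new()];
    for op in queue {
        match op {
            ToyUploadOp::Barrier => batches.push(Vec::new()),
            other => batches.last_mut().unwrap().push(other),
        }
    }
    batches
}

fn main() {
    // Without a barrier: the DELETE and the re-created object's PUT share a
    // batch, so the PUT may execute first and then be deleted by the stale DELETE.
    let risky = [
        ToyUploadOp::Delete("img-from-the-future"),
        ToyUploadOp::Put("img-from-the-future"),
    ];
    // With a barrier: the DELETE is guaranteed to finish before the PUT starts.
    let safe = [
        ToyUploadOp::Delete("img-from-the-future"),
        ToyUploadOp::Barrier,
        ToyUploadOp::Put("img-from-the-future"),
    ];
    println!("risky batches: {:?}", execution_batches(&risky));
    println!("safe batches:  {:?}", execution_batches(&safe));
}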
1669 : /// Retrieve current logical size of the timeline.
1670 : ///
1671 : /// The returned size may lag behind the actual value if the initial size
1672 : /// calculation has not completed yet (it is triggered on the first size access).
1673 : ///
1674 : /// Returns the size along with an indication of whether it is exact.
1675 568881 : pub(crate) fn get_current_logical_size(
1676 568881 : self: &Arc<Self>,
1677 568881 : priority: GetLogicalSizePriority,
1678 568881 : ctx: &RequestContext,
1679 568881 : ) -> logical_size::CurrentLogicalSize {
1680 568881 : let current_size = self.current_logical_size.current_size();
1681 568881 : debug!("Current size: {current_size:?}");
1682 :
1683 568881 : match (current_size.accuracy(), priority) {
1684 568015 : (logical_size::Accuracy::Exact, _) => (), // nothing to do
1685 UBC 0 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::Background) => {
1686 0 : // background task will eventually deliver an exact value, we're in no rush
1687 0 : }
1688 : (logical_size::Accuracy::Approximate, GetLogicalSizePriority::User) => {
1689 : // background task is not ready, but user is asking for it now;
1690 : // => make the background task skip the line
1691 : // (The alternative would be to calculate the size here, but,
1692 : // it can actually take a long time if the user has a lot of rels.
1693 : // And we'll inevitably need it again; so, let the background task do the work.)
1694 CBC 866 : match self
1695 866 : .current_logical_size
1696 866 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
1697 866 : .get()
1698 : {
1699 853 : Some(cancel) => cancel.cancel(),
1700 : None => {
1701 13 : let state = self.current_state();
1702 UBC 0 : if matches!(
1703 CBC 13 : state,
1704 : TimelineState::Broken { .. } | TimelineState::Stopping
1705 13 : ) {
1706 13 :
1707 13 : // Can happen when the timeline detail endpoint is used while deletion is ongoing (or the timeline is broken).
1708 13 : // Don't make noise.
1709 13 : } else {
1710 UBC 0 : warn!("unexpected: cancel_wait_for_background_loop_concurrency_limit_semaphore not set, priority-boosting of logical size calculation will not work");
1711 : }
1712 : }
1713 : };
1714 : }
1715 : }
1716 :
1717 CBC 568881 : if let CurrentLogicalSize::Approximate(_) = &current_size {
1718 866 : if ctx.task_kind() == TaskKind::WalReceiverConnectionHandler {
1719 439 : let first = self
1720 439 : .current_logical_size
1721 439 : .did_return_approximate_to_walreceiver
1722 439 : .compare_exchange(
1723 439 : false,
1724 439 : true,
1725 439 : AtomicOrdering::Relaxed,
1726 439 : AtomicOrdering::Relaxed,
1727 439 : )
1728 439 : .is_ok();
1729 439 : if first {
1730 82 : crate::metrics::initial_logical_size::TIMELINES_WHERE_WALRECEIVER_GOT_APPROXIMATE_SIZE.inc();
1731 357 : }
1732 427 : }
1733 568015 : }
1734 :
1735 568881 : current_size
1736 568881 : }
1737 :
1738 1102 : fn spawn_initial_logical_size_computation_task(self: &Arc<Self>, ctx: &RequestContext) {
1739 1102 : let Some(initial_part_end) = self.current_logical_size.initial_part_end else {
1740 : // nothing to do for freshly created timelines;
1741 526 : assert_eq!(
1742 526 : self.current_logical_size.current_size().accuracy(),
1743 526 : logical_size::Accuracy::Exact,
1744 526 : );
1745 526 : self.current_logical_size.initialized.add_permits(1);
1746 526 : return;
1747 : };
1748 :
1749 576 : let cancel_wait_for_background_loop_concurrency_limit_semaphore = CancellationToken::new();
1750 576 : let token = cancel_wait_for_background_loop_concurrency_limit_semaphore.clone();
1751 576 : self.current_logical_size
1752 576 : .cancel_wait_for_background_loop_concurrency_limit_semaphore.set(token)
1753 576 : .expect("initial logical size calculation task must be spawned exactly once per Timeline object");
1754 576 :
1755 576 : let self_clone = Arc::clone(self);
1756 576 : let background_ctx = ctx.detached_child(
1757 576 : TaskKind::InitialLogicalSizeCalculation,
1758 576 : DownloadBehavior::Download,
1759 576 : );
1760 576 : task_mgr::spawn(
1761 576 : task_mgr::BACKGROUND_RUNTIME.handle(),
1762 576 : task_mgr::TaskKind::InitialLogicalSizeCalculation,
1763 576 : Some(self.tenant_shard_id),
1764 576 : Some(self.timeline_id),
1765 576 : "initial size calculation",
1766 : false,
1767 : // NB: don't log errors here, task_mgr will do that.
1768 576 : async move {
1769 576 : let cancel = task_mgr::shutdown_token();
1770 576 : self_clone
1771 576 : .initial_logical_size_calculation_task(
1772 576 : initial_part_end,
1773 576 : cancel_wait_for_background_loop_concurrency_limit_semaphore,
1774 576 : cancel,
1775 576 : background_ctx,
1776 576 : )
1777 66696 : .await;
1778 570 : Ok(())
1779 570 : }
1780 576 : .instrument(info_span!(parent: None, "initial_size_calculation", tenant_id=%self.tenant_shard_id.tenant_id, timeline_id=%self.timeline_id)),
1781 : );
1782 1102 : }
1783 :
1784 576 : async fn initial_logical_size_calculation_task(
1785 576 : self: Arc<Self>,
1786 576 : initial_part_end: Lsn,
1787 576 : skip_concurrency_limiter: CancellationToken,
1788 576 : cancel: CancellationToken,
1789 576 : background_ctx: RequestContext,
1790 576 : ) {
1791 570 : scopeguard::defer! {
1792 570 : // Irrespective of the outcome of this operation, we should unblock anyone waiting for it.
1793 570 : self.current_logical_size.initialized.add_permits(1);
1794 570 : }
1795 :
1796 : enum BackgroundCalculationError {
1797 : Cancelled,
1798 : Other(anyhow::Error),
1799 : }
1800 :
1801 576 : let try_once = |attempt: usize| {
1802 576 : let background_ctx = &background_ctx;
1803 576 : let self_ref = &self;
1804 576 : let skip_concurrency_limiter = &skip_concurrency_limiter;
1805 576 : async move {
1806 576 : let cancel = task_mgr::shutdown_token();
1807 576 : let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
1808 576 : BackgroundLoopKind::InitialLogicalSizeCalculation,
1809 576 : background_ctx,
1810 576 : );
1811 :
1812 : use crate::metrics::initial_logical_size::StartCircumstances;
1813 576 : let (_maybe_permit, circumstances) = tokio::select! {
1814 516 : permit = wait_for_permit => {
1815 : (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
1816 : }
1817 : _ = self_ref.cancel.cancelled() => {
1818 : return Err(BackgroundCalculationError::Cancelled);
1819 : }
1820 : _ = cancel.cancelled() => {
1821 : return Err(BackgroundCalculationError::Cancelled);
1822 : },
1823 : () = skip_concurrency_limiter.cancelled() => {
1824 : // Some action that is part of an end-user interaction requested the logical size
1825 : // => break out of the rate limit
1826 : // TODO: ideally we'd not run on BackgroundRuntime but the requester's runtime;
1827 : // but then again what happens if they cancel; also, we should just be using
1828 : // one runtime across the entire process, so, let's leave this for now.
1829 : (None, StartCircumstances::SkippedConcurrencyLimiter)
1830 : }
1831 : };
1832 :
1833 576 : let metrics_guard = if attempt == 1 {
1834 576 : crate::metrics::initial_logical_size::START_CALCULATION.first(circumstances)
1835 : } else {
1836 UBC 0 : crate::metrics::initial_logical_size::START_CALCULATION.retry(circumstances)
1837 : };
1838 :
1839 CBC 576 : match self_ref
1840 576 : .logical_size_calculation_task(
1841 576 : initial_part_end,
1842 576 : LogicalSizeCalculationCause::Initial,
1843 576 : background_ctx,
1844 576 : )
1845 66693 : .await
1846 : {
1847 549 : Ok(calculated_size) => Ok((calculated_size, metrics_guard)),
1848 : Err(CalculateLogicalSizeError::Cancelled) => {
1849 15 : Err(BackgroundCalculationError::Cancelled)
1850 : }
1851 2 : Err(CalculateLogicalSizeError::Other(err)) => {
1852 1 : if let Some(PageReconstructError::AncestorStopping(_)) =
1853 2 : err.root_cause().downcast_ref()
1854 : {
1855 UBC 0 : Err(BackgroundCalculationError::Cancelled)
1856 : } else {
1857 CBC 2 : Err(BackgroundCalculationError::Other(err))
1858 : }
1859 : }
1860 : }
1861 566 : }
1862 576 : };
1863 :
1864 576 : let retrying = async {
1865 576 : let mut attempt = 0;
1866 576 : loop {
1867 576 : attempt += 1;
1868 576 :
1869 66693 : match try_once(attempt).await {
1870 549 : Ok(res) => return ControlFlow::Continue(res),
1871 15 : Err(BackgroundCalculationError::Cancelled) => return ControlFlow::Break(()),
1872 2 : Err(BackgroundCalculationError::Other(e)) => {
1873 2 : warn!(attempt, "initial size calculation failed: {e:?}");
1874 : // exponential back-off doesn't make sense at these long intervals;
1875 : // use fixed retry interval with generous jitter instead
1876 2 : let sleep_duration = Duration::from_secs(
1877 2 : u64::try_from(
1878 2 : // 1hour base
1879 2 : (60_i64 * 60_i64)
1880 2 : // 10min jitter
1881 2 : + rand::thread_rng().gen_range(-10 * 60..10 * 60),
1882 2 : )
1883 2 : .expect("10min < 1hour"),
1884 2 : );
1885 2 : tokio::time::sleep(sleep_duration).await;
1886 : }
1887 : }
1888 : }
1889 564 : };
1890 :
1891 576 : let (calculated_size, metrics_guard) = tokio::select! {
1892 564 : res = retrying => {
1893 : match res {
1894 : ControlFlow::Continue(calculated_size) => calculated_size,
1895 : ControlFlow::Break(()) => return,
1896 : }
1897 : }
1898 : _ = cancel.cancelled() => {
1899 : return;
1900 : }
1901 : };
1902 :
1903 : // we cannot query current_logical_size.current_size() to know the current
1904 : // *negative* value, only truncated to u64.
1905 549 : let added = self
1906 549 : .current_logical_size
1907 549 : .size_added_after_initial
1908 549 : .load(AtomicOrdering::Relaxed);
1909 549 :
1910 549 : let sum = calculated_size.saturating_add_signed(added);
1911 549 :
1912 549 : // set the gauge value before it can be set in `update_current_logical_size`.
1913 549 : self.metrics.current_logical_size_gauge.set(sum);
1914 549 :
1915 549 : self.current_logical_size
1916 549 : .initial_logical_size
1917 549 : .set((calculated_size, metrics_guard.calculation_result_saved()))
1918 549 : .ok()
1919 549 : .expect("only this task sets it");
1920 570 : }
1921 :
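// Editor's sketch -- not part of this source file. It shows the priority-boost
// pattern used by the initial logical size calculation above: the task normally
// waits for a background-task concurrency permit, but a user-facing request can
// cancel that wait through a CancellationToken so the work starts immediately.
// The function and variable names here are hypothetical; only tokio and
// tokio_util (both already dependencies of this crate) are assumed.
use std::sync::Arc;

use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;

async fn run_with_optional_boost(limiter: Arc<Semaphore>, skip_limiter: CancellationToken) -> u64 {
    let _maybe_permit = tokio::select! {
        permit = limiter.acquire() => Some(permit.expect("semaphore is never closed")),
        // A user asked for the result right now: skip the queue and run unthrottled.
        () = skip_limiter.cancelled() => None,
    };
    // ... the expensive calculation would run here, permit or not ...
    42
}

#[tokio::main]
async fn main() {
    let limiter = Arc::new(Semaphore::new(0)); // zero permits: background work is fully throttled
    let skip = CancellationToken::new();
    skip.cancel(); // simulate a user request boosting the task past the limiter
    let size = run_with_optional_boost(limiter, skip).await;
    println!("calculated size: {size}");
}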
1922 39 : pub fn spawn_ondemand_logical_size_calculation(
1923 39 : self: &Arc<Self>,
1924 39 : lsn: Lsn,
1925 39 : cause: LogicalSizeCalculationCause,
1926 39 : ctx: RequestContext,
1927 39 : ) -> oneshot::Receiver<Result<u64, CalculateLogicalSizeError>> {
1928 39 : let (sender, receiver) = oneshot::channel();
1929 39 : let self_clone = Arc::clone(self);
1930 39 : // XXX if our caller loses interest, i.e., ctx is cancelled,
1931 39 : // we should stop the size calculation work and return an error.
1932 39 : // That would require restructuring this function's API to
1933 39 : // return the result directly, instead of a Receiver for the result.
1934 39 : let ctx = ctx.detached_child(
1935 39 : TaskKind::OndemandLogicalSizeCalculation,
1936 39 : DownloadBehavior::Download,
1937 39 : );
1938 39 : task_mgr::spawn(
1939 39 : task_mgr::BACKGROUND_RUNTIME.handle(),
1940 39 : task_mgr::TaskKind::OndemandLogicalSizeCalculation,
1941 39 : Some(self.tenant_shard_id),
1942 39 : Some(self.timeline_id),
1943 39 : "ondemand logical size calculation",
1944 39 : false,
1945 39 : async move {
1946 39 : let res = self_clone
1947 39 : .logical_size_calculation_task(lsn, cause, &ctx)
1948 2243 : .await;
1949 39 : let _ = sender.send(res).ok();
1950 39 : Ok(()) // Receiver is responsible for handling errors
1951 39 : }
1952 39 : .in_current_span(),
1953 39 : );
1954 39 : receiver
1955 39 : }
1956 :
1957 : /// # Cancel-Safety
1958 : ///
1959 : /// This method is cancellation-safe.
1960 UBC 0 : #[instrument(skip_all)]
1961 : async fn logical_size_calculation_task(
1962 : self: &Arc<Self>,
1963 : lsn: Lsn,
1964 : cause: LogicalSizeCalculationCause,
1965 : ctx: &RequestContext,
1966 : ) -> Result<u64, CalculateLogicalSizeError> {
1967 : span::debug_assert_current_span_has_tenant_and_timeline_id();
1968 :
1969 : let _guard = self.gate.enter();
1970 :
1971 : let self_calculation = Arc::clone(self);
1972 :
1973 CBC 615 : let mut calculation = pin!(async {
1974 615 : let ctx = ctx.attached_child();
1975 615 : self_calculation
1976 615 : .calculate_logical_size(lsn, cause, &ctx)
1977 68939 : .await
1978 605 : });
1979 :
1980 69544 : tokio::select! {
1981 603 : res = &mut calculation => { res }
1982 : _ = self.cancel.cancelled() => {
1983 UBC 0 : debug!("cancelling logical size calculation for timeline shutdown");
1984 : calculation.await
1985 : }
1986 : _ = task_mgr::shutdown_watcher() => {
1987 0 : debug!("cancelling logical size calculation for task shutdown");
1988 : calculation.await
1989 : }
1990 : }
1991 : }
1992 :
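// Editor's sketch -- not part of this source file. It isolates the cancel-safety
// trick used in logical_size_calculation_task above: the work future is pinned
// with `pin!` and the select! polls `&mut fut`, so when the cancellation branch
// wins the future is not dropped and can still be driven to completion.
use std::pin::pin;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut work = pin!(async {
        // Stand-in for the size calculation; a real task would also watch the
        // cancellation signal and wind down early.
        tokio::time::sleep(Duration::from_millis(50)).await;
        42u64
    });
    let shutdown = tokio::time::sleep(Duration::from_millis(10));

    let result = tokio::select! {
        res = &mut work => res,
        _ = shutdown => {
            // Shutdown fired first, but `work` was only borrowed, not consumed:
            // resume it instead of abandoning a half-finished computation.
            work.await
        }
    };
    assert_eq!(result, 42);
    println!("result: {result}");
}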
1993 : /// Calculate the logical size of the database at the latest LSN.
1994 : ///
1995 : /// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
1996 : /// especially if we need to download remote layers.
1997 : ///
1998 : /// # Cancel-Safety
1999 : ///
2000 : /// This method is cancellation-safe.
2001 CBC 623 : pub async fn calculate_logical_size(
2002 623 : &self,
2003 623 : up_to_lsn: Lsn,
2004 623 : cause: LogicalSizeCalculationCause,
2005 623 : ctx: &RequestContext,
2006 623 : ) -> Result<u64, CalculateLogicalSizeError> {
2007 623 : info!(
2008 623 : "Calculating logical size for timeline {} at {}",
2009 623 : self.timeline_id, up_to_lsn
2010 623 : );
2011 : // These failpoints are used by python tests to ensure that we don't delete
2012 : // the timeline while the logical size computation is ongoing.
2013 : // The first failpoint is used to make this function pause.
2014 : // Then the python test initiates timeline delete operation in a thread.
2015 : // It waits for a few seconds, then arms the second failpoint and disables
2016 : // the first failpoint. The second failpoint prints an error if the timeline
2017 : // delete code has deleted the on-disk state while we're still running here.
2018 : // It shouldn't do that. If it does it anyway, the error will be caught
2019 : // by the test suite, highlighting the problem.
2020 UBC 0 : fail::fail_point!("timeline-calculate-logical-size-pause");
2021 CBC 623 : fail::fail_point!("timeline-calculate-logical-size-check-dir-exists", |_| {
2022 2 : if !self
2023 2 : .conf
2024 2 : .metadata_path(&self.tenant_shard_id, &self.timeline_id)
2025 2 : .exists()
2026 : {
2027 UBC 0 : error!("timeline-calculate-logical-size-pre metadata file does not exist")
2028 CBC 2 : }
2029 : // need to return something
2030 2 : Ok(0)
2031 623 : });
2032 : // See if we've already done the work for initial size calculation.
2033 : // This is a short-cut for timelines that are mostly unused.
2034 621 : if let Some(size) = self.current_logical_size.initialized_size(up_to_lsn) {
2035 5 : return Ok(size);
2036 616 : }
2037 616 : let storage_time_metrics = match cause {
2038 : LogicalSizeCalculationCause::Initial
2039 : | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
2040 601 : | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
2041 : LogicalSizeCalculationCause::EvictionTaskImitation => {
2042 15 : &self.metrics.imitate_logical_size_histo
2043 : }
2044 : };
2045 616 : let timer = storage_time_metrics.start_timer();
2046 616 : let logical_size = self
2047 616 : .get_current_logical_size_non_incremental(up_to_lsn, ctx)
2048 70856 : .await?;
2049 UBC 0 : debug!("calculated logical size: {logical_size}");
2050 CBC 587 : timer.stop_and_record();
2051 587 : Ok(logical_size)
2052 613 : }
2053 :
2054 : /// Update current logical size, adding `delta` to the old value.
2055 427894 : fn update_current_logical_size(&self, delta: i64) {
2056 427894 : let logical_size = &self.current_logical_size;
2057 427894 : logical_size.increment_size(delta);
2058 427894 :
2059 427894 : // Also set the value in the prometheus gauge. Note that
2060 427894 : // there is a race condition here: if this is called by two
2061 427894 : // threads concurrently, the prometheus gauge might be set to
2062 427894 : // one value while current_logical_size is set to the
2063 427894 : // other.
2064 427894 : match logical_size.current_size() {
2065 427745 : CurrentLogicalSize::Exact(ref new_current_size) => self
2066 427745 : .metrics
2067 427745 : .current_logical_size_gauge
2068 427745 : .set(new_current_size.into()),
2069 149 : CurrentLogicalSize::Approximate(_) => {
2070 149 : // don't update the gauge yet, this allows us not to update the gauge back and
2071 149 : // forth between the initial size calculation task.
2072 149 : }
2073 : }
2074 427894 : }
2075 :
2076 2247 : async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
2077 2247 : let guard = self.layers.read().await;
2078 799949 : for historic_layer in guard.layer_map().iter_historic_layers() {
2079 799949 : let historic_layer_name = historic_layer.filename().file_name();
2080 799949 : if layer_file_name == historic_layer_name {
2081 2247 : return Some(guard.get_from_desc(&historic_layer));
2082 797702 : }
2083 : }
2084 :
2085 UBC 0 : None
2086 CBC 2247 : }
2087 :
2088 : /// The timeline heatmap is a hint to secondary locations from the primary location,
2089 : /// indicating which layers are currently on-disk on the primary.
2090 : ///
2091 : /// None is returned if the Timeline is in a state where uploading a heatmap
2092 : /// doesn't make sense, such as shutting down or initializing. The caller
2093 : /// should treat this as a cue to simply skip doing any heatmap uploading
2094 : /// for this timeline.
2095 6 : pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
2096 6 : let eviction_info = self.get_local_layers_for_disk_usage_eviction().await;
2097 :
2098 6 : let remote_client = match &self.remote_client {
2099 6 : Some(c) => c,
2100 UBC 0 : None => return None,
2101 : };
2102 :
2103 CBC 6 : let layer_file_names = eviction_info
2104 6 : .resident_layers
2105 6 : .iter()
2106 2453 : .map(|l| l.layer.layer_desc().filename())
2107 6 : .collect::<Vec<_>>();
2108 :
2109 6 : let decorated = match remote_client.get_layers_metadata(layer_file_names) {
2110 6 : Ok(d) => d,
2111 : Err(_) => {
2112 : // Getting metadata only fails on Timeline in bad state.
2113 UBC 0 : return None;
2114 : }
2115 : };
2116 :
2117 CBC 6 : let heatmap_layers = std::iter::zip(
2118 6 : eviction_info.resident_layers.into_iter(),
2119 6 : decorated.into_iter(),
2120 6 : )
2121 2453 : .filter_map(|(layer, remote_info)| {
2122 2453 : remote_info.map(|remote_info| {
2123 2453 : HeatMapLayer::new(
2124 2453 : layer.layer.layer_desc().filename(),
2125 2453 : IndexLayerMetadata::from(remote_info),
2126 2453 : layer.last_activity_ts,
2127 2453 : )
2128 2453 : })
2129 2453 : });
2130 6 :
2131 6 : Some(HeatMapTimeline::new(
2132 6 : self.timeline_id,
2133 6 : heatmap_layers.collect(),
2134 6 : ))
2135 6 : }
2136 : }
2137 :
2138 : type TraversalId = String;
2139 :
2140 : trait TraversalLayerExt {
2141 : fn traversal_id(&self) -> TraversalId;
2142 : }
2143 :
2144 : impl TraversalLayerExt for Layer {
2145 895 : fn traversal_id(&self) -> TraversalId {
2146 895 : self.local_path().to_string()
2147 895 : }
2148 : }
2149 :
2150 : impl TraversalLayerExt for Arc<InMemoryLayer> {
2151 322 : fn traversal_id(&self) -> TraversalId {
2152 322 : format!("timeline {} in-memory {self}", self.get_timeline_id())
2153 322 : }
2154 : }
2155 :
2156 : impl Timeline {
2157 : ///
2158 : /// Get a handle to a Layer for reading.
2159 : ///
2160 : /// The returned Layer might be from an ancestor timeline, if the
2161 : /// segment hasn't been updated on this timeline yet.
2162 : ///
2163 : /// This function takes the current timeline's locked LayerMap as an argument,
2164 : /// so callers can avoid potential race conditions.
2165 : ///
2166 : /// # Cancel-Safety
2167 : ///
2168 : /// This method is cancellation-safe.
2169 5862688 : async fn get_reconstruct_data(
2170 5862688 : &self,
2171 5862688 : key: Key,
2172 5862688 : request_lsn: Lsn,
2173 5862688 : reconstruct_state: &mut ValueReconstructState,
2174 5862688 : ctx: &RequestContext,
2175 5862688 : ) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
2176 5862674 : // Start from the current timeline.
2177 5862674 : let mut timeline_owned;
2178 5862674 : let mut timeline = self;
2179 5862674 :
2180 5862674 : let mut read_count = scopeguard::guard(0, |cnt| {
2181 5862637 : crate::metrics::READ_NUM_FS_LAYERS.observe(cnt as f64)
2182 5862674 : });
2183 5862674 :
2184 5862674 : // For debugging purposes, collect the path of layers that we traversed
2185 5862674 : // through. It's included in the error message if we fail to find the key.
2186 5862674 : let mut traversal_path = Vec::<TraversalPathItem>::new();
2187 :
2188 5862674 : let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
2189 1265902 : *cached_lsn
2190 : } else {
2191 4596772 : Lsn(0)
2192 : };
2193 :
2194 : // 'prev_lsn' tracks the last LSN that we were at in our search. It's used
2195 : // to check that each iteration make some progress, to break infinite
2196 : // looping if something goes wrong.
2197 5862674 : let mut prev_lsn = Lsn(u64::MAX);
2198 5862674 :
2199 5862674 : let mut result = ValueReconstructResult::Continue;
2200 5862674 : let mut cont_lsn = Lsn(request_lsn.0 + 1);
2201 :
2202 27327554 : 'outer: loop {
2203 27327554 : if self.cancel.is_cancelled() {
2204 24 : return Err(PageReconstructError::Cancelled);
2205 27327530 : }
2206 27327530 :
2207 27327530 : // The function should have updated 'state'
2208 27327530 : //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
2209 27327530 : match result {
2210 4626402 : ValueReconstructResult::Complete => return Ok(traversal_path),
2211 : ValueReconstructResult::Continue => {
2212 : // If we reached an earlier cached page image, we're done.
2213 22701125 : if cont_lsn == cached_lsn + 1 {
2214 1235042 : MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
2215 1235042 : return Ok(traversal_path);
2216 21466083 : }
2217 21466083 : if prev_lsn <= cont_lsn {
2218 : // Didn't make any progress in last iteration. Error out to avoid
2219 : // getting stuck in the loop.
2220 1146 : return Err(layer_traversal_error(format!(
2221 1146 : "could not find layer with more data for key {} at LSN {}, request LSN {}, ancestor {}",
2222 1146 : key,
2223 1146 : Lsn(cont_lsn.0 - 1),
2224 1146 : request_lsn,
2225 1146 : timeline.ancestor_lsn
2226 1146 : ), traversal_path));
2227 21464937 : }
2228 21464937 : prev_lsn = cont_lsn;
2229 : }
2230 : ValueReconstructResult::Missing => {
2231 : return Err(layer_traversal_error(
2232 3 : if cfg!(test) {
2233 2 : format!(
2234 2 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}\n{}",
2235 2 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn, std::backtrace::Backtrace::force_capture(),
2236 2 : )
2237 : } else {
2238 1 : format!(
2239 1 : "could not find data for key {} (shard {:?}) at LSN {}, for request at LSN {}",
2240 1 : key, self.shard_identity.get_shard_number(&key), cont_lsn, request_lsn
2241 1 : )
2242 : },
2243 3 : traversal_path,
2244 : ));
2245 : }
2246 : }
2247 :
2248 : // Recurse into ancestor if needed
2249 21464937 : if is_inherited_key(key) && Lsn(cont_lsn.0 - 1) <= timeline.ancestor_lsn {
2250 UBC 0 : trace!(
2251 0 : "going into ancestor {}, cont_lsn is {}",
2252 0 : timeline.ancestor_lsn,
2253 0 : cont_lsn
2254 0 : );
2255 CBC 1197870 : let ancestor = match timeline.get_ancestor_timeline() {
2256 1197870 : Ok(timeline) => timeline,
2257 UBC 0 : Err(e) => return Err(PageReconstructError::from(e)),
2258 : };
2259 :
2260 : // It's possible that the ancestor timeline isn't active yet, or
2261 : // is active but hasn't yet caught up to the branch point. Wait
2262 : // for it.
2263 : //
2264 : // This cannot happen while the pageserver is running normally,
2265 : // because you cannot create a branch from a point that isn't
2266 : // present in the pageserver yet. However, we don't wait for the
2267 : // branch point to be uploaded to cloud storage before creating
2268 : // a branch. I.e., the branch LSN need not be remote consistent
2269 : // for the branching operation to succeed.
2270 : //
2271 : // Hence, if we try to load a tenant in such a state where
2272 : // 1. the existence of the branch was persisted (in IndexPart and/or locally)
2273 : // 2. but the ancestor state is behind branch_lsn because it was not yet persisted
2274 : // then we will need to wait for the ancestor timeline to
2275 : // re-stream WAL up to branch_lsn before we access it.
2276 : //
2277 : // How can a tenant get in such a state?
2278 : // - ungraceful pageserver process exit
2279 : // - detach+attach => this is a bug, https://github.com/neondatabase/neon/issues/4219
2280 : //
2281 : // NB: this could be avoided by requiring
2282 : // branch_lsn >= remote_consistent_lsn
2283 : // during branch creation.
2284 CBC 1197870 : match ancestor.wait_to_become_active(ctx).await {
2285 1197866 : Ok(()) => {}
2286 : Err(TimelineState::Stopping) => {
2287 3 : return Err(PageReconstructError::AncestorStopping(ancestor.timeline_id));
2288 : }
2289 1 : Err(state) => {
2290 1 : return Err(PageReconstructError::Other(anyhow::anyhow!(
2291 1 : "Timeline {} will not become active. Current state: {:?}",
2292 1 : ancestor.timeline_id,
2293 1 : &state,
2294 1 : )));
2295 : }
2296 : }
2297 1197866 : ancestor
2298 1197866 : .wait_lsn(timeline.ancestor_lsn, ctx)
2299 5 : .await
2300 1197866 : .map_err(|e| match e {
2301 UBC 0 : e @ WaitLsnError::Timeout(_) => PageReconstructError::AncestorLsnTimeout(e),
2302 0 : WaitLsnError::Shutdown => PageReconstructError::Cancelled,
2303 0 : e @ WaitLsnError::BadState => {
2304 0 : PageReconstructError::Other(anyhow::anyhow!(e))
2305 : }
2306 CBC 1197866 : })?;
2307 :
2308 1197866 : timeline_owned = ancestor;
2309 1197866 : timeline = &*timeline_owned;
2310 1197866 : prev_lsn = Lsn(u64::MAX);
2311 1197866 : continue 'outer;
2312 20267066 : }
2313 :
2314 20267066 : let guard = timeline.layers.read().await;
2315 20267036 : let layers = guard.layer_map();
2316 :
2317 : // Check the open and frozen in-memory layers first, in order from newest
2318 : // to oldest.
2319 20267036 : if let Some(open_layer) = &layers.open_layer {
2320 16065430 : let start_lsn = open_layer.get_lsn_range().start;
2321 16065430 : if cont_lsn > start_lsn {
2322 : //info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
2323 : // Get all the data needed to reconstruct the page version from this layer.
2324 : // But if we have an older cached page image, no need to go past that.
2325 4411185 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2326 4411185 : result = match open_layer
2327 4411185 : .get_value_reconstruct_data(
2328 4411185 : key,
2329 4411185 : lsn_floor..cont_lsn,
2330 4411185 : reconstruct_state,
2331 4411185 : ctx,
2332 4411185 : )
2333 577321 : .await
2334 : {
2335 4411185 : Ok(result) => result,
2336 UBC 0 : Err(e) => return Err(PageReconstructError::from(e)),
2337 : };
2338 CBC 4411185 : cont_lsn = lsn_floor;
2339 4411185 : // metrics: open_layer does not count as fs access, so we are not updating `read_count`
2340 4411185 : traversal_path.push((
2341 4411185 : result,
2342 4411185 : cont_lsn,
2343 4411185 : Box::new({
2344 4411185 : let open_layer = Arc::clone(open_layer);
2345 4411185 : move || open_layer.traversal_id()
2346 4411185 : }),
2347 4411185 : ));
2348 4411185 : continue 'outer;
2349 11654245 : }
2350 4201606 : }
2351 15855851 : for frozen_layer in layers.frozen_layers.iter().rev() {
2352 823242 : let start_lsn = frozen_layer.get_lsn_range().start;
2353 823242 : if cont_lsn > start_lsn {
2354 : //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
2355 257271 : let lsn_floor = max(cached_lsn + 1, start_lsn);
2356 257271 : result = match frozen_layer
2357 257271 : .get_value_reconstruct_data(
2358 257271 : key,
2359 257271 : lsn_floor..cont_lsn,
2360 257271 : reconstruct_state,
2361 257271 : ctx,
2362 257271 : )
2363 17207 : .await
2364 : {
2365 257271 : Ok(result) => result,
2366 UBC 0 : Err(e) => return Err(PageReconstructError::from(e)),
2367 : };
2368 CBC 257271 : cont_lsn = lsn_floor;
2369 257271 : // metrics: in-memory (frozen) layers do not count as fs access, so we are not updating `read_count`
2370 257271 : traversal_path.push((
2371 257271 : result,
2372 257271 : cont_lsn,
2373 257271 : Box::new({
2374 257271 : let frozen_layer = Arc::clone(frozen_layer);
2375 257271 : move || frozen_layer.traversal_id()
2376 257271 : }),
2377 257271 : ));
2378 257271 : continue 'outer;
2379 565971 : }
2380 : }
2381 :
2382 15598580 : if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
2383 15454851 : let layer = guard.get_from_desc(&layer);
2384 15454851 : // Get all the data needed to reconstruct the page version from this layer.
2385 15454851 : // But if we have an older cached page image, no need to go past that.
2386 15454851 : let lsn_floor = max(cached_lsn + 1, lsn_floor);
2387 15454851 : result = match layer
2388 15454851 : .get_value_reconstruct_data(key, lsn_floor..cont_lsn, reconstruct_state, ctx)
2389 891600 : .await
2390 : {
2391 15454829 : Ok(result) => result,
2392 10 : Err(e) => return Err(PageReconstructError::from(e)),
2393 : };
2394 15454829 : cont_lsn = lsn_floor;
2395 15454829 : *read_count += 1;
2396 15454829 : traversal_path.push((
2397 15454829 : result,
2398 15454829 : cont_lsn,
2399 15454829 : Box::new({
2400 15454829 : let layer = layer.to_owned();
2401 15454829 : move || layer.traversal_id()
2402 15454829 : }),
2403 15454829 : ));
2404 15454829 : continue 'outer;
2405 143729 : } else if timeline.ancestor_timeline.is_some() {
2406 : // Nothing on this timeline. Traverse to parent
2407 143728 : result = ValueReconstructResult::Continue;
2408 143728 : cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
2409 143728 : continue 'outer;
2410 : } else {
2411 : // Nothing found
2412 1 : result = ValueReconstructResult::Missing;
2413 1 : continue 'outer;
2414 : }
2415 : }
2416 5862631 : }
2417 :
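// Editor's sketch -- not part of this source file. A heavily simplified model of
// the traversal in get_reconstruct_data above: walk versions newest-to-oldest,
// collect deltas until a full image is found, and fall back to the ancestor
// timeline (capped at the branch LSN) when the current timeline has nothing
// older. All types below are hypothetical toys.
use std::collections::HashMap;

type Lsn = u64;

enum Record {
    Image(String), // complete value as of this LSN
    Delta(String), // suffix appended on top of an older image
}

struct ToyTimeline {
    // Per key: versions sorted by LSN ascending.
    records: HashMap<&'static str, Vec<(Lsn, Record)>>,
    // (parent timeline, LSN this timeline was branched at)
    ancestor: Option<(Box<ToyTimeline>, Lsn)>,
}

fn reconstruct(tl: &ToyTimeline, key: &str, request_lsn: Lsn) -> Option<String> {
    let mut deltas: Vec<String> = Vec::new();
    let mut timeline = tl;
    let mut cont_lsn = request_lsn;
    loop {
        if let Some(versions) = timeline.records.get(key) {
            // Newest-to-oldest, like the layer search above.
            for (lsn, rec) in versions.iter().rev() {
                if *lsn > cont_lsn {
                    continue;
                }
                match rec {
                    Record::Image(img) => {
                        // Base image found: replay collected deltas oldest-first.
                        let mut value = img.clone();
                        for d in deltas.iter().rev() {
                            value.push_str(d);
                        }
                        return Some(value);
                    }
                    Record::Delta(d) => deltas.push(d.clone()),
                }
            }
        }
        // Nothing (more) on this timeline: continue on the ancestor at the
        // branch point, or give up if there is no ancestor.
        match &timeline.ancestor {
            Some((parent, branch_lsn)) => {
                cont_lsn = *branch_lsn;
                timeline = parent.as_ref();
            }
            None => return None,
        }
    }
}

fn main() {
    let parent = ToyTimeline {
        records: HashMap::from([("k", vec![(10, Record::Image("base".to_string()))])]),
        ancestor: None,
    };
    let child = ToyTimeline {
        records: HashMap::from([("k", vec![(30, Record::Delta("+x".to_string()))])]),
        ancestor: Some((Box::new(parent), 20)),
    };
    assert_eq!(reconstruct(&child, "k", 40), Some("base+x".to_string()));
    println!("ok");
}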
2418 : /// # Cancel-safety
2419 : ///
2420 : /// This method is cancellation-safe.
2421 5939236 : async fn lookup_cached_page(
2422 5939236 : &self,
2423 5939236 : key: &Key,
2424 5939236 : lsn: Lsn,
2425 5939236 : ctx: &RequestContext,
2426 5939236 : ) -> Option<(Lsn, Bytes)> {
2427 5939222 : let cache = page_cache::get();
2428 :
2429 : // FIXME: It's pointless to check the cache for things that are not 8kB pages.
2430 : // We should look at the key to determine if it's a cacheable object
2431 5939222 : let (lsn, read_guard) = cache
2432 5939222 : .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
2433 4596772 : .await?;
2434 1342450 : let img = Bytes::from(read_guard.to_vec());
2435 1342450 : Some((lsn, img))
2436 5939222 : }
2437 :
2438 1197905 : fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
2439 1197905 : let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
2440 UBC 0 : format!(
2441 0 : "Ancestor is missing. Timeline id: {} Ancestor id {:?}",
2442 0 : self.timeline_id,
2443 0 : self.get_ancestor_timeline_id(),
2444 0 : )
2445 CBC 1197905 : })?;
2446 1197905 : Ok(Arc::clone(ancestor))
2447 1197905 : }
2448 :
2449 3584257 : pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
2450 3584257 : &self.shard_identity
2451 3584257 : }
2452 :
2453 : ///
2454 : /// Get a handle to the latest layer for appending.
2455 : ///
2456 1781990 : async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
2457 1781989 : let mut guard = self.layers.write().await;
2458 1781989 : let layer = guard
2459 1781989 : .get_layer_for_write(
2460 1781989 : lsn,
2461 1781989 : self.get_last_record_lsn(),
2462 1781989 : self.conf,
2463 1781989 : self.timeline_id,
2464 1781989 : self.tenant_shard_id,
2465 1781989 : )
2466 UBC 0 : .await?;
2467 CBC 1781989 : Ok(layer)
2468 1781989 : }
2469 :
2470 607051 : async fn put_value(
2471 607051 : &self,
2472 607051 : key: Key,
2473 607051 : lsn: Lsn,
2474 607051 : val: &Value,
2475 607051 : ctx: &RequestContext,
2476 607051 : ) -> anyhow::Result<()> {
2477 : //info!("PUT: key {} at {}", key, lsn);
2478 607051 : let layer = self.get_layer_for_write(lsn).await?;
2479 607051 : layer.put_value(key, lsn, val, ctx).await?;
2480 607051 : Ok(())
2481 607051 : }
2482 :
2483 1168370 : async fn put_values(
2484 1168370 : &self,
2485 1168370 : values: &HashMap<Key, Vec<(Lsn, Value)>>,
2486 1168370 : ctx: &RequestContext,
2487 1168370 : ) -> anyhow::Result<()> {
2488 : // Pick the first LSN in the batch to get the layer to write to.
2489 1168369 : for lsns in values.values() {
2490 1168369 : if let Some((lsn, _)) = lsns.first() {
2491 1168369 : let layer = self.get_layer_for_write(*lsn).await?;
2492 1168369 : layer.put_values(values, ctx).await?;
2493 1168369 : break;
2494 UBC 0 : }
2495 : }
2496 CBC 1168369 : Ok(())
2497 1168369 : }
2498 :
2499 6569 : async fn put_tombstones(&self, tombstones: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
2500 6569 : if let Some((_, lsn)) = tombstones.first() {
2501 6569 : let layer = self.get_layer_for_write(*lsn).await?;
2502 6569 : layer.put_tombstones(tombstones).await?;
2503 UBC 0 : }
2504 CBC 6569 : Ok(())
2505 6569 : }
2506 :
2507 49362591 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
2508 49362591 : assert!(new_lsn.is_aligned());
2509 :
2510 49362591 : self.metrics.last_record_gauge.set(new_lsn.0 as i64);
2511 49362591 : self.last_record_lsn.advance(new_lsn);
2512 49362591 : }
2513 :
2514 4663 : async fn freeze_inmem_layer(&self, write_lock_held: bool) {
2515 : // Freeze the current open in-memory layer. It will be written to disk on next
2516 : // iteration.
2517 4663 : let _write_guard = if write_lock_held {
2518 3102 : None
2519 : } else {
2520 1561 : Some(self.write_lock.lock().await)
2521 : };
2522 4663 : let mut guard = self.layers.write().await;
2523 4663 : guard
2524 4663 : .try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at)
2525 11 : .await;
2526 4663 : }
2527 :
2528 : /// Layer flusher task's main loop.
2529 1274 : async fn flush_loop(
2530 1274 : self: &Arc<Self>,
2531 1274 : mut layer_flush_start_rx: tokio::sync::watch::Receiver<u64>,
2532 1274 : ctx: &RequestContext,
2533 1274 : ) {
2534 1274 : info!("started flush loop");
2535 : loop {
2536 5745 : tokio::select! {
2537 10454 : _ = self.cancel.cancelled() => {
2538 10454 : info!("shutting down layer flush task");
2539 10454 : break;
2540 10454 : },
2541 10454 : _ = task_mgr::shutdown_watcher() => {
2542 10454 : info!("shutting down layer flush task");
2543 10454 : break;
2544 10454 : },
2545 10454 : _ = layer_flush_start_rx.changed() => {}
2546 10454 : }
2547 :
2548 UBC 0 : trace!("waking up");
2549 CBC 4478 : let timer = self.metrics.flush_time_histo.start_timer();
2550 4478 : let flush_counter = *layer_flush_start_rx.borrow();
2551 8914 : let result = loop {
2552 8914 : if self.cancel.is_cancelled() {
2553 UBC 0 : info!("dropping out of flush loop for timeline shutdown");
2554 : // Note: we do not bother transmitting into [`layer_flush_done_tx`], because
2555 : // anyone waiting on that will respect self.cancel as well: they will stop
2556 : // waiting at the same time as we drop out of this loop.
2557 0 : return;
2558 CBC 8914 : }
2559 :
2560 8914 : let layer_to_flush = {
2561 8914 : let guard = self.layers.read().await;
2562 8914 : guard.layer_map().frozen_layers.front().cloned()
2563 : // drop 'layers' lock to allow concurrent reads and writes
2564 : };
2565 8914 : let Some(layer_to_flush) = layer_to_flush else {
2566 4471 : break Ok(());
2567 : };
2568 17245 : match self.flush_frozen_layer(layer_to_flush, ctx).await {
2569 4436 : Ok(()) => {}
2570 : Err(FlushLayerError::Cancelled) => {
2571 1 : info!("dropping out of flush loop for timeline shutdown");
2572 1 : return;
2573 : }
2574 UBC 0 : err @ Err(
2575 : FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
2576 : ) => {
2577 0 : error!("could not flush frozen layer: {err:?}");
2578 0 : break err;
2579 : }
2580 : }
2581 : };
2582 : // Notify any listeners that we're done
2583 CBC 4471 : let _ = self
2584 4471 : .layer_flush_done_tx
2585 4471 : .send_replace((flush_counter, result));
2586 4471 :
2587 4471 : timer.stop_and_record();
2588 : }
2589 497 : }
2590 :
2591 1561 : async fn flush_frozen_layers_and_wait(&self) -> anyhow::Result<()> {
2592 1561 : let mut rx = self.layer_flush_done_tx.subscribe();
2593 1561 :
2594 1561 : // Increment the flush cycle counter and wake up the flush task.
2595 1561 : // Remember the new value, so that when we listen for the flush
2596 1561 : // to finish, we know when the flush that we initiated has
2597 1561 : // finished, instead of some other flush that was started earlier.
2598 1561 : let mut my_flush_request = 0;
2599 1561 :
2600 1561 : let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
2601 1561 : if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
2602 42 : anyhow::bail!("cannot flush frozen layers when flush_loop is not running, state is {flush_loop_state:?}")
2603 1519 : }
2604 1519 :
2605 1519 : self.layer_flush_start_tx.send_modify(|counter| {
2606 1519 : my_flush_request = *counter + 1;
2607 1519 : *counter = my_flush_request;
2608 1519 : });
2609 :
2610 3037 : loop {
2611 3037 : {
2612 3037 : let (last_result_counter, last_result) = &*rx.borrow();
2613 3037 : if *last_result_counter >= my_flush_request {
2614 1518 : if let Err(_err) = last_result {
2615 : // We already logged the original error in
2616 : // flush_loop. We cannot propagate it to the caller
2617 : // here, because it might not be Cloneable
2618 UBC 0 : anyhow::bail!(
2619 0 : "Could not flush frozen layer. Request id: {}",
2620 0 : my_flush_request
2621 0 : );
2622 : } else {
2623 CBC 1518 : return Ok(());
2624 : }
2625 1519 : }
2626 : }
2627 UBC 0 : trace!("waiting for flush to complete");
2628 CBC 1519 : tokio::select! {
2629 1518 : rx_e = rx.changed() => {
2630 : rx_e?;
2631 : },
2632 : // Cancellation safety: we are not leaving an I/O in-flight for the flush, we're just ignoring
2633 : // the notification from [`flush_loop`] that it completed.
2634 : _ = self.cancel.cancelled() => {
2635 1 : tracing::info!("Cancelled layer flush due to timeline shutdown");
2636 : return Ok(())
2637 : }
2638 : };
2639 UBC 0 : trace!("done")
2640 : }
2641 CBC 1561 : }
2642 :
2643 3102 : fn flush_frozen_layers(&self) {
2644 3102 : self.layer_flush_start_tx.send_modify(|val| *val += 1);
2645 3102 : }
2646 :
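// Editor's sketch -- not part of this source file. It isolates the counter
// handshake used by flush_frozen_layers_and_wait / flush_loop above: the
// requester bumps a request counter on one watch channel, the worker publishes
// the highest request it has completed on another, and the requester waits
// until the completed counter reaches its request number. Only tokio (already
// a dependency) is assumed; all names are hypothetical.
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (start_tx, mut start_rx) = watch::channel(0u64);
    let (done_tx, mut done_rx) = watch::channel(0u64);

    // Worker: wait for new requests, do the "flush", then acknowledge the
    // request counter it observed.
    tokio::spawn(async move {
        while start_rx.changed().await.is_ok() {
            let request = *start_rx.borrow();
            // ... the actual flushing work would happen here ...
            done_tx.send_replace(request);
        }
    });

    // Requester: issue a request and wait for it (or any later one) to finish.
    let mut my_request = 0;
    start_tx.send_modify(|counter| {
        my_request = *counter + 1;
        *counter = my_request;
    });
    loop {
        if *done_rx.borrow() >= my_request {
            break;
        }
        done_rx.changed().await.expect("worker exited unexpectedly");
    }
    println!("flush request {my_request} completed");
}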
2647 : /// Flush one frozen in-memory layer to disk, as a new delta layer.
2648 4397 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id, layer=%frozen_layer))]
2649 : async fn flush_frozen_layer(
2650 : self: &Arc<Self>,
2651 : frozen_layer: Arc<InMemoryLayer>,
2652 : ctx: &RequestContext,
2653 : ) -> Result<(), FlushLayerError> {
2654 : // As a special case, when we have just imported an image into the repository,
2655 : // instead of writing out a L0 delta layer, we directly write out image layer
2656 : // files instead. This is possible as long as *all* the data imported into the
2657 : // repository have the same LSN.
2658 : let lsn_range = frozen_layer.get_lsn_range();
2659 : let (layers_to_upload, delta_layer_to_add) =
2660 : if lsn_range.start == self.initdb_lsn && lsn_range.end == Lsn(self.initdb_lsn.0 + 1) {
2661 : #[cfg(test)]
2662 : match &mut *self.flush_loop_state.lock().unwrap() {
2663 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2664 : panic!("flush loop not running")
2665 : }
2666 : FlushLoopState::Running {
2667 : initdb_optimization_count,
2668 : ..
2669 : } => {
2670 : *initdb_optimization_count += 1;
2671 : }
2672 : }
2673 : // Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
2674 : // require downloading anything during initial import.
2675 : let (partitioning, _lsn) = self
2676 : .repartition(
2677 : self.initdb_lsn,
2678 : self.get_compaction_target_size(),
2679 : EnumSet::empty(),
2680 : ctx,
2681 : )
2682 : .await?;
2683 :
2684 : if self.cancel.is_cancelled() {
2685 : return Err(FlushLayerError::Cancelled);
2686 : }
2687 :
2688 : // For image layers, we add them immediately into the layer map.
2689 : (
2690 : self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
2691 : .await?,
2692 : None,
2693 : )
2694 : } else {
2695 : #[cfg(test)]
2696 : match &mut *self.flush_loop_state.lock().unwrap() {
2697 : FlushLoopState::NotStarted | FlushLoopState::Exited => {
2698 : panic!("flush loop not running")
2699 : }
2700 : FlushLoopState::Running {
2701 : expect_initdb_optimization,
2702 : ..
2703 : } => {
2704 : assert!(!*expect_initdb_optimization, "expected initdb optimization");
2705 : }
2706 : }
2707 : // Normal case, write out a L0 delta layer file.
2708 : // `create_delta_layer` will not modify the layer map.
2709 : // We will remove frozen layer and add delta layer in one atomic operation later.
2710 : let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
2711 : (
2712 : // FIXME: even though we assume a single image layer and a single delta layer here,
2713 : // we still push them into a vec
2714 : vec![layer.clone()],
2715 : Some(layer),
2716 : )
2717 : };
2718 :
2719 4441 : pausable_failpoint!("flush-layer-cancel-after-writing-layer-out-pausable");
2720 :
2721 : if self.cancel.is_cancelled() {
2722 : return Err(FlushLayerError::Cancelled);
2723 : }
2724 :
2725 : let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
2726 : let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
2727 :
2728 : // The new on-disk layers are now in the layer map. We can remove the
2729 : // in-memory layer from the map now. The flushed layer is stored in
2730 : // the mapping in `create_delta_layer`.
2731 : let metadata = {
2732 : let mut guard = self.layers.write().await;
2733 :
2734 : if self.cancel.is_cancelled() {
2735 : return Err(FlushLayerError::Cancelled);
2736 : }
2737 :
2738 : guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
2739 :
2740 : if disk_consistent_lsn != old_disk_consistent_lsn {
2741 : assert!(disk_consistent_lsn > old_disk_consistent_lsn);
2742 : self.disk_consistent_lsn.store(disk_consistent_lsn);
2743 :
2744 : // Schedule remote uploads that will reflect our new disk_consistent_lsn
2745 : Some(self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?)
2746 : } else {
2747 : None
2748 : }
2749 : // release lock on 'layers'
2750 : };
2751 :
2752 : // FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
2753 : // a compaction can delete the file and then it won't be available for uploads any more.
2754 : // We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
2755 : // race situation.
2756 : // See https://github.com/neondatabase/neon/issues/4526
2757 4440 : pausable_failpoint!("flush-frozen-pausable");
2758 :
2759 : // This failpoint is used by another test case `test_pageserver_recovery`.
2760 UBC 0 : fail_point!("flush-frozen-exit");
2761 :
2762 : // Update the metadata file, with new 'disk_consistent_lsn'
2763 : //
2764 : // TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
2765 : // *all* the layers, to avoid fsyncing the file multiple times.
2766 :
2767 : // If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
2768 : if let Some(metadata) = metadata {
2769 : save_metadata(
2770 : self.conf,
2771 : &self.tenant_shard_id,
2772 : &self.timeline_id,
2773 : &metadata,
2774 : )
2775 : .await
2776 : .context("save_metadata")?;
2777 : }
2778 : Ok(())
2779 : }
2780 :
2781 : /// Update metadata file
2782 CBC 4464 : fn schedule_uploads(
2783 4464 : &self,
2784 4464 : disk_consistent_lsn: Lsn,
2785 4464 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
2786 4464 : ) -> anyhow::Result<TimelineMetadata> {
2787 4464 : // We can only save a valid 'prev_record_lsn' value on disk if we
2788 4464 : // flushed *all* in-memory changes to disk. We only track
2789 4464 : // 'prev_record_lsn' in memory for the latest processed record, so we
2790 4464 : // don't remember what the correct value that corresponds to some old
2791 4464 : // LSN is. But if we flush everything, then the value corresponding
2792 4464 : // current 'last_record_lsn' is correct and we can store it on disk.
2793 4464 : let RecordLsn {
2794 4464 : last: last_record_lsn,
2795 4464 : prev: prev_record_lsn,
2796 4464 : } = self.last_record_lsn.load();
2797 4464 : let ondisk_prev_record_lsn = if disk_consistent_lsn == last_record_lsn {
2798 1283 : Some(prev_record_lsn)
2799 : } else {
2800 3181 : None
2801 : };
2802 :
2803 4464 : let ancestor_timeline_id = self
2804 4464 : .ancestor_timeline
2805 4464 : .as_ref()
2806 4464 : .map(|ancestor| ancestor.timeline_id);
2807 4464 :
2808 4464 : let metadata = TimelineMetadata::new(
2809 4464 : disk_consistent_lsn,
2810 4464 : ondisk_prev_record_lsn,
2811 4464 : ancestor_timeline_id,
2812 4464 : self.ancestor_lsn,
2813 4464 : *self.latest_gc_cutoff_lsn.read(),
2814 4464 : self.initdb_lsn,
2815 4464 : self.pg_version,
2816 4464 : );
2817 4464 :
2818 4464 : fail_point!("checkpoint-before-saving-metadata", |x| bail!(
2819 UBC 0 : "{}",
2820 0 : x.unwrap()
2821 CBC 4464 : ));
2822 :
2823 4464 : if let Some(remote_client) = &self.remote_client {
2824 8904 : for layer in layers_to_upload {
2825 4440 : remote_client.schedule_layer_file_upload(layer)?;
2826 : }
2827 4464 : remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
2828 UBC 0 : }
2829 :
2830 CBC 4464 : Ok(metadata)
2831 4464 : }
2832 :
2833 24 : async fn update_metadata_file(
2834 24 : &self,
2835 24 : disk_consistent_lsn: Lsn,
2836 24 : layers_to_upload: impl IntoIterator<Item = ResidentLayer>,
2837 24 : ) -> anyhow::Result<()> {
2838 24 : let metadata = self.schedule_uploads(disk_consistent_lsn, layers_to_upload)?;
2839 :
2840 24 : save_metadata(
2841 24 : self.conf,
2842 24 : &self.tenant_shard_id,
2843 24 : &self.timeline_id,
2844 24 : &metadata,
2845 24 : )
2846 UBC 0 : .await
2847 CBC 24 : .context("save_metadata")?;
2848 :
2849 24 : Ok(())
2850 24 : }
2851 :
2852 : // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
2853 : // in the layer map immediately. The caller is responsible for putting it into the layer map.
2854 4399 : async fn create_delta_layer(
2855 4399 : self: &Arc<Self>,
2856 4399 : frozen_layer: &Arc<InMemoryLayer>,
2857 4399 : ctx: &RequestContext,
2858 4399 : ) -> anyhow::Result<ResidentLayer> {
2859 4399 : let span = tracing::info_span!("blocking");
2860 4399 : let new_delta: ResidentLayer = tokio::task::spawn_blocking({
2861 4399 : let self_clone = Arc::clone(self);
2862 4399 : let frozen_layer = Arc::clone(frozen_layer);
2863 4399 : let ctx = ctx.attached_child();
2864 4399 : move || {
2865 4399 : // Write it out
2866 4399 : // Keep this inside `spawn_blocking` and `Handle::current`
2867 4399 : // as long as the write path is still sync and the read impl
2868 4399 : // is still not fully async. Otherwise executor threads would
2869 4399 : // be blocked.
2870 4399 : let _g = span.entered();
2871 4399 : let new_delta =
2872 4399 : Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
2873 4399 : let new_delta_path = new_delta.local_path().to_owned();
2874 4399 :
2875 4399 : // Sync it to disk.
2876 4399 : //
2877 4399 : // We must also fsync the timeline dir to ensure the directory entries for
2878 4399 : // new layer files are durable.
2879 4399 : //
2880 4399 : // NB: timeline dir must be synced _after_ the file contents are durable.
2881 4399 : // So, two separate fsyncs are required, they mustn't be batched.
2882 4399 : //
2883 4399 : // TODO: If we're running inside 'flush_frozen_layers' and there are multiple
2884 4399 : // files to flush, the fsync overhead can be reduced as follows:
2885 4399 : // 1. write them all to temporary file names
2886 4399 : // 2. fsync them
2887 4399 : // 3. rename to the final name
2888 4399 : // 4. fsync the parent directory.
2889 4399 : // Note that (1),(2),(3) today happen inside write_to_disk().
2890 4399 : //
2891 4399 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
2892 4399 : par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?;
2893 4399 : par_fsync::par_fsync(&[self_clone
2894 4399 : .conf
2895 4399 : .timeline_path(&self_clone.tenant_shard_id, &self_clone.timeline_id)])
2896 4399 : .context("fsync of timeline dir")?;
2897 :
2898 4397 : anyhow::Ok(new_delta)
2899 4399 : }
2900 4399 : })
2901 4397 : .await
2902 4397 : .context("spawn_blocking")
2903 4397 : .and_then(|x| x)?;
2904 :
2905 4397 : Ok(new_delta)
2906 4397 : }
2907 :
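A minimal sketch of the temp-write / fsync / rename / fsync-directory pattern that the TODO and FIXME comments above refer to, using only the standard library. The function name and paths below are illustrative, not the pageserver's actual code path.

    use std::fs::{self, File};
    use std::io::Write;
    use std::path::Path;

    /// Durably write `data` to `final_path`: write to a temporary name, fsync the
    /// file, rename into place, then fsync the parent directory so the new
    /// directory entry itself is durable (Unix behaviour; illustrative only).
    fn write_durably(final_path: &Path, data: &[u8]) -> std::io::Result<()> {
        let tmp_path = final_path.with_extension("tmp");

        // 1. + 2. write the temporary file and fsync its contents
        let mut tmp = File::create(&tmp_path)?;
        tmp.write_all(data)?;
        tmp.sync_all()?;

        // 3. rename to the final name
        fs::rename(&tmp_path, final_path)?;

        // 4. fsync the parent directory so the rename is durable
        let dir = File::open(final_path.parent().expect("path has a parent"))?;
        dir.sync_all()?;

        Ok(())
    }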
2908 1419 : async fn repartition(
2909 1419 : &self,
2910 1419 : lsn: Lsn,
2911 1419 : partition_size: u64,
2912 1419 : flags: EnumSet<CompactFlags>,
2913 1419 : ctx: &RequestContext,
2914 1419 : ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
2915 1419 : {
2916 1419 : let partitioning_guard = self.partitioning.lock().unwrap();
2917 1419 : let distance = lsn.0 - partitioning_guard.1 .0;
2918 1419 : if partitioning_guard.1 != Lsn(0)
2919 851 : && distance <= self.repartition_threshold
2920 703 : && !flags.contains(CompactFlags::ForceRepartition)
2921 : {
2922 UBC 0 : debug!(
2923 0 : distance,
2924 0 : threshold = self.repartition_threshold,
2925 0 : "no repartitioning needed"
2926 0 : );
2927 CBC 701 : return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
2928 718 : }
2929 : }
2930 139421 : let keyspace = self.collect_keyspace(lsn, ctx).await?;
2931 715 : let partitioning = keyspace.partition(partition_size);
2932 715 :
2933 715 : let mut partitioning_guard = self.partitioning.lock().unwrap();
2934 715 : if lsn > partitioning_guard.1 {
2935 715 : *partitioning_guard = (partitioning, lsn);
2936 715 : } else {
2937 UBC 0 : warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
2938 : }
2939 CBC 715 : Ok((partitioning_guard.0.clone(), partitioning_guard.1))
2940 1417 : }
2941 :
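The early-return check in repartition above boils down to a small predicate; a sketch of it with plain integers standing in for LSNs (illustrative only, not the actual types).

    /// Repartition only if we have never partitioned before, the LSN has advanced
    /// past the repartition threshold, or the caller forces it.
    fn needs_repartition(last_partition_lsn: u64, lsn: u64, threshold: u64, force: bool) -> bool {
        last_partition_lsn == 0 || lsn - last_partition_lsn > threshold || force
    }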
2942 : // Is it time to create a new image layer for the given partition?
2943 35656 : async fn time_for_new_image_layer(
2944 35656 : &self,
2945 35656 : partition: &KeySpace,
2946 35656 : lsn: Lsn,
2947 35656 : ) -> anyhow::Result<bool> {
2948 35656 : let threshold = self.get_image_creation_threshold();
2949 :
2950 35656 : let guard = self.layers.read().await;
2951 35656 : let layers = guard.layer_map();
2952 35656 :
2953 35656 : let mut max_deltas = 0;
2954 35656 : {
2955 35656 : let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
2956 35656 : if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
2957 4970 : let img_range =
2958 4970 : partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
2959 4970 : if wanted.overlaps(&img_range) {
2960 : //
2961 : // gc_timeline only pays attention to image layers that are older than the GC cutoff,
2962 : // but create_image_layers creates image layers at last-record-lsn.
2963 : // So it's possible that gc_timeline wants a new image layer to be created for a key range,
2964 : // but the range is already covered by image layers at more recent LSNs. Before we
2965 : // create a new image layer, check if the range is already covered at more recent LSNs.
2966 UBC 0 : if !layers
2967 0 : .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
2968 : {
2969 0 : debug!(
2970 0 : "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
2971 0 : img_range.start, img_range.end, cutoff_lsn, lsn
2972 0 : );
2973 0 : return Ok(true);
2974 0 : }
2975 CBC 4970 : }
2976 30686 : }
2977 : }
2978 :
2979 2021409 : for part_range in &partition.ranges {
2980 1991352 : let image_coverage = layers.image_coverage(part_range, lsn)?;
2981 3777583 : for (img_range, last_img) in image_coverage {
2982 1791830 : let img_lsn = if let Some(last_img) = last_img {
2983 348730 : last_img.get_lsn_range().end
2984 : } else {
2985 1443100 : Lsn(0)
2986 : };
2987 : // Let's consider an example:
2988 : //
2989 : // delta layer with LSN range 71-81
2990 : // delta layer with LSN range 81-91
2991 : // delta layer with LSN range 91-101
2992 : // image layer at LSN 100
2993 : //
2994 : // If 'lsn' is still 100, i.e. no new WAL has been processed since the last image layer,
2995 : // there's no need to create a new one. We check this case explicitly, to avoid passing
2996 : // a bogus range to count_deltas below, with start > end. It's even possible that there
2997 : // are some delta layers *later* than current 'lsn', if more WAL was processed and flushed
2998 : // after we read last_record_lsn, which is passed here in the 'lsn' argument.
2999 1791830 : if img_lsn < lsn {
3000 1770176 : let num_deltas =
3001 1770176 : layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
3002 :
3003 1770176 : max_deltas = max_deltas.max(num_deltas);
3004 1770176 : if num_deltas >= threshold {
3005 UBC 0 : debug!(
3006 0 : "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
3007 0 : img_range.start, img_range.end, num_deltas, img_lsn, lsn
3008 0 : );
3009 CBC 5599 : return Ok(true);
3010 1764577 : }
3011 21654 : }
3012 : }
3013 : }
3014 :
3015 UBC 0 : debug!(
3016 0 : max_deltas,
3017 0 : "none of the partitioned ranges had >= {threshold} deltas"
3018 0 : );
3019 CBC 30057 : Ok(false)
3020 35656 : }
3021 :
3022 UBC 0 : #[tracing::instrument(skip_all, fields(%lsn, %force))]
3023 : async fn create_image_layers(
3024 : self: &Arc<Timeline>,
3025 : partitioning: &KeyPartitioning,
3026 : lsn: Lsn,
3027 : force: bool,
3028 : ctx: &RequestContext,
3029 : ) -> Result<Vec<ResidentLayer>, PageReconstructError> {
3030 : let timer = self.metrics.create_images_time_histo.start_timer();
3031 : let mut image_layers = Vec::new();
3032 :
3033 : // We need to avoid holes between generated image layers.
3034 : // Otherwise LayerMap::image_layer_exists will return false if the key range of some layer is covered by more than one
3035 : // image layer with a hole between them. In that case such a layer cannot be utilized by GC.
3036 : //
3037 : // How can such a hole between partitions appear?
3038 : // If we have a relation with relid=1 and size 100 and a relation with relid=2 and size 200, then the result of
3039 : // KeySpace::partition may contain partitions <100000000..100000099> and <200000000..200000199>.
3040 : // If there is a delta layer <100000000..300000000>, it will never be garbage collected, because the
3041 : // image layers <100000000..100000099> and <200000000..200000199> do not completely cover it.
3042 : let mut start = Key::MIN;
3043 :
3044 : for partition in partitioning.parts.iter() {
3045 : let img_range = start..partition.ranges.last().unwrap().end;
3046 : start = img_range.end;
3047 : if force || self.time_for_new_image_layer(partition, lsn).await? {
3048 : let mut image_layer_writer = ImageLayerWriter::new(
3049 : self.conf,
3050 : self.timeline_id,
3051 : self.tenant_shard_id,
3052 : &img_range,
3053 : lsn,
3054 : )
3055 : .await?;
3056 :
3057 0 : fail_point!("image-layer-writer-fail-before-finish", |_| {
3058 0 : Err(PageReconstructError::Other(anyhow::anyhow!(
3059 0 : "failpoint image-layer-writer-fail-before-finish"
3060 0 : )))
3061 0 : });
3062 : for range in &partition.ranges {
3063 : let mut key = range.start;
3064 : while key < range.end {
3065 : if self.shard_identity.is_key_disposable(&key) {
3066 0 : debug!(
3067 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3068 0 : key,
3069 0 : self.shard_identity.get_shard_number(&key)
3070 0 : );
3071 : key = key.next();
3072 : continue;
3073 : }
3074 : let img = match self.get(key, lsn, ctx).await {
3075 : Ok(img) => img,
3076 : Err(err) => {
3077 : // If we fail to reconstruct a VM or FSM page, we can zero the
3078 : // page without losing any actual user data. That seems better
3079 : // than failing repeatedly and getting stuck.
3080 : //
3081 : // We had a bug at one point, where we truncated the FSM and VM
3082 : // in the pageserver, but the Postgres didn't know about that
3083 : // and continued to generate incremental WAL records for pages
3084 : // that didn't exist in the pageserver. Trying to replay those
3085 : // WAL records failed to find the previous image of the page.
3086 : // This special case allows us to recover from that situation.
3087 : // See https://github.com/neondatabase/neon/issues/2601.
3088 : //
3089 : // Unfortunately we cannot do this for the main fork, or for
3090 : // any metadata keys, as that would lead to actual data
3091 : // loss.
3092 : if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
3093 0 : warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
3094 : ZERO_PAGE.clone()
3095 : } else {
3096 : return Err(err);
3097 : }
3098 : }
3099 : };
3100 :
3101 : image_layer_writer.put_image(key, &img).await?;
3102 : key = key.next();
3103 : }
3104 : }
3105 : let image_layer = image_layer_writer.finish(self).await?;
3106 : image_layers.push(image_layer);
3107 : }
3108 : }
3109 : // All layers that the GC wanted us to create have now been created.
3110 : //
3111 : // It's possible that another GC cycle happened while we were compacting, and added
3112 : // something new to wanted_image_layers, and we now clear that before processing it.
3113 : // That's OK, because the next GC iteration will put it back in.
3114 : *self.wanted_image_layers.lock().unwrap() = None;
3115 :
3116 : // Sync the new layer to disk before adding it to the layer map, to make sure
3117 : // we don't garbage collect something based on the new layer, before it has
3118 : // reached the disk.
3119 : //
3120 : // We must also fsync the timeline dir to ensure the directory entries for
3121 : // new layer files are durable
3122 : //
3123 : // Compaction creates multiple image layers. It would be better to create them all
3124 : // and fsync them all in parallel.
3125 : let all_paths = image_layers
3126 : .iter()
3127 CBC 5638 : .map(|layer| layer.local_path().to_owned())
3128 : .collect::<Vec<_>>();
3129 :
3130 : par_fsync::par_fsync_async(&all_paths)
3131 : .await
3132 : .context("fsync of newly created layer files")?;
3133 :
3134 : par_fsync::par_fsync_async(&[self
3135 : .conf
3136 : .timeline_path(&self.tenant_shard_id, &self.timeline_id)])
3137 : .await
3138 : .context("fsync of timeline dir")?;
3139 :
3140 : let mut guard = self.layers.write().await;
3141 :
3142 : // FIXME: we could add the images to be uploaded *before* returning from here, but right
3143 : // now they are being scheduled outside of the write lock
3144 : guard.track_new_image_layers(&image_layers, &self.metrics);
3145 : drop_wlock(guard);
3146 : timer.stop_and_record();
3147 :
3148 : Ok(image_layers)
3149 : }
3150 :
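The hole-avoidance comment in create_image_layers above describes making each image range start where the previous one ended, so the image layers tile the key space with no gaps. A simplified, self-contained sketch of that idea with u64 standing in for Key (illustrative only).

    /// Produce image-layer key ranges that tile the key space without holes, by
    /// starting each image range where the previous one ended.
    fn image_ranges_without_holes(
        partitions: &[std::ops::Range<u64>],
        key_min: u64,
    ) -> Vec<std::ops::Range<u64>> {
        let mut start = key_min;
        let mut out = Vec::with_capacity(partitions.len());
        for part in partitions {
            let img_range = start..part.end;
            start = img_range.end;
            out.push(img_range);
        }
        out
    }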
3151 : /// Wait until the background initial logical size calculation is complete, or
3152 : /// this Timeline is shut down. Calling this function will cause the initial
3153 : /// logical size calculation to skip waiting for the background jobs barrier.
3154 231 : pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
3155 231 : if let Some(await_bg_cancel) = self
3156 231 : .current_logical_size
3157 231 : .cancel_wait_for_background_loop_concurrency_limit_semaphore
3158 231 : .get()
3159 225 : {
3160 225 : await_bg_cancel.cancel();
3161 225 : } else {
3162 : // We should not wait if we were not able to explicitly instruct
3163 : // the logical size cancellation to skip the concurrency limit semaphore.
3164 : // TODO: this is an unexpected case. We should restructure so that it
3165 : // can't happen.
3166 6 : tracing::info!(
3167 6 : "await_initial_logical_size: can't get semaphore cancel token, skipping"
3168 6 : );
3169 : }
3170 :
3171 231 : tokio::select!(
3172 454 : _ = self.current_logical_size.initialized.acquire() => {},
3173 454 : _ = self.cancel.cancelled() => {}
3174 454 : )
3175 223 : }
3176 : }
3177 :
3178 1086 : #[derive(Default)]
3179 : struct CompactLevel0Phase1Result {
3180 : new_layers: Vec<ResidentLayer>,
3181 : deltas_to_compact: Vec<Layer>,
3182 : }
3183 :
3184 : /// Top-level failure to compact.
3185 UBC 0 : #[derive(Debug, thiserror::Error)]
3186 : pub(crate) enum CompactionError {
3187 : #[error("The timeline or pageserver is shutting down")]
3188 : ShuttingDown,
3189 : /// Compaction could not be completed for some other reason, e.g. a page reconstruction error.
3190 : #[error(transparent)]
3191 : Other(#[from] anyhow::Error),
3192 : }
3193 :
3194 : #[serde_as]
3195 CBC 3906 : #[derive(serde::Serialize)]
3196 : struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
3197 :
3198 9604 : #[derive(Default)]
3199 : enum DurationRecorder {
3200 : #[default]
3201 : NotStarted,
3202 : Recorded(RecordedDuration, tokio::time::Instant),
3203 : }
3204 :
3205 : impl DurationRecorder {
3206 2783 : pub fn till_now(&self) -> DurationRecorder {
3207 2783 : match self {
3208 : DurationRecorder::NotStarted => {
3209 UBC 0 : panic!("must only call on recorded measurements")
3210 : }
3211 CBC 2783 : DurationRecorder::Recorded(_, ended) => {
3212 2783 : let now = tokio::time::Instant::now();
3213 2783 : DurationRecorder::Recorded(RecordedDuration(now - *ended), now)
3214 2783 : }
3215 2783 : }
3216 2783 : }
3217 1953 : pub fn into_recorded(self) -> Option<RecordedDuration> {
3218 1953 : match self {
3219 UBC 0 : DurationRecorder::NotStarted => None,
3220 CBC 1953 : DurationRecorder::Recorded(recorded, _) => Some(recorded),
3221 : }
3222 1953 : }
3223 : }
3224 :
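To make the chaining above concrete, here is a small self-contained sketch of the same pattern using std::time instead of tokio's clock (names are illustrative): each call to till_now() records the time elapsed since the previous recording, so successive calls measure successive phases.

    use std::time::{Duration, Instant};

    enum PhaseTimer {
        NotStarted,
        Recorded(Duration, Instant),
    }

    impl PhaseTimer {
        fn start() -> Self {
            PhaseTimer::Recorded(Duration::ZERO, Instant::now())
        }
        fn till_now(&self) -> PhaseTimer {
            match self {
                PhaseTimer::NotStarted => panic!("must only call on recorded measurements"),
                PhaseTimer::Recorded(_, ended) => {
                    let now = Instant::now();
                    PhaseTimer::Recorded(now - *ended, now)
                }
            }
        }
    }

    fn main() {
        let t0 = PhaseTimer::start();
        // ... phase 1 work ...
        let phase1 = t0.till_now(); // duration of phase 1
        // ... phase 2 work ...
        let phase2 = phase1.till_now(); // duration of phase 2 only
        let _ = (phase1, phase2);
    }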
3225 1372 : #[derive(Default)]
3226 : struct CompactLevel0Phase1StatsBuilder {
3227 : version: Option<u64>,
3228 : tenant_id: Option<TenantShardId>,
3229 : timeline_id: Option<TimelineId>,
3230 : read_lock_acquisition_micros: DurationRecorder,
3231 : read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
3232 : read_lock_held_key_sort_micros: DurationRecorder,
3233 : read_lock_held_prerequisites_micros: DurationRecorder,
3234 : read_lock_held_compute_holes_micros: DurationRecorder,
3235 : read_lock_drop_micros: DurationRecorder,
3236 : write_layer_files_micros: DurationRecorder,
3237 : level0_deltas_count: Option<usize>,
3238 : new_deltas_count: Option<usize>,
3239 : new_deltas_size: Option<u64>,
3240 : }
3241 :
3242 279 : #[derive(serde::Serialize)]
3243 : struct CompactLevel0Phase1Stats {
3244 : version: u64,
3245 : tenant_id: TenantShardId,
3246 : timeline_id: TimelineId,
3247 : read_lock_acquisition_micros: RecordedDuration,
3248 : read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
3249 : read_lock_held_key_sort_micros: RecordedDuration,
3250 : read_lock_held_prerequisites_micros: RecordedDuration,
3251 : read_lock_held_compute_holes_micros: RecordedDuration,
3252 : read_lock_drop_micros: RecordedDuration,
3253 : write_layer_files_micros: RecordedDuration,
3254 : level0_deltas_count: usize,
3255 : new_deltas_count: usize,
3256 : new_deltas_size: u64,
3257 : }
3258 :
3259 : impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
3260 : type Error = anyhow::Error;
3261 :
3262 279 : fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
3263 279 : Ok(Self {
3264 279 : version: value.version.ok_or_else(|| anyhow!("version not set"))?,
3265 279 : tenant_id: value
3266 279 : .tenant_id
3267 279 : .ok_or_else(|| anyhow!("tenant_id not set"))?,
3268 279 : timeline_id: value
3269 279 : .timeline_id
3270 279 : .ok_or_else(|| anyhow!("timeline_id not set"))?,
3271 279 : read_lock_acquisition_micros: value
3272 279 : .read_lock_acquisition_micros
3273 279 : .into_recorded()
3274 279 : .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
3275 279 : read_lock_held_spawn_blocking_startup_micros: value
3276 279 : .read_lock_held_spawn_blocking_startup_micros
3277 279 : .into_recorded()
3278 279 : .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
3279 279 : read_lock_held_key_sort_micros: value
3280 279 : .read_lock_held_key_sort_micros
3281 279 : .into_recorded()
3282 279 : .ok_or_else(|| anyhow!("read_lock_held_key_sort_micros not set"))?,
3283 279 : read_lock_held_prerequisites_micros: value
3284 279 : .read_lock_held_prerequisites_micros
3285 279 : .into_recorded()
3286 279 : .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
3287 279 : read_lock_held_compute_holes_micros: value
3288 279 : .read_lock_held_compute_holes_micros
3289 279 : .into_recorded()
3290 279 : .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
3291 279 : read_lock_drop_micros: value
3292 279 : .read_lock_drop_micros
3293 279 : .into_recorded()
3294 279 : .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
3295 279 : write_layer_files_micros: value
3296 279 : .write_layer_files_micros
3297 279 : .into_recorded()
3298 279 : .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
3299 279 : level0_deltas_count: value
3300 279 : .level0_deltas_count
3301 279 : .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
3302 279 : new_deltas_count: value
3303 279 : .new_deltas_count
3304 279 : .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
3305 279 : new_deltas_size: value
3306 279 : .new_deltas_size
3307 279 : .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
3308 : })
3309 279 : }
3310 : }
3311 :
3312 : impl Timeline {
3313 : /// First phase of Level 0 compaction, explained in the [`Self::compact`] comment.
3314 1372 : async fn compact_level0_phase1(
3315 1372 : self: &Arc<Self>,
3316 1372 : guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
3317 1372 : mut stats: CompactLevel0Phase1StatsBuilder,
3318 1372 : target_file_size: u64,
3319 1372 : ctx: &RequestContext,
3320 1372 : ) -> Result<CompactLevel0Phase1Result, CompactionError> {
3321 1372 : stats.read_lock_held_spawn_blocking_startup_micros =
3322 1372 : stats.read_lock_acquisition_micros.till_now(); // set by caller
3323 1372 : let layers = guard.layer_map();
3324 1372 : let level0_deltas = layers.get_level0_deltas()?;
3325 1372 : let mut level0_deltas = level0_deltas
3326 1372 : .into_iter()
3327 6135 : .map(|x| guard.get_from_desc(&x))
3328 1372 : .collect_vec();
3329 1372 : stats.level0_deltas_count = Some(level0_deltas.len());
3330 1372 : // Only compact if enough layers have accumulated.
3331 1372 : let threshold = self.get_compaction_threshold();
3332 1372 : if level0_deltas.is_empty() || level0_deltas.len() < threshold {
3333 UBC 0 : debug!(
3334 0 : level0_deltas = level0_deltas.len(),
3335 0 : threshold, "too few deltas to compact"
3336 0 : );
3337 CBC 1086 : return Ok(CompactLevel0Phase1Result::default());
3338 286 : }
3339 286 :
3340 286 : // This failpoint is used together with `test_duplicate_layers` integration test.
3341 286 : // It makes compaction return exactly the same layers as its input.
3342 286 : // We want to ensure that this will not cause any problem when updating the layer map
3343 286 : // after the compaction is finished.
3344 286 : //
3345 286 : // Currently, there are two rare edge cases that will cause duplicated layers to be
3346 286 : // inserted.
3347 286 : // 1. The compaction job is interrupted or does not finish successfully. Assume we have files 1, 2, 3, 4, which
3348 286 : // are compacted to 5, but the pageserver is shut down; the next time we start the pageserver we will get a layer
3349 286 : // map containing 1, 2, 3, 4, and 5, where 5 has the same content as 4. If we trigger L0 compaction at this
3350 286 : // point again, it is likely that we will get a file 6 which has the same content and the key range as 5,
3351 286 : // and this causes an overwrite. This is acceptable because the content is the same, and we should do a
3352 286 : // layer replace instead of the normal remove / upload process.
3353 286 : // 2. The input workload pattern creates exactly n files that are sorted, non-overlapping, and each of target
3354 286 : // file size. Compaction will likely create the same set of n files afterwards.
3355 286 : //
3356 286 : // This failpoint covers a superset of both of these cases.
3357 286 : if cfg!(feature = "testing") {
3358 286 : let active = (|| {
3359 286 : ::fail::fail_point!("compact-level0-phase1-return-same", |_| true);
3360 286 : false
3361 286 : })();
3362 286 :
3363 286 : if active {
3364 3 : let mut new_layers = Vec::with_capacity(level0_deltas.len());
3365 43 : for delta in &level0_deltas {
3366 : // we are just faking these layers as being produced again for this failpoint
3367 40 : new_layers.push(
3368 40 : delta
3369 40 : .download_and_keep_resident()
3370 UBC 0 : .await
3371 CBC 40 : .context("download layer for failpoint")?,
3372 : );
3373 : }
3374 3 : tracing::info!("compact-level0-phase1-return-same"); // so that we can check if we hit the failpoint
3375 3 : return Ok(CompactLevel0Phase1Result {
3376 3 : new_layers,
3377 3 : deltas_to_compact: level0_deltas,
3378 3 : });
3379 283 : }
3380 UBC 0 : }
3381 :
3382 : // Gather the files to compact in this iteration.
3383 : //
3384 : // Start with the oldest Level 0 delta file, and collect any other
3385 : // level 0 files that form a contiguous sequence, such that the end
3386 : // LSN of the previous file matches the start LSN of the next file.
3387 : //
3388 : // Note that if the files don't form such a sequence, we might
3389 : // "compact" just a single file. That's a bit pointless, but it allows
3390 : // us to get rid of the level 0 file, and compact the other files on
3391 : // the next iteration. This could probably be made smarter, but such
3392 : // "gaps" in the sequence of level 0 files should only happen in case
3393 : // of a crash, partial download from cloud storage, or something like
3394 : // that, so it's not a big deal in practice.
3395 CBC 7212 : level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
3396 283 : let mut level0_deltas_iter = level0_deltas.iter();
3397 283 :
3398 283 : let first_level0_delta = level0_deltas_iter.next().unwrap();
3399 283 : let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
3400 283 : let mut deltas_to_compact = Vec::with_capacity(level0_deltas.len());
3401 283 :
3402 283 : deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
3403 3670 : for l in level0_deltas_iter {
3404 3387 : let lsn_range = &l.layer_desc().lsn_range;
3405 3387 :
3406 3387 : if lsn_range.start != prev_lsn_end {
3407 UBC 0 : break;
3408 CBC 3387 : }
3409 3387 : deltas_to_compact.push(l.download_and_keep_resident().await?);
3410 3387 : prev_lsn_end = lsn_range.end;
3411 : }
3412 283 : let lsn_range = Range {
3413 283 : start: deltas_to_compact
3414 283 : .first()
3415 283 : .unwrap()
3416 283 : .layer_desc()
3417 283 : .lsn_range
3418 283 : .start,
3419 283 : end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
3420 283 : };
3421 :
3422 283 : info!(
3423 283 : "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)",
3424 283 : lsn_range.start,
3425 283 : lsn_range.end,
3426 283 : deltas_to_compact.len(),
3427 283 : level0_deltas.len()
3428 283 : );
3429 :
3430 3670 : for l in deltas_to_compact.iter() {
3431 3670 : info!("compact includes {l}");
3432 : }
3433 :
3434 : // We don't need the original list of layers anymore. Drop it so that
3435 : // we don't accidentally use it later in the function.
3436 283 : drop(level0_deltas);
3437 283 :
3438 283 : stats.read_lock_held_prerequisites_micros = stats
3439 283 : .read_lock_held_spawn_blocking_startup_micros
3440 283 : .till_now();
3441 283 :
3442 283 : // Determine N largest holes where N is number of compacted layers.
3443 283 : let max_holes = deltas_to_compact.len();
3444 283 : let last_record_lsn = self.get_last_record_lsn();
3445 283 : let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
3446 283 : let min_hole_coverage_size = 3; // TODO: something more flexible?
3447 283 :
3448 283 : // min-heap (reserve space for one more element added before eviction)
3449 283 : let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
3450 283 : let mut prev: Option<Key> = None;
3451 283 :
3452 283 : let mut all_keys = Vec::new();
3453 :
3454 3670 : for l in deltas_to_compact.iter() {
3455 3670 : all_keys.extend(l.load_keys(ctx).await?);
3456 : }
3457 :
3458 : // FIXME: should spawn_blocking the rest of this function
3459 :
3460 : // The current stdlib sorting implementation is designed so that it is
3461 : // particularly fast when the slice is made up of sorted sub-ranges.
3462 161096304 : all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
3463 283 :
3464 283 : stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
3465 :
3466 16792830 : for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
3467 16792830 : if let Some(prev_key) = prev {
3468 : // just first fast filter
3469 16792547 : if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
3470 193991 : let key_range = prev_key..next_key;
3471 : // Measuring a hole by simply subtracting the i128 representations of the key range boundaries
3472 : // does not make much sense, because the largest holes will correspond to field1/field2 changes.
3473 : // But we are mostly interested in eliminating holes which cause generation of excessive image layers.
3474 : // That is why it is better to measure the size of a hole as the number of covering image layers.
3475 193991 : let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
3476 193991 : if coverage_size >= min_hole_coverage_size {
3477 192 : heap.push(Hole {
3478 192 : key_range,
3479 192 : coverage_size,
3480 192 : });
3481 192 : if heap.len() > max_holes {
3482 107 : heap.pop(); // remove smallest hole
3483 107 : }
3484 193799 : }
3485 16598556 : }
3486 283 : }
3487 16792830 : prev = Some(next_key.next());
3488 : }
3489 283 : stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
3490 283 : drop_rlock(guard);
3491 283 : stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
3492 283 : let mut holes = heap.into_vec();
3493 283 : holes.sort_unstable_by_key(|hole| hole.key_range.start);
3494 283 : let mut next_hole = 0; // index of next hole in holes vector
3495 283 :
3496 283 : // This iterator walks through all key-value pairs from all the layers
3497 283 : // we're compacting, in key, LSN order.
3498 283 : let all_values_iter = all_keys.iter();
3499 283 :
3500 283 : // This iterator walks through all keys and is needed to calculate size used by each key
3501 283 : let mut all_keys_iter = all_keys
3502 283 : .iter()
3503 16394012 : .map(|DeltaEntry { key, lsn, size, .. }| (*key, *lsn, *size))
3504 16393729 : .coalesce(|mut prev, cur| {
3505 16393729 : // Coalesce keys that belong to the same key pair.
3506 16393729 : // This ensures that compaction doesn't put them
3507 16393729 : // into different layer files.
3508 16393729 : // Still limit this by the target file size,
3509 16393729 : // so that we keep the size of the files in
3510 16393729 : // check.
3511 16393729 : if prev.0 == cur.0 && prev.2 < target_file_size {
3512 14636511 : prev.2 += cur.2;
3513 14636511 : Ok(prev)
3514 : } else {
3515 1757218 : Err((prev, cur))
3516 : }
3517 16393729 : });
3518 283 :
3519 283 : // Merge the contents of all the input delta layers into a new set
3520 283 : // of delta layers, based on the current partitioning.
3521 283 : //
3522 283 : // We split the new delta layers on the key dimension. We iterate through the key space and, for each key, check whether adding the next key to the current output layer would make the layer too large; if so, we close the current output layer and start a new one.
3523 283 : // It's possible that there is a single key with so many page versions that storing all of them in a single layer file
3524 283 : // would be too large. In that case, we also split on the LSN dimension.
3525 283 : //
3526 283 : // LSN
3527 283 : // ^
3528 283 : // |
3529 283 : // | +-----------+ +--+--+--+--+
3530 283 : // | | | | | | | |
3531 283 : // | +-----------+ | | | | |
3532 283 : // | | | | | | | |
3533 283 : // | +-----------+ ==> | | | | |
3534 283 : // | | | | | | | |
3535 283 : // | +-----------+ | | | | |
3536 283 : // | | | | | | | |
3537 283 : // | +-----------+ +--+--+--+--+
3538 283 : // |
3539 283 : // +--------------> key
3540 283 : //
3541 283 : //
3542 283 : // If one key (X) has a lot of page versions:
3543 283 : //
3544 283 : // LSN
3545 283 : // ^
3546 283 : // | (X)
3547 283 : // | +-----------+ +--+--+--+--+
3548 283 : // | | | | | | | |
3549 283 : // | +-----------+ | | +--+ |
3550 283 : // | | | | | | | |
3551 283 : // | +-----------+ ==> | | | | |
3552 283 : // | | | | | +--+ |
3553 283 : // | +-----------+ | | | | |
3554 283 : // | | | | | | | |
3555 283 : // | +-----------+ +--+--+--+--+
3556 283 : // |
3557 283 : // +--------------> key
3558 283 : // TODO: this actually divides the layers into fixed-size chunks, not
3559 283 : // based on the partitioning.
3560 283 : //
3561 283 : // TODO: we should also opportunistically materialize and
3562 283 : // garbage collect what we can.
3563 283 : let mut new_layers = Vec::new();
3564 283 : let mut prev_key: Option<Key> = None;
3565 283 : let mut writer: Option<DeltaLayerWriter> = None;
3566 283 : let mut key_values_total_size = 0u64;
3567 283 : let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
3568 283 : let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
3569 :
3570 : for &DeltaEntry {
3571 16392715 : key, lsn, ref val, ..
3572 16392994 : } in all_values_iter
3573 : {
3574 16392715 : let value = val.load(ctx).await?;
3575 16392713 : let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
3576 16392713 : // We need to check key boundaries once we reach next key or end of layer with the same key
3577 16392713 : if !same_key || lsn == dup_end_lsn {
3578 1757493 : let mut next_key_size = 0u64;
3579 1757493 : let is_dup_layer = dup_end_lsn.is_valid();
3580 1757493 : dup_start_lsn = Lsn::INVALID;
3581 1757493 : if !same_key {
3582 1757464 : dup_end_lsn = Lsn::INVALID;
3583 1757464 : }
3584 : // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size
3585 1757497 : for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() {
3586 1757497 : next_key_size = next_size;
3587 1757497 : if key != next_key {
3588 1757185 : if dup_end_lsn.is_valid() {
3589 12 : // We are writing a segment with duplicates:
3590 12 : // place all remaining values of this key in a separate segment
3591 12 : dup_start_lsn = dup_end_lsn; // the new segment starts where the old one stops
3592 12 : dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range
3593 1757173 : }
3594 1757185 : break;
3595 312 : }
3596 312 : key_values_total_size += next_size;
3597 312 : // Check if it is time to split segment: if total keys size is larger than target file size.
3598 312 : // We need to avoid generation of empty segments if next_size > target_file_size.
3599 312 : if key_values_total_size > target_file_size && lsn != next_lsn {
3600 : // Split key between multiple layers: such layer can contain only single key
3601 29 : dup_start_lsn = if dup_end_lsn.is_valid() {
3602 17 : dup_end_lsn // new segment with duplicates starts where old one stops
3603 : } else {
3604 12 : lsn // start with the first LSN for this key
3605 : };
3606 29 : dup_end_lsn = next_lsn; // upper LSN boundary is exclusive
3607 29 : break;
3608 283 : }
3609 : }
3610 : // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set.
3611 1757493 : if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() {
3612 UBC 0 : dup_start_lsn = dup_end_lsn;
3613 0 : dup_end_lsn = lsn_range.end;
3614 CBC 1757493 : }
3615 1757493 : if writer.is_some() {
3616 1757210 : let written_size = writer.as_mut().unwrap().size();
3617 1757210 : let contains_hole =
3618 1757210 : next_hole < holes.len() && key >= holes[next_hole].key_range.end;
3619 : // check if key cause layer overflow or contains hole...
3620 1757210 : if is_dup_layer
3621 1757169 : || dup_end_lsn.is_valid()
3622 1757159 : || written_size + key_values_total_size > target_file_size
3623 1747160 : || contains_hole
3624 : {
3625 : // ... if so, flush previous layer and prepare to write new one
3626 10133 : new_layers.push(
3627 10133 : writer
3628 10133 : .take()
3629 10133 : .unwrap()
3630 10133 : .finish(prev_key.unwrap().next(), self)
3631 UBC 0 : .await?,
3632 : );
3633 CBC 10133 : writer = None;
3634 10133 :
3635 10133 : if contains_hole {
3636 85 : // skip hole
3637 85 : next_hole += 1;
3638 10048 : }
3639 1747077 : }
3640 283 : }
3641 : // Remember size of key value because at next iteration we will access next item
3642 1757493 : key_values_total_size = next_key_size;
3643 14635220 : }
3644 16392713 : if writer.is_none() {
3645 10416 : // Create writer if not initialized yet
3646 10416 : writer = Some(
3647 : DeltaLayerWriter::new(
3648 10416 : self.conf,
3649 10416 : self.timeline_id,
3650 10416 : self.tenant_shard_id,
3651 10416 : key,
3652 10416 : if dup_end_lsn.is_valid() {
3653 : // this is a layer containing slice of values of the same key
3654 UBC 0 : debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn);
3655 CBC 41 : dup_start_lsn..dup_end_lsn
3656 : } else {
3657 UBC 0 : debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
3658 CBC 10375 : lsn_range.clone()
3659 : },
3660 : )
3661 UBC 0 : .await?,
3662 : );
3663 CBC 16382297 : }
3664 :
3665 16392713 : fail_point!("delta-layer-writer-fail-before-finish", |_| {
3666 UBC 0 : Err(CompactionError::Other(anyhow::anyhow!(
3667 0 : "failpoint delta-layer-writer-fail-before-finish"
3668 0 : )))
3669 CBC 16392713 : });
3670 :
3671 16392713 : if !self.shard_identity.is_key_disposable(&key) {
3672 16392713 : writer.as_mut().unwrap().put_value(key, lsn, value).await?;
3673 : } else {
3674 UBC 0 : debug!(
3675 0 : "Dropping key {} during compaction (it belongs on shard {:?})",
3676 0 : key,
3677 0 : self.shard_identity.get_shard_number(&key)
3678 0 : );
3679 : }
3680 :
3681 CBC 16392711 : if !new_layers.is_empty() {
3682 13626799 : fail_point!("after-timeline-compacted-first-L1");
3683 13626799 : }
3684 :
3685 16392711 : prev_key = Some(key);
3686 : }
3687 279 : if let Some(writer) = writer {
3688 279 : new_layers.push(writer.finish(prev_key.unwrap().next(), self).await?);
3689 UBC 0 : }
3690 :
3691 : // Sync layers
3692 CBC 279 : if !new_layers.is_empty() {
3693 : // Print a warning if the created layer is larger than double the target size
3694 : // Add two pages for potential overhead. This should in theory be already
3695 : // accounted for in the target calculation, but for very small targets,
3696 : // we still might easily hit the limit otherwise.
3697 279 : let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2;
3698 10385 : for layer in new_layers.iter() {
3699 10385 : if layer.layer_desc().file_size > warn_limit {
3700 UBC 0 : warn!(
3701 0 : %layer,
3702 0 : "created delta file of size {} larger than double of target of {target_file_size}", layer.layer_desc().file_size
3703 0 : );
3704 CBC 10385 : }
3705 : }
3706 :
3707 : // FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
3708 279 : let layer_paths: Vec<Utf8PathBuf> = new_layers
3709 279 : .iter()
3710 10385 : .map(|l| l.local_path().to_owned())
3711 279 : .collect();
3712 279 :
3713 279 : // Fsync all the layer files and directory using multiple threads to
3714 279 : // minimize latency.
3715 279 : par_fsync::par_fsync_async(&layer_paths)
3716 483 : .await
3717 279 : .context("fsync all new layers")?;
3718 :
3719 279 : let timeline_dir = self
3720 279 : .conf
3721 279 : .timeline_path(&self.tenant_shard_id, &self.timeline_id);
3722 279 :
3723 279 : par_fsync::par_fsync_async(&[timeline_dir])
3724 306 : .await
3725 279 : .context("fsync of timeline dir")?;
3726 UBC 0 : }
3727 :
3728 CBC 279 : stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();
3729 279 : stats.new_deltas_count = Some(new_layers.len());
3730 10385 : stats.new_deltas_size = Some(new_layers.iter().map(|l| l.layer_desc().file_size).sum());
3731 279 :
3732 279 : match TryInto::<CompactLevel0Phase1Stats>::try_into(stats)
3733 279 : .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string"))
3734 : {
3735 279 : Ok(stats_json) => {
3736 279 : info!(
3737 279 : stats_json = stats_json.as_str(),
3738 279 : "compact_level0_phase1 stats available"
3739 279 : )
3740 : }
3741 UBC 0 : Err(e) => {
3742 0 : warn!("compact_level0_phase1 stats failed to serialize: {:#}", e);
3743 : }
3744 : }
3745 :
3746 CBC 279 : Ok(CompactLevel0Phase1Result {
3747 279 : new_layers,
3748 279 : deltas_to_compact: deltas_to_compact
3749 279 : .into_iter()
3750 3603 : .map(|x| x.drop_eviction_guard())
3751 279 : .collect::<Vec<_>>(),
3752 279 : })
3753 1368 : }
3754 :
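The hole detection inside compact_level0_phase1 keeps only the N largest holes by pushing candidates into a min-heap and evicting the smallest element whenever the heap grows past N. A generic, self-contained sketch of that bounded-heap idea, with plain u64 values standing in for holes (illustrative only).

    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    /// Keep only the `max_holes` largest values from an input stream.
    fn top_n_largest(values: impl IntoIterator<Item = u64>, max_holes: usize) -> Vec<u64> {
        // min-heap (reserve space for one more element added before eviction)
        let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(max_holes + 1);
        for v in values {
            heap.push(Reverse(v));
            if heap.len() > max_holes {
                heap.pop(); // discard the current smallest
            }
        }
        heap.into_iter().map(|Reverse(v)| v).collect()
    }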
3755 : ///
3756 : /// Collect a bunch of Level 0 layer files, and compact and reshuffle them
3757 : /// as Level 1 files.
3758 : ///
3759 1372 : async fn compact_level0(
3760 1372 : self: &Arc<Self>,
3761 1372 : target_file_size: u64,
3762 1372 : ctx: &RequestContext,
3763 1372 : ) -> Result<(), CompactionError> {
3764 : let CompactLevel0Phase1Result {
3765 1368 : new_layers,
3766 1368 : deltas_to_compact,
3767 : } = {
3768 1372 : let phase1_span = info_span!("compact_level0_phase1");
3769 1372 : let ctx = ctx.attached_child();
3770 1372 : let mut stats = CompactLevel0Phase1StatsBuilder {
3771 1372 : version: Some(2),
3772 1372 : tenant_id: Some(self.tenant_shard_id),
3773 1372 : timeline_id: Some(self.timeline_id),
3774 1372 : ..Default::default()
3775 1372 : };
3776 1372 :
3777 1372 : let begin = tokio::time::Instant::now();
3778 1372 : let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
3779 1372 : let now = tokio::time::Instant::now();
3780 1372 : stats.read_lock_acquisition_micros =
3781 1372 : DurationRecorder::Recorded(RecordedDuration(now - begin), now);
3782 1372 : self.compact_level0_phase1(phase1_layers_locked, stats, target_file_size, &ctx)
3783 1372 : .instrument(phase1_span)
3784 269983 : .await?
3785 : };
3786 :
3787 1368 : if new_layers.is_empty() && deltas_to_compact.is_empty() {
3788 : // nothing to do
3789 1086 : return Ok(());
3790 282 : }
3791 :
3792 282 : let mut guard = self.layers.write().await;
3793 :
3794 282 : let mut duplicated_layers = HashSet::new();
3795 282 :
3796 282 : let mut insert_layers = Vec::with_capacity(new_layers.len());
3797 :
3798 10707 : for l in &new_layers {
3799 10425 : if guard.contains(l.as_ref()) {
3800 : // expected in tests
3801 40 : tracing::error!(layer=%l, "duplicated L1 layer");
3802 :
3803 : // good ways to cause a duplicate: we repeatedly error after taking the writelock
3804 : // `guard` on self.layers. as of writing this, there are no error returns except
3805 : // for compact_level0_phase1 creating an L0, which does not happen in practice
3806 : // because we have not implemented L0 => L0 compaction.
3807 40 : duplicated_layers.insert(l.layer_desc().key());
3808 10385 : } else if LayerMap::is_l0(l.layer_desc()) {
3809 UBC 0 : return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
3810 CBC 10385 : } else {
3811 10385 : insert_layers.push(l.clone());
3812 10385 : }
3813 : }
3814 :
3815 282 : let remove_layers = {
3816 282 : let mut deltas_to_compact = deltas_to_compact;
3817 282 : // only remove those inputs which were not outputs
3818 3643 : deltas_to_compact.retain(|l| !duplicated_layers.contains(&l.layer_desc().key()));
3819 282 : deltas_to_compact
3820 282 : };
3821 282 :
3822 282 : // deletion will happen later, the layer file manager calls garbage_collect_on_drop
3823 282 : guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
3824 :
3825 282 : if let Some(remote_client) = self.remote_client.as_ref() {
3826 282 : remote_client.schedule_compaction_update(&remove_layers, &new_layers)?;
3827 UBC 0 : }
3828 :
3829 CBC 282 : drop_wlock(guard);
3830 282 :
3831 282 : Ok(())
3832 1368 : }
3833 :
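The layer-map update in compact_level0 above splits the compaction outputs into layers that already exist (duplicates) and layers to insert, and then drops the duplicated keys from the inputs scheduled for removal. A simplified sketch of that bookkeeping with strings standing in for layer keys (illustrative only, not the real layer types).

    use std::collections::HashSet;

    /// Return (layers to insert, layers to remove) given the existing layer set,
    /// the compaction outputs, and the compaction inputs.
    fn plan_layer_map_update(
        existing: &HashSet<String>,
        new_layers: &[String],
        inputs: Vec<String>,
    ) -> (Vec<String>, Vec<String>) {
        let mut duplicated: HashSet<String> = HashSet::new();
        let mut insert = Vec::new();
        for l in new_layers {
            if existing.contains(l) {
                duplicated.insert(l.clone());
            } else {
                insert.push(l.clone());
            }
        }
        // only remove those inputs which were not also outputs
        let remove: Vec<String> = inputs
            .into_iter()
            .filter(|l| !duplicated.contains(l))
            .collect();
        (insert, remove)
    }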
3834 : /// Update information about which layer files need to be retained on
3835 : /// garbage collection. This is separate from actually performing the GC,
3836 : /// and is updated more frequently, so that compaction can remove obsolete
3837 : /// page versions more aggressively.
3838 : ///
3839 : /// TODO: that's wishful thinking, compaction doesn't actually do that
3840 : /// currently.
3841 : ///
3842 : /// The caller specifies how much history is needed with the 3 arguments:
3843 : ///
3844 : /// retain_lsns: keep a version of each page at these LSNs
3845 : /// cutoff_horizon: also keep everything newer than this LSN
3846 : /// pitr: the time duration required to keep data for PITR
3847 : ///
3848 : /// The 'retain_lsns' list is currently used to prevent removing files that
3849 : /// are needed by child timelines. In the future, the user might be able to
3850 : /// name additional points in time to retain. The caller is responsible for
3851 : /// collecting that information.
3852 : ///
3853 : /// The 'cutoff_horizon' point is used to retain recent versions that might still be
3854 : /// needed by read-only nodes. (As of this writing, the caller just passes
3855 : /// the latest LSN subtracted by a constant, and doesn't do anything smart
3856 : /// to figure out what read-only nodes might actually need.)
3857 : ///
3858 : /// The 'pitr' duration is used to calculate a 'pitr_cutoff', which can be used to determine
3859 : /// whether a record is needed for PITR.
3860 : ///
3861 : /// NOTE: This function holds a short-lived lock to protect the 'gc_info'
3862 : /// field, so that the three values passed as argument are stored
3863 : /// atomically. But the caller is responsible for ensuring that no new
3864 : /// branches are created that would need to be included in 'retain_lsns',
3865 : /// for example. The caller should hold `Tenant::gc_cs` lock to ensure
3866 : /// that.
3867 : ///
3868 UBC 0 : #[instrument(skip_all, fields(timeline_id=%self.timeline_id))]
3869 : pub(super) async fn update_gc_info(
3870 : &self,
3871 : retain_lsns: Vec<Lsn>,
3872 : cutoff_horizon: Lsn,
3873 : pitr: Duration,
3874 : cancel: &CancellationToken,
3875 : ctx: &RequestContext,
3876 : ) -> anyhow::Result<()> {
3877 : // First, calculate pitr_cutoff_timestamp and then convert it to LSN.
3878 : //
3879 : // Some unit tests depend on garbage-collection working even when
3880 : // CLOG data is missing, so that find_lsn_for_timestamp() doesn't
3881 : // work, so avoid calling it altogether if time-based retention is not
3882 : // configured. It would be pointless anyway.
3883 : let pitr_cutoff = if pitr != Duration::ZERO {
3884 : let now = SystemTime::now();
3885 : if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) {
3886 : let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp);
3887 :
3888 : match self
3889 : .find_lsn_for_timestamp(pitr_timestamp, cancel, ctx)
3890 : .await?
3891 : {
3892 : LsnForTimestamp::Present(lsn) => lsn,
3893 : LsnForTimestamp::Future(lsn) => {
3894 : // The timestamp is in the future. That sounds impossible,
3895 : // but what it really means is that there haven't been
3896 : // any commits since the cutoff timestamp.
3897 0 : debug!("future({})", lsn);
3898 : cutoff_horizon
3899 : }
3900 : LsnForTimestamp::Past(lsn) => {
3901 0 : debug!("past({})", lsn);
3902 : // conservative, safe default is to remove nothing, when we
3903 : // have no commit timestamp data available
3904 : *self.get_latest_gc_cutoff_lsn()
3905 : }
3906 : LsnForTimestamp::NoData(lsn) => {
3907 0 : debug!("nodata({})", lsn);
3908 : // conservative, safe default is to remove nothing, when we
3909 : // have no commit timestamp data available
3910 : *self.get_latest_gc_cutoff_lsn()
3911 : }
3912 : }
3913 : } else {
3914 : // If we don't have enough data to convert to LSN,
3915 : // play safe and don't remove any layers.
3916 : *self.get_latest_gc_cutoff_lsn()
3917 : }
3918 : } else {
3919 : // No time-based retention was configured. Set time-based cutoff to
3920 : // same as LSN based.
3921 : cutoff_horizon
3922 : };
3923 :
3924 : // Grab the lock and update the values
3925 : *self.gc_info.write().unwrap() = GcInfo {
3926 : retain_lsns,
3927 : horizon_cutoff: cutoff_horizon,
3928 : pitr_cutoff,
3929 : };
3930 :
3931 : Ok(())
3932 : }
3933 :
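Taken together, the cutoffs set by update_gc_info are what gc_timeline (below) checks for each layer. A simplified sketch of that keep-or-collect decision, with plain integers standing in for LSNs and a precomputed flag for the image-coverage check (illustrative only, not the real layer API).

    struct LayerInfo {
        lsn_start: u64,              // inclusive
        lsn_end: u64,                // exclusive
        covered_by_newer_image: bool, // result of the image_layer_exists check
    }

    /// A layer may be garbage collected only if all four conditions hold.
    fn can_gc_layer(
        l: &LayerInfo,
        horizon_cutoff: u64,
        pitr_cutoff: u64,
        retain_lsns: &[u64],
    ) -> bool {
        // 1. it must not be newer than the GC horizon cutoff
        if l.lsn_end > horizon_cutoff {
            return false;
        }
        // 2. it must not be newer than the PITR cutoff
        if l.lsn_end > pitr_cutoff {
            return false;
        }
        // 3. it must not be needed by a child branch (start_lsn is inclusive)
        if retain_lsns.iter().any(|retain| l.lsn_start <= *retain) {
            return false;
        }
        // 4. a newer on-disk image layer must cover the layer's whole key range
        l.covered_by_newer_image
    }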
3934 : /// Garbage collect layer files on a timeline that are no longer needed.
3935 : ///
3936 : /// Currently, we don't make any attempt at removing unneeded page versions
3937 : /// within a layer file. We can only remove the whole file if it's fully
3938 : /// obsolete.
3939 CBC 646 : pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
3940 646 : // this is most likely the background tasks, but it might be the spawned task from
3941 646 : // immediate_gc
3942 646 : let cancel = crate::task_mgr::shutdown_token();
3943 646 : let _g = tokio::select! {
3944 646 : guard = self.gc_lock.lock() => guard,
3945 : _ = self.cancel.cancelled() => return Ok(GcResult::default()),
3946 : _ = cancel.cancelled() => return Ok(GcResult::default()),
3947 : };
3948 646 : let timer = self.metrics.garbage_collect_histo.start_timer();
3949 :
3950 UBC 0 : fail_point!("before-timeline-gc");
3951 :
3952 : // Is the timeline being deleted?
3953 CBC 646 : if self.is_stopping() {
3954 UBC 0 : anyhow::bail!("timeline is Stopping");
3955 CBC 646 : }
3956 646 :
3957 646 : let (horizon_cutoff, pitr_cutoff, retain_lsns) = {
3958 646 : let gc_info = self.gc_info.read().unwrap();
3959 646 :
3960 646 : let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn());
3961 646 : let pitr_cutoff = gc_info.pitr_cutoff;
3962 646 : let retain_lsns = gc_info.retain_lsns.clone();
3963 646 : (horizon_cutoff, pitr_cutoff, retain_lsns)
3964 646 : };
3965 646 :
3966 646 : let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
3967 :
3968 646 : let res = self
3969 646 : .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
3970 646 : .instrument(
3971 646 : info_span!("gc_timeline", timeline_id = %self.timeline_id, cutoff = %new_gc_cutoff),
3972 : )
3973 178 : .await?;
3974 :
3975 : // only record successes
3976 641 : timer.stop_and_record();
3977 641 :
3978 641 : Ok(res)
3979 641 : }
3980 :
3981 646 : async fn gc_timeline(
3982 646 : &self,
3983 646 : horizon_cutoff: Lsn,
3984 646 : pitr_cutoff: Lsn,
3985 646 : retain_lsns: Vec<Lsn>,
3986 646 : new_gc_cutoff: Lsn,
3987 646 : ) -> anyhow::Result<GcResult> {
3988 646 : let now = SystemTime::now();
3989 646 : let mut result: GcResult = GcResult::default();
3990 646 :
3991 646 : // Nothing to GC. Return early.
3992 646 : let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn();
3993 646 : if latest_gc_cutoff >= new_gc_cutoff {
3994 104 : info!(
3995 104 : "Nothing to GC: new_gc_cutoff_lsn {new_gc_cutoff}, latest_gc_cutoff_lsn {latest_gc_cutoff}",
3996 104 : );
3997 104 : return Ok(result);
3998 542 : }
3999 :
4000 : // We need to ensure that no one tries to read page versions or create
4001 : // branches at a point before latest_gc_cutoff_lsn. See branch_timeline()
4002 : // for details. This will block until the old value is no longer in use.
4003 : //
4004 : // The GC cutoff should only ever move forwards.
4005 542 : let waitlist = {
4006 542 : let write_guard = self.latest_gc_cutoff_lsn.lock_for_write();
4007 542 : ensure!(
4008 542 : *write_guard <= new_gc_cutoff,
4009 UBC 0 : "Cannot move GC cutoff LSN backwards (was {}, new {})",
4010 0 : *write_guard,
4011 : new_gc_cutoff
4012 : );
4013 CBC 542 : write_guard.store_and_unlock(new_gc_cutoff)
4014 542 : };
4015 542 : waitlist.wait().await;
4016 :
4017 542 : info!("GC starting");
4018 :
4019 UBC 0 : debug!("retain_lsns: {:?}", retain_lsns);
4020 :
4021 CBC 542 : let mut layers_to_remove = Vec::new();
4022 542 : let mut wanted_image_layers = KeySpaceRandomAccum::default();
4023 :
4024 : // Scan all layers in the timeline (remote or on-disk).
4025 : //
4026 : // Garbage collect the layer if all conditions are satisfied:
4027 : // 1. it is older than cutoff LSN;
4028 : // 2. it is older than PITR interval;
4029 : // 3. it doesn't need to be retained for 'retain_lsns';
4030 : // 4. newer on-disk image layers cover the layer's whole key range
4031 : //
4032 : // TODO: holding a write lock is too aggressive and avoidable
4033 542 : let mut guard = self.layers.write().await;
4034 542 : let layers = guard.layer_map();
4035 16763 : 'outer: for l in layers.iter_historic_layers() {
4036 16763 : result.layers_total += 1;
4037 16763 :
4038 16763 : // 1. Is it newer than GC horizon cutoff point?
4039 16763 : if l.get_lsn_range().end > horizon_cutoff {
4040 UBC 0 : debug!(
4041 0 : "keeping {} because it's newer than horizon_cutoff {}",
4042 0 : l.filename(),
4043 0 : horizon_cutoff,
4044 0 : );
4045 CBC 1770 : result.layers_needed_by_cutoff += 1;
4046 1770 : continue 'outer;
4047 14993 : }
4048 14993 :
4049 14993 : // 2. Is it newer than the PiTR cutoff point?
4050 14993 : if l.get_lsn_range().end > pitr_cutoff {
4051 UBC 0 : debug!(
4052 0 : "keeping {} because it's newer than pitr_cutoff {}",
4053 0 : l.filename(),
4054 0 : pitr_cutoff,
4055 0 : );
4056 CBC 47 : result.layers_needed_by_pitr += 1;
4057 47 : continue 'outer;
4058 14946 : }
4059 :
4060 : // 3. Is it needed by a child branch?
4061 : // NOTE With that we would keep data that
4062 : // might be referenced by child branches forever.
4063 : // We can track this in child timeline GC and delete parent layers when
4064 : // they are no longer needed. This might be complicated with long inheritance chains.
4065 : //
4066 : // TODO Vec is not a great choice for `retain_lsns`
4067 15090 : for retain_lsn in &retain_lsns {
4068 : // start_lsn is inclusive
4069 557 : if &l.get_lsn_range().start <= retain_lsn {
4070 UBC 0 : debug!(
4071 0 : "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}",
4072 0 : l.filename(),
4073 0 : retain_lsn,
4074 0 : l.is_incremental(),
4075 0 : );
4076 CBC 413 : result.layers_needed_by_branches += 1;
4077 413 : continue 'outer;
4078 144 : }
4079 : }
4080 :
4081 : // 4. Is there a later on-disk layer for this relation?
4082 : //
4083 : // The end-LSN is exclusive, while disk_consistent_lsn is
4084 : // inclusive. For example, if disk_consistent_lsn is 100, it is
4085 : // OK for a delta layer to have end LSN 101, but if the end LSN
4086 : // is 102, then it might not have been fully flushed to disk
4087 : // before crash.
4088 : //
4089 : // For example, imagine that the following layers exist:
4090 : //
4091 : // 1000 - image (A)
4092 : // 1000-2000 - delta (B)
4093 : // 2000 - image (C)
4094 : // 2000-3000 - delta (D)
4095 : // 3000 - image (E)
4096 : //
4097 : // If GC horizon is at 2500, we can remove layers A and B, but
4098 : // we cannot remove C, even though it's older than 2500, because
4099 : // the delta layer 2000-3000 depends on it.
4100 14533 : if !layers
4101 14533 : .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
4102 : {
4103 UBC 0 : debug!("keeping {} because it is the latest layer", l.filename());
4104 : // Collect delta key ranges that need image layers to allow garbage
4105 : // collecting the layers.
4106 : // It is not obvious whether we need to propagate this information only for
4107 : // delta layers. Image layers can form "stairs" that prevent old images from being deleted.
4108 : // But image layers are in any case less sparse than delta layers. Also, we need some
4109 : // protection against replacing recent image layers with new ones after each GC iteration.
4110 CBC 13131 : if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
4111 UBC 0 : wanted_image_layers.add_range(l.get_key_range());
4112 CBC 13131 : }
4113 13131 : result.layers_not_updated += 1;
4114 13131 : continue 'outer;
4115 1402 : }
4116 :
4117 : // We didn't find any reason to keep this file, so remove it.
4118 UBC 0 : debug!(
4119 0 : "garbage collecting {} is_dropped: xx is_incremental: {}",
4120 0 : l.filename(),
4121 0 : l.is_incremental(),
4122 0 : );
4123 CBC 1402 : layers_to_remove.push(l);
4124 : }
4125 542 : self.wanted_image_layers
4126 542 : .lock()
4127 542 : .unwrap()
4128 542 : .replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
4129 542 :
4130 542 : if !layers_to_remove.is_empty() {
4131 : // Persist the new GC cutoff value in the metadata file, before
4132 : // we actually remove anything.
4133 : //
4134 : // This does not in fact have any effect as we no longer consider local metadata unless
4135 : // running without remote storage.
4136 : //
4137 : // This also unconditionally schedules an index_part.json update, even though we will
4138 : // be doing one a bit later with the unlinked gc'd layers.
4139 : //
4140 : // TODO: remove when implementing <https://github.com/neondatabase/neon/issues/4099>.
4141 24 : self.update_metadata_file(self.disk_consistent_lsn.load(), None)
4142 UBC 0 : .await?;
4143 :
4144 CBC 24 : let gc_layers = layers_to_remove
4145 24 : .iter()
4146 1402 : .map(|x| guard.get_from_desc(x))
4147 24 : .collect::<Vec<Layer>>();
4148 24 :
4149 24 : result.layers_removed = gc_layers.len() as u64;
4150 :
4151 24 : if let Some(remote_client) = self.remote_client.as_ref() {
4152 24 : remote_client.schedule_gc_update(&gc_layers)?;
4153 UBC 0 : }
4154 :
4155 CBC 24 : guard.finish_gc_timeline(&gc_layers);
4156 24 :
4157 24 : if result.layers_removed != 0 {
4158 24 : fail_point!("after-timeline-gc-removed-layers");
4159 24 : }
4160 :
4161 : #[cfg(feature = "testing")]
4162 24 : {
4163 24 : result.doomed_layers = gc_layers;
4164 24 : }
4165 518 : }
4166 :
4167 537 : info!(
4168 537 : "GC completed removing {} layers, cutoff {}",
4169 537 : result.layers_removed, new_gc_cutoff
4170 537 : );
4171 :
4172 537 : result.elapsed = now.elapsed()?;
4173 537 : Ok(result)
4174 641 : }
4175 :
4176 : /// Reconstruct a value, using the given base image and WAL records in 'data'.
4177 5861458 : async fn reconstruct_value(
4178 5861458 : &self,
4179 5861458 : key: Key,
4180 5861458 : request_lsn: Lsn,
4181 5861458 : mut data: ValueReconstructState,
4182 5861458 : ) -> Result<Bytes, PageReconstructError> {
4183 5861444 : // Perform WAL redo if needed
4184 5861444 : data.records.reverse();
4185 5861444 :
4186 5861444 : // If we have a page image, and no WAL, we're all set
4187 5861444 : if data.records.is_empty() {
4188 3801226 : if let Some((img_lsn, img)) = &data.img {
4189 UBC 0 : trace!(
4190 0 : "found page image for key {} at {}, no WAL redo required, req LSN {}",
4191 0 : key,
4192 0 : img_lsn,
4193 0 : request_lsn,
4194 0 : );
4195 CBC 3801226 : Ok(img.clone())
4196 : } else {
4197 UBC 0 : Err(PageReconstructError::from(anyhow!(
4198 0 : "base image for {key} at {request_lsn} not found"
4199 0 : )))
4200 : }
4201 : } else {
4202 : // We need to do WAL redo.
4203 : //
4204 : // If we don't have a base image, then the oldest WAL record better initialize
4205 : // the page
4206 CBC 2060218 : if data.img.is_none() && !data.records.first().unwrap().1.will_init() {
4207 UBC 0 : Err(PageReconstructError::from(anyhow!(
4208 0 : "Base image for {} at {} not found, but got {} WAL records",
4209 0 : key,
4210 0 : request_lsn,
4211 0 : data.records.len()
4212 0 : )))
4213 : } else {
4214 CBC 2060218 : if data.img.is_some() {
4215 UBC 0 : trace!(
4216 0 : "found {} WAL records and a base image for {} at {}, performing WAL redo",
4217 0 : data.records.len(),
4218 0 : key,
4219 0 : request_lsn
4220 0 : );
4221 : } else {
4222 0 : trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
4223 : };
4224 :
4225 CBC 2060218 : let last_rec_lsn = data.records.last().unwrap().0;
4226 :
4227 2060218 : let img = match self
4228 2060218 : .walredo_mgr
4229 2060218 : .request_redo(key, request_lsn, data.img, data.records, self.pg_version)
4230 UBC 0 : .await
4231 CBC 2060218 : .context("Failed to reconstruct a page image")
4232 : {
4233 2060218 : Ok(img) => img,
4234 UBC 0 : Err(e) => return Err(PageReconstructError::WalRedo(e)),
4235 : };
4236 :
4237 CBC 2060218 : if img.len() == page_cache::PAGE_SZ {
4238 2056879 : let cache = page_cache::get();
4239 2056879 : if let Err(e) = cache
4240 2056879 : .memorize_materialized_page(
4241 2056879 : self.tenant_shard_id,
4242 2056879 : self.timeline_id,
4243 2056879 : key,
4244 2056879 : last_rec_lsn,
4245 2056879 : &img,
4246 2056879 : )
4247 9032 : .await
4248 2056879 : .context("Materialized page memoization failed")
4249 : {
4250 UBC 0 : return Err(PageReconstructError::from(e));
4251 CBC 2056879 : }
4252 3339 : }
4253 :
4254 2060218 : Ok(img)
4255 : }
4256 : }
4257 5861444 : }
4258 :
4259 2 : pub(crate) async fn spawn_download_all_remote_layers(
4260 2 : self: Arc<Self>,
4261 2 : request: DownloadRemoteLayersTaskSpawnRequest,
4262 2 : ) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
4263 2 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4264 2 :
4265 2 : // This is not really needed anymore; it only exists because some tests check the return
4266 2 : // value through the HTTP API. It would be better not to maintain it anymore.
4267 2 :
4268 2 : let mut status_guard = self.download_all_remote_layers_task_info.write().unwrap();
4269 2 : if let Some(st) = &*status_guard {
4270 1 : match &st.state {
4271 : DownloadRemoteLayersTaskState::Running => {
4272 UBC 0 : return Err(st.clone());
4273 : }
4274 : DownloadRemoteLayersTaskState::ShutDown
4275 CBC 1 : | DownloadRemoteLayersTaskState::Completed => {
4276 1 : *status_guard = None;
4277 1 : }
4278 : }
4279 1 : }
4280 :
4281 2 : let self_clone = Arc::clone(&self);
4282 2 : let task_id = task_mgr::spawn(
4283 2 : task_mgr::BACKGROUND_RUNTIME.handle(),
4284 2 : task_mgr::TaskKind::DownloadAllRemoteLayers,
4285 2 : Some(self.tenant_shard_id),
4286 2 : Some(self.timeline_id),
4287 2 : "download all remote layers task",
4288 : false,
4289 2 : async move {
4290 10 : self_clone.download_all_remote_layers(request).await;
4291 2 : let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
4292 2 : match &mut *status_guard {
4293 : None => {
4294 UBC 0 : warn!("tasks status is supposed to be Some(), since we are running");
4295 : }
4296 CBC 2 : Some(st) => {
4297 2 : let exp_task_id = format!("{}", task_mgr::current_task_id().unwrap());
4298 2 : if st.task_id != exp_task_id {
4299 UBC 0 : warn!("task id changed while we were still running, expecting {} but have {}", exp_task_id, st.task_id);
4300 CBC 2 : } else {
4301 2 : st.state = DownloadRemoteLayersTaskState::Completed;
4302 2 : }
4303 : }
4304 : };
4305 2 : Ok(())
4306 2 : }
4307 2 : .instrument(info_span!(parent: None, "download_all_remote_layers", tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))
4308 : );
4309 :
4310 2 : let initial_info = DownloadRemoteLayersTaskInfo {
4311 2 : task_id: format!("{task_id}"),
4312 2 : state: DownloadRemoteLayersTaskState::Running,
4313 2 : total_layer_count: 0,
4314 2 : successful_download_count: 0,
4315 2 : failed_download_count: 0,
4316 2 : };
4317 2 : *status_guard = Some(initial_info.clone());
4318 2 :
4319 2 : Ok(initial_info)
4320 2 : }
4321 :
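     : /// Body of the download-all task: snapshots the current layer map and downloads each
     : /// layer, keeping at most `request.max_concurrent_downloads` downloads in flight via a
     : /// `JoinSet`, stopping the queueing of new downloads once the shutdown token is
     : /// cancelled, and updating the shared task status as downloads succeed or fail.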
4322 2 : async fn download_all_remote_layers(
4323 2 : self: &Arc<Self>,
4324 2 : request: DownloadRemoteLayersTaskSpawnRequest,
4325 2 : ) {
4326 : use pageserver_api::models::DownloadRemoteLayersTaskState;
4327 :
4328 2 : let remaining = {
4329 2 : let guard = self.layers.read().await;
4330 2 : guard
4331 2 : .layer_map()
4332 2 : .iter_historic_layers()
4333 10 : .map(|desc| guard.get_from_desc(&desc))
4334 2 : .collect::<Vec<_>>()
4335 2 : };
4336 2 : let total_layer_count = remaining.len();
4337 2 :
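     : // Helper to re-acquire the shared task status and assert that it still belongs to this
     : // task (the task id recorded at spawn time must match the current task id).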
4338 2 : macro_rules! lock_status {
4339 2 : ($st:ident) => {
4340 2 : let mut st = self.download_all_remote_layers_task_info.write().unwrap();
4341 2 : let st = st
4342 2 : .as_mut()
4343 2 : .expect("this function is only called after the task has been spawned");
4344 2 : assert_eq!(
4345 2 : st.task_id,
4346 2 : format!(
4347 2 : "{}",
4348 2 : task_mgr::current_task_id().expect("we run inside a task_mgr task")
4349 2 : )
4350 2 : );
4351 2 : let $st = st;
4352 2 : };
4353 2 : }
4354 2 :
4355 2 : {
4356 2 : lock_status!(st);
4357 2 : st.total_layer_count = total_layer_count as u64;
4358 2 : }
4359 2 :
4360 2 : let mut remaining = remaining.into_iter();
4361 2 : let mut have_remaining = true;
4362 2 : let mut js = tokio::task::JoinSet::new();
4363 2 :
4364 2 : let cancel = task_mgr::shutdown_token();
4365 2 :
4366 2 : let limit = request.max_concurrent_downloads;
4367 :
4368 : loop {
4369 12 : while js.len() < limit.get() && have_remaining && !cancel.is_cancelled() {
4370 12 : let Some(next) = remaining.next() else {
4371 2 : have_remaining = false;
4372 2 : break;
4373 : };
4374 :
4375 10 : let span = tracing::info_span!("download", layer = %next);
4376 :
4377 10 : js.spawn(
4378 10 : async move {
4379 26 : let res = next.download().await;
4380 10 : (next, res)
4381 10 : }
4382 10 : .instrument(span),
4383 10 : );
4384 : }
4385 :
4386 12 : while let Some(res) = js.join_next().await {
4387 10 : match res {
4388 : Ok((_, Ok(_))) => {
4389 5 : lock_status!(st);
4390 5 : st.successful_download_count += 1;
4391 : }
4392 5 : Ok((layer, Err(e))) => {
4393 5 : tracing::error!(%layer, "download failed: {e:#}");
4394 5 : lock_status!(st);
4395 5 : st.failed_download_count += 1;
4396 : }
4397 UBC 0 : Err(je) if je.is_cancelled() => unreachable!("not used here"),
4398 0 : Err(je) if je.is_panic() => {
4399 0 : lock_status!(st);
4400 0 : st.failed_download_count += 1;
4401 : }
4402 0 : Err(je) => tracing::warn!("unknown join error: {je:?}"),
4403 : }
4404 : }
4405 :
4406 CBC 2 : if js.is_empty() && (!have_remaining || cancel.is_cancelled()) {
4407 2 : break;
4408 UBC 0 : }
4409 : }
4410 :
4411 : {
4412 CBC 2 : lock_status!(st);
4413 2 : st.state = DownloadRemoteLayersTaskState::Completed;
4414 2 : }
4415 2 : }
4416 :
4417 57 : pub fn get_download_all_remote_layers_task_info(&self) -> Option<DownloadRemoteLayersTaskInfo> {
4418 57 : self.download_all_remote_layers_task_info
4419 57 : .read()
4420 57 : .unwrap()
4421 57 : .clone()
4422 57 : }
4423 : }
4424 :
4425 : pub(crate) struct DiskUsageEvictionInfo {
4426 : /// Timeline's largest layer (remote or resident)
4427 : pub max_layer_size: Option<u64>,
4428 : /// Timeline's resident layers
4429 : pub resident_layers: Vec<LocalLayerInfoForDiskUsageEviction>,
4430 : }
4431 :
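     : /// A layer that is currently resident on local disk, together with the timestamp of its
     : /// most recent recorded access, as collected for disk-usage-based eviction.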
4432 : pub(crate) struct LocalLayerInfoForDiskUsageEviction {
4433 : pub layer: Layer,
4434 : pub last_activity_ts: SystemTime,
4435 : }
4436 :
4437 : impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {
4438 UBC 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4439 0 : // Format the (tv_sec, tv_nsec) timestamp as RFC 3339 in case someone is looking at it.
4440 0 : // Having to allocate a String for this is unfortunate, but it is rarely formatted.
4441 0 : let ts = chrono::DateTime::<chrono::Utc>::from(self.last_activity_ts);
4442 0 : let ts = ts.to_rfc3339_opts(chrono::SecondsFormat::Nanos, true);
4443 0 : struct DisplayIsDebug<'a, T>(&'a T);
4444 0 : impl<'a, T: std::fmt::Display> std::fmt::Debug for DisplayIsDebug<'a, T> {
4445 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4446 0 : write!(f, "{}", self.0)
4447 0 : }
4448 0 : }
4449 0 : f.debug_struct("LocalLayerInfoForDiskUsageEviction")
4450 0 : .field("layer", &DisplayIsDebug(&self.layer))
4451 0 : .field("last_activity", &ts)
4452 0 : .finish()
4453 0 : }
4454 : }
4455 :
4456 : impl LocalLayerInfoForDiskUsageEviction {
4457 CBC 440 : pub fn file_size(&self) -> u64 {
4458 440 : self.layer.layer_desc().file_size
4459 440 : }
4460 : }
4461 :
4462 : impl Timeline {
4463 : /// Returns non-remote layers for eviction.
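     : /// For every historic layer that can be kept resident, records the layer and its latest
     : /// activity timestamp (falling back to `SystemTime::now` on implementation errors), and
     : /// also tracks the largest layer size seen, whether remote or resident.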
4464 29 : pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
4465 29 : let guard = self.layers.read().await;
4466 29 : let layers = guard.layer_map();
4467 29 :
4468 29 : let mut max_layer_size: Option<u64> = None;
4469 29 : let mut resident_layers = Vec::new();
4470 :
4471 2894 : for l in layers.iter_historic_layers() {
4472 2894 : let file_size = l.file_size();
4473 2894 : max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
4474 2894 :
4475 2894 : let l = guard.get_from_desc(&l);
4476 :
4477 2894 : let l = match l.keep_resident().await {
4478 2893 : Ok(Some(l)) => l,
4479 1 : Ok(None) => continue,
4480 UBC 0 : Err(e) => {
4481 : // these should not happen, but we cannot make them statically impossible right
4482 : // now.
4483 0 : tracing::warn!(layer=%l, "failed to keep the layer resident: {e:#}");
4484 0 : continue;
4485 : }
4486 : };
4487 :
4488 CBC 2893 : let last_activity_ts = l.access_stats().latest_activity().unwrap_or_else(|| {
4489 UBC 0 : // We only use this fallback if there's an implementation error.
4490 0 : // `latest_activity` already emits a rate-limited warn!() log.
4491 0 : debug!(layer=%l, "last_activity returns None, using SystemTime::now");
4492 0 : SystemTime::now()
4493 CBC 2893 : });
4494 2893 :
4495 2893 : resident_layers.push(LocalLayerInfoForDiskUsageEviction {
4496 2893 : layer: l.drop_eviction_guard(),
4497 2893 : last_activity_ts,
4498 2893 : });
4499 : }
4500 :
4501 29 : DiskUsageEvictionInfo {
4502 29 : max_layer_size,
4503 29 : resident_layers,
4504 29 : }
4505 29 : }
4506 :
4507 20805 : pub(crate) fn get_shard_index(&self) -> ShardIndex {
4508 20805 : ShardIndex {
4509 20805 : shard_number: self.tenant_shard_id.shard_number,
4510 20805 : shard_count: self.tenant_shard_id.shard_count,
4511 20805 : }
4512 20805 : }
4513 : }
4514 :
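     : /// One step of the layer traversal in `get_reconstruct_data`: the reconstruct result at
     : /// that step, the continuation LSN, and a lazily-evaluated closure producing a
     : /// description of the layer (`TraversalId`), only invoked when a traversal error is
     : /// being formatted below.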
4515 : type TraversalPathItem = (
4516 : ValueReconstructResult,
4517 : Lsn,
4518 : Box<dyn Send + FnOnce() -> TraversalId>,
4519 : );
4520 :
4521 : /// Helper function for get_reconstruct_data() to add the path of layers traversed
4522 : /// to an error, as anyhow context information.
4523 1149 : fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructError {
4524 1149 : // We want the original 'msg' to be the outermost context. The outermost context
4525 1149 : // is the most high-level information, which also gets propagated to the client.
4526 1149 : let mut msg_iter = path
4527 1149 : .into_iter()
4528 1217 : .map(|(r, c, l)| {
4529 1217 : format!(
4530 1217 : "layer traversal: result {:?}, cont_lsn {}, layer: {}",
4531 1217 : r,
4532 1217 : c,
4533 1217 : l(),
4534 1217 : )
4535 1217 : })
4536 1149 : .chain(std::iter::once(msg));
4537 1149 : // Construct initial message from the first traversed layer
4538 1149 : let err = anyhow!(msg_iter.next().unwrap());
4539 1149 :
4540 1149 : // Append all subsequent traversals, and the error message 'msg', as contexts.
4541 1217 : let msg = msg_iter.fold(err, |err, msg| err.context(msg));
4542 1149 : PageReconstructError::from(msg)
4543 1149 : }
4544 :
4545 : /// Various functions to mutate the timeline.
4546 : // TODO: Currently, Deref is used to allow easy access to the Timeline's read methods from the writer.
4547 : // This is probably considered bad practice in Rust and should be fixed eventually,
4548 : // but doing so would cause large code changes.
4549 : pub struct TimelineWriter<'a> {
4550 : tl: &'a Timeline,
4551 : _write_guard: tokio::sync::MutexGuard<'a, ()>,
4552 : }
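     : //
     : // Usage sketch (illustrative; the `Timeline::writer()` constructor is assumed from the
     : // wider codebase and is not part of this excerpt):
     : //
     : //     let writer = timeline.writer().await;
     : //     writer.put(key, lsn, &value, &ctx).await?;
     : //     writer.finish_write(lsn);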
4553 :
4554 : impl Deref for TimelineWriter<'_> {
4555 : type Target = Timeline;
4556 :
4557 UBC 0 : fn deref(&self) -> &Self::Target {
4558 0 : self.tl
4559 0 : }
4560 : }
4561 :
4562 : impl<'a> TimelineWriter<'a> {
4563 : /// Put a new page version that can be constructed from a WAL record
4564 : ///
4565 : /// This will implicitly extend the relation, if the page is beyond the
4566 : /// current end-of-file.
4567 CBC 607051 : pub async fn put(
4568 607051 : &self,
4569 607051 : key: Key,
4570 607051 : lsn: Lsn,
4571 607051 : value: &Value,
4572 607051 : ctx: &RequestContext,
4573 607051 : ) -> anyhow::Result<()> {
4574 607051 : self.tl.put_value(key, lsn, value, ctx).await
4575 607051 : }
4576 :
4577 1168370 : pub(crate) async fn put_batch(
4578 1168370 : &self,
4579 1168370 : batch: &HashMap<Key, Vec<(Lsn, Value)>>,
4580 1168370 : ctx: &RequestContext,
4581 1168370 : ) -> anyhow::Result<()> {
4582 1168369 : self.tl.put_values(batch, ctx).await
4583 1168369 : }
4584 :
4585 6569 : pub(crate) async fn delete_batch(&self, batch: &[(Range<Key>, Lsn)]) -> anyhow::Result<()> {
4586 6569 : self.tl.put_tombstones(batch).await
4587 6569 : }
4588 :
4589 : /// Track the end of the latest digested WAL record:
4590 : /// remember the (end of the) last valid WAL record in the timeline.
4591 : ///
4592 : /// Call this after you have finished writing all the WAL up to 'lsn'.
4593 : ///
4594 : /// 'lsn' must be aligned. This wakes up any wait_lsn() callers waiting for
4595 : /// the 'lsn' or anything older. The previous last record LSN is stored alongside
4596 : /// the latest and can be read.
4597 49362591 : pub(crate) fn finish_write(&self, new_lsn: Lsn) {
4598 49362591 : self.tl.finish_write(new_lsn);
4599 49362591 : }
4600 :
4601 427894 : pub(crate) fn update_current_logical_size(&self, delta: i64) {
4602 427894 : self.tl.update_current_logical_size(delta)
4603 427894 : }
4604 : }
4605 :
4606 : // We need TimelineWriter to be Send for the upcoming conversion of
4607 : // Timeline::layers to tokio::sync::RwLock.
4608 1 : #[test]
4609 1 : fn is_send() {
4610 1 : fn _assert_send<T: Send>() {}
4611 1 : _assert_send::<TimelineWriter<'_>>();
4612 1 : }
4613 :
4614 : /// Add a suffix to a layer file's name: .{num}.old
4615 : /// Uses the first available num (starts at 0)
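     : ///
     : /// For example, a file `layer` would be renamed to `layer.0.old`, or to `layer.1.old`
     : /// if `layer.0.old` already exists (the name `layer` here is illustrative).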
4616 1 : fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
4617 1 : let filename = path
4618 1 : .file_name()
4619 1 : .ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
4620 1 : let mut new_path = path.to_owned();
4621 :
4622 1 : for i in 0u32.. {
4623 1 : new_path.set_file_name(format!("{filename}.{i}.old"));
4624 1 : if !new_path.exists() {
4625 1 : std::fs::rename(path, &new_path)
4626 1 : .with_context(|| format!("rename {path:?} to {new_path:?}"))?;
4627 1 : return Ok(());
4628 UBC 0 : }
4629 : }
4630 :
4631 0 : bail!("couldn't find an unused backup number for {:?}", path)
4632 CBC 1 : }
4633 :
4634 : #[cfg(test)]
4635 : mod tests {
4636 : use utils::{id::TimelineId, lsn::Lsn};
4637 :
4638 : use crate::tenant::{
4639 : harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
4640 : };
4641 :
4642 1 : #[tokio::test]
4643 1 : async fn two_layer_eviction_attempts_at_the_same_time() {
4644 1 : let harness =
4645 1 : TenantHarness::create("two_layer_eviction_attempts_at_the_same_time").unwrap();
4646 1 :
4647 1 : let ctx = any_context();
4648 1 : let tenant = harness.try_load(&ctx).await.unwrap();
4649 1 : let timeline = tenant
4650 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
4651 1 : .await
4652 1 : .unwrap();
4653 1 :
4654 1 : let rtc = timeline
4655 1 : .remote_client
4656 1 : .clone()
4657 1 : .expect("just configured this");
4658 :
4659 1 : let layer = find_some_layer(&timeline).await;
4660 1 : let layer = layer
4661 1 : .keep_resident()
4662 UBC 0 : .await
4663 CBC 1 : .expect("no download => no downloading errors")
4664 1 : .expect("should have been resident")
4665 1 : .drop_eviction_guard();
4666 1 :
4667 1 : let first = async { layer.evict_and_wait(&rtc).await };
4668 1 : let second = async { layer.evict_and_wait(&rtc).await };
4669 :
4670 1 : let (first, second) = tokio::join!(first, second);
4671 :
4672 1 : let res = layer.keep_resident().await;
4673 1 : assert!(matches!(res, Ok(None)), "{res:?}");
4674 :
4675 1 : match (first, second) {
4676 UBC 0 : (Ok(()), Ok(())) => {
4677 0 : // Because there are no more timeline locks being taken on the eviction path, we can
4678 0 : // witness all three outcomes here.
4679 0 : }
4680 CBC 1 : (Ok(()), Err(EvictionError::NotFound)) | (Err(EvictionError::NotFound), Ok(())) => {
4681 1 : // if one completes before the other, this is fine just as well.
4682 1 : }
4683 UBC 0 : other => unreachable!("unexpected {:?}", other),
4684 : }
4685 : }
4686 :
4687 CBC 1 : fn any_context() -> crate::context::RequestContext {
4688 1 : use crate::context::*;
4689 1 : use crate::task_mgr::*;
4690 1 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
4691 1 : }
4692 :
4693 1 : async fn find_some_layer(timeline: &Timeline) -> Layer {
4694 1 : let layers = timeline.layers.read().await;
4695 1 : let desc = layers
4696 1 : .layer_map()
4697 1 : .iter_historic_layers()
4698 1 : .next()
4699 1 : .expect("must find one layer to evict");
4700 1 :
4701 1 : layers.get_from_desc(&desc)
4702 1 : }
4703 : }